Introduction
This book collects notes and examples of scheduling models built with OR-Tools CP-SAT. The focus is on job-shop style problems: sequencing tasks on machines, handling changeovers, respecting breaks and shifts, modeling resources, and grouping tasks into campaigns.
The book is split into two parts.
- Concepts walks through the core modeling ideas once. Read these first if CP-SAT or constraint programming is new to you.
- Examples indexes the Python files in the scheduling/ folder and links each one back to the concepts it demonstrates. Examples are numbered and build on each other, so reading them in order is usually easiest.
All code lives in the scheduling/ directory of the repo. Open a file in your
editor while reading the corresponding chapter to see the full model.
A minimal CP-SAT template
Almost every example follows the same five-step shape.
from ortools.sat.python import cp_model
# 1. Data
# ... sets, durations, changeover table, etc.
# 2. Decision variables
model = cp_model.CpModel()
# ... start/end/interval/bool vars
# 3. Objective
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [ends[t] for t in tasks])
model.minimize(make_span)
# 4. Constraints
# ... precedence, resources, circuits, ...
# 5. Solve and post-process
solver = cp_model.CpSolver()
status = solver.solve(model)
The rest of the book explains what goes into step 4.
CP-SAT basics
CP-SAT is a constraint-programming solver with integer and boolean variables. Before looking at scheduling, it helps to be comfortable with a few primitives.
Variables
x = model.new_int_var(0, 100, 'x') # integer in [0, 100]
b = model.new_bool_var('b') # boolean (integer in {0, 1})
Linear constraints
model.add(x + y == 10)
model.add(x <= y)
model.add(sum(bs) == 1) # exactly-one on a list of bools
model.add_exactly_one(bs) # same, more idiomatic
Boolean combinations
model.add_bool_or([a, b]) # a or b
model.add_bool_and([a, b]) # a and b
model.add_bool_xor([a, b]) # a xor b (in general: an odd number of literals is true)
Reification with only_enforce_if
A constraint can be conditioned on a boolean literal. The constraint is only active when the literal is true.
model.add(x >= 5).only_enforce_if(b)
model.add(x < 5).only_enforce_if(~b)
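Chaining two only_enforce_if calls gives an "and" of conditions. The link is one-directional: the constraint is enforced when both literals are true, and nothing is implied when either is false:
model.add(y == 1).only_enforce_if(b1).only_enforce_if(b2) # (b1 and b2) implies y == 1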
For "or", you normally introduce intermediate booleans or use
add_bool_or.
add_min_equality, add_max_equality, add_multiplication_equality
These express z = min(xs), z = max(xs), z = x * y (or the product of a
list). add_max_equality is how makespan is usually encoded:
model.add_max_equality(make_span, [ends[t] for t in tasks])
Domains
For "x belongs to a non-contiguous set of values" use
add_linear_expression_in_domain:
domain = cp_model.Domain.from_intervals([[0, 4], [11, 100]])
model.add_linear_expression_in_domain(x, domain)
See example_00_unit_tests.py for a collection of small snippets exercising
each of these.
Solve and read back
solver = cp_model.CpSolver()
status = solver.solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(solver.value(x))
Interval variables
An interval variable bundles three integer variables - start, size, end -
with the implicit constraint start + size == end. CP-SAT uses intervals to
reason efficiently about scheduling: add_no_overlap and add_cumulative both
expect intervals.
Three flavors
Regular interval
iv = model.new_interval_var(start, size, end, name="t1")
Replaces a manual model.add(end - start == size).
Optional interval
An interval that is only scheduled when a presence boolean is true. Essential when a task may or may not be assigned to a given machine.
iv = model.new_optional_interval_var(start, size, end, is_present, name="t1_on_m1")
If is_present is false, the interval disappears from add_no_overlap /
add_cumulative reasoning.
Fixed-size interval
Convenient for breaks, shift boundaries, and anything with a known position.
br = model.new_fixed_size_interval_var(start=2, size=1, name="break")
Typical use
intervals = {
    (m, t): model.new_optional_interval_var(
        starts[m, t],
        processing_time[product_of(t)],
        ends[m, t],
        presence[m, t],
        f"t{t}_on_m{m}",
    )
    for t in tasks for m in machines
}
for m in machines:
    model.add_no_overlap([intervals[m, t] for t in tasks])
Examples that introduce intervals: example_05_seq_with_intervals.py (first
use), example_03_seq_scale_Mathieu.py (dramatic speed-up vs. manual duration
constraints).
Circuit and sequencing
To sequence tasks on a machine, you need both the order and the time
constraints that follow from it. CP-SAT's add_circuit is the standard tool.
What add_circuit does
add_circuit(arcs) takes a list of triples [i, j, literal]. It asserts that
the selected arcs (where literal == 1) form a single Hamiltonian circuit over
the referenced nodes. Self-arcs [i, i, literal] mean node i is skipped
when literal is true.
arcs = []
for t1 in tasks:
    arcs.append([0, t1, start_literal(t1)])  # dummy -> t1 (first)
    arcs.append([t1, 0, end_literal(t1)])    # t1 -> dummy (last)
    arcs.append([t1, t1, ~presence[t1]])     # skip t1 if absent
    for t2 in tasks:
        if t1 == t2:
            continue
        arcs.append([t1, t2, seq[t1, t2]])
model.add_circuit(arcs)
Node 0 (or -1) is typically a dummy "first/last" node.
Linking the circuit to time
add_circuit only picks the order. You also need: if t1 -> t2 is chosen,
then end[t1] + gap <= start[t2]. This is a reified constraint:
model.add(end[t1] + gap <= start[t2]).only_enforce_if(seq[t1, t2])
gap is usually changeover_time (see Changeover) or
0.
Multi-machine
With multiple machines, build one circuit per machine and gate absent tasks with self-loops:
for m in machines:
    arcs = []
    for t in tasks:
        arcs.append([t, t, ~presence[m, t]])
        for t2 in tasks:
            if t != t2:
                arcs.append([t, t2, seq[m, t, t2]])
    model.add_circuit(arcs)
add_exactly_one(presence[m, t] for m in machines) ensures each task ends up on
exactly one machine.
Examples: example_01_simple_sequence.py (single machine),
example_03_seq_multi_stations.py (multi-machine).
Changeover
A changeover is the time needed to switch a machine from producing one product to another. There are three common ways to model it.
1. In the objective
Charge a cost for each seq[t1, t2] whose products differ. The changeover takes
up no time in the schedule itself; it only adds to the objective being minimised.
total_co = sum(seq[t1, t2] * changeover_cost[t1, t2] for ...)
model.minimize(make_span + total_co)
Simple, but time and cost are decoupled: the schedule may not leave physical room for the changeover.
Example: example_01_simple_sequence.py.
2. In the precedence constraint
Include the changeover in the gap between tasks:
gap = changeover_time if products_differ(t1, t2) else 0
model.add(end[t1] + gap <= start[t2]).only_enforce_if(seq[t1, t2])
Now time and cost agree: a changeover actually pushes the next task later.
Example: example_04_seq_with_changeover_in_constraint.py.
3. As a first-class event
Create an optional interval for every (t1, t2) that represents the
changeover itself. When t1 -> t2 is chosen, the interval is present, sits
between the two tasks, and has the right duration.
co_iv = model.new_optional_interval_var(co_start, co_duration, co_end, co_present, ...)
model.add(end[t1] <= co_start).only_enforce_if(seq[t1, t2])
model.add(co_end <= start[t2]).only_enforce_if(seq[t1, t2])
model.add(co_present == 1).only_enforce_if(seq[t1, t2])
This lets you add the changeover interval to add_cumulative (it consumes
operator time) or apply cleaning-resource constraints to it.
Example: example_08_changeover_as_event.py.
Starting product
A machine usually begins with some product already loaded. Model it with a dummy task 0 whose "product" is the starting product; the cost from dummy to the first real task is zero if they match, else the usual changeover.
Example: example_02_seq_lock_starting_product.py.
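To sanity-check a solver result by hand, a plain-Python cost evaluator can help. The function name and the cost table keyed by the product being switched to are assumptions for this sketch, not taken from the example file.

```python
def sequence_changeover_cost(starting_product, order, product_of, changeover_cost):
    """Total changeover cost of running `order` on a machine preloaded with
    `starting_product`. A cost is charged whenever the product changes."""
    total = 0
    previous = starting_product
    for task in order:
        current = product_of[task]
        if current != previous:
            total += changeover_cost[current]
        previous = current
    return total


# Hypothetical data: the machine starts loaded with product A.
product_of = {1: "A", 2: "B", 3: "A"}
changeover_cost = {"A": 1, "B": 1}

cost_alternating = sequence_changeover_cost("A", [1, 2, 3], product_of, changeover_cost)
cost_grouped = sequence_changeover_cost("A", [1, 3, 2], product_of, changeover_cost)
print(cost_alternating, cost_grouped)
```

Grouping the two A tasks right after the starting product leaves a single switch to B, while the alternating order pays twice.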
Breaks
A break is a time window during which a machine or operator is unavailable. Three techniques cover most cases.
1. Break as a fixed interval in add_cumulative
For each break, build a new_fixed_size_interval_var and add it alongside task
intervals with the full demand. Tasks are pushed around the break.
break_intervals = [
    model.new_fixed_size_interval_var(start=s, size=e - s, name="break")
    for (s, e) in breaks
]
all_intervals = task_intervals + break_intervals
demands = [1] * len(task_intervals) + [1] * len(break_intervals)
model.add_cumulative(all_intervals, demands, capacity=1)
Example: example_07_break_without_changeover.py.
2. Task duration stretched by overlapping breaks
When a task may run through a break and the break simply extends its total
time on the machine, use per-time-slot booleans that indicate whether the
task uses slot i, then add is_break[i] for each covered slot.
uses[t, i] = starts_before_i AND ends_after_i
duration[t] = base + sum(is_break[i] * uses[t, i] for i)
interval[t] = new_interval_var(start, duration, end, ...)
Example: example_14_task_delaying_break.py.
3. Break-aware start domains
If the break pattern is periodic, restrict task starts to the valid slots
with add_linear_expression_in_domain. Much faster than per-slot booleans.
domain_no_break = cp_model.Domain.from_values([...])
model.add_linear_expression_in_domain(start[t], domain_no_break)
Example: example_29_linear_domain_for_breaks.py,
example_33_conditional_duration_linear_domain.py.
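The list of valid starts can be precomputed in plain Python and then handed to cp_model.Domain.from_values. The helper below is a sketch; its name and parameters are assumptions, and it presumes the break fits inside one period (break_offset + break_len <= period).

```python
def valid_starts(horizon, size, period, break_offset, break_len):
    """Starts s in [0, horizon - size] whose window [s, s + size) avoids every break.

    Breaks repeat every `period` time units and cover
    [break_offset, break_offset + break_len) within each period.
    """
    def in_break(t):
        return break_offset <= t % period < break_offset + break_len

    return [
        s
        for s in range(horizon - size + 1)
        if not any(in_break(t) for t in range(s, s + size))
    ]


# Hypothetical pattern: a 1-unit break at offset 3 of every 8-unit period.
starts = valid_starts(horizon=16, size=2, period=8, break_offset=3, break_len=1)
print(starts)
```

The resulting list is what would go inside Domain.from_values before calling add_linear_expression_in_domain on the task's start variable.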
Automatic jobs
Some "automatic" tasks don't consume the operator while running (think: a machine runs itself after a short manual setup). Model only the setup portion inside the cumulative, using a 1-unit interval at the task's start.
Example: example_12_an_automatic_job.py, example_13_automatic_jobs.py.
Shifts
A shift is a working window. Tasks must fit inside one shift (or, depending on policy, be split / disallowed from crossing shifts).
Synthetic shift breaks
Insert a tiny "fake break" interval at each shift boundary and forbid any
task from overlapping it with add_no_overlap. This prevents shift-crossing
without enumerating shift assignments.
for (s, e) in synthetic_shift_breaks:
    br = model.new_fixed_size_interval_var(start=s, size=e - s, name="shift_edge")
    for t in tasks:
        model.add_no_overlap([task_interval[t], br])
Example: example_16_shift_crossing_fake_time_unit.py.
Explicit shift assignment
Alternatively, give every task a one-hot presence[shift, task] and enforce
the shift window when present:
for t in tasks:
    model.add_exactly_one(presence[s, t] for s in shifts)
    for s in shifts:
        model.add(start[t] >= shift_start[s]).only_enforce_if(presence[s, t])
        model.add(end[t] <= shift_end[s]).only_enforce_if(presence[s, t])
More variables, but the assignment is explicit and easy to extend (e.g. to per-shift capacity).
Example: example_17_shift_crossing_mathieu.py.
Resources and cumulative
add_no_overlap says "at most one interval at a time". add_cumulative is the
generalisation: each interval consumes some amount of a shared resource, and
the total consumption must not exceed a capacity.
No-overlap
model.add_no_overlap(intervals)
Used per machine (one task at a time) and per stage (one job at a time in a flow-shop style).
Cumulative
model.add_cumulative(intervals, demands, capacity)
demands[i] is the amount of resource taken by intervals[i] while it runs.
Typical uses:
- Shared operator across machines. If two machines need the same operator, a cumulative over all their task intervals with demand 1 and capacity 1 forbids parallel runs. Example: example_06_seq_with_intervals_resource.py.
- Breaks. Treat a break as an interval that fully occupies the resource. Example: example_07_break_without_changeover.py.
- Automatic jobs. Only the setup portion consumes the operator, modeled as a size-1 interval at each task's start.
Resource modes
Some tasks can run in different modes with different durations and headcounts. Encode the choice with a one-hot bool per mode and derive the actual processing time from it:
for t in tasks:
    model.add_exactly_one([mode[t, k] for k in modes])
    model.add(
        proc_time[t] == sum(processing_time[product[t], k] * mode[t, k] for k in modes)
    )
Example: example_10_people_mode.py.
Headcount tracking
If the per-task resource depends on whether the task overlaps a break (or
some other condition), plain add_cumulative may be insufficient. Build an
explicit per-timestep resource variable and link it to task-start presence
booleans. Three methods are compared in
example_34_headcount_tracking.py.
Multi-stage jobs
A job can consist of several stages that must run in order. Each stage is a task with its own start/end; the job start is the earliest task start and the job end is the latest task end.
Job - stage - task structure
tasks = {(job, stage) for job in jobs for stage in stages}
for job in jobs:
    model.add_min_equality(job_start[job], [start[job, s] for s in stages])
    model.add_max_equality(job_end[job], [end[job, s] for s in stages])
    # stage precedence
    for s in sorted(stages)[:-1]:
        model.add(end[job, s] <= start[job, s + 1])
Example: example_21_stages_one_job.py.
Stage-level no-overlap
If each stage has a single shared machine, forbid two jobs from sitting on the same stage simultaneously:
for s in stages:
    model.add_no_overlap([intervals[job, s] for job in jobs])
Examples: example_22_stages_two_jobs.py,
example_23_multistage_two_jobs_co.py.
Campaigning
A campaign is a run of same-product tasks on a machine between two changeovers. Typical rules:
- tasks within a campaign are the same product and pay no changeover cost,
- a campaign has a maximum size (e.g. at most N tasks),
- switching products or hitting the cap triggers a changeover.
Approach 1: campaigns as entities
Create a set of potential campaigns, each with start/end/duration/presence,
and variables linking tasks to campaigns. Sequence campaigns (not tasks)
using add_circuit. The campaign-level changeover cost sits in the gap
between campaigns.
Pros: close to the business view. Cons: more variables, scales worse.
Example: example_09_max_number_of_continuous_tasks.py.
Approach 2: cumulative rank per task
Keep tasks as the atomic unit and attach a rank variable
cumul[t] in [0, campaign_size - 1]. On each t1 -> t2 arc:
- if the campaign continues, cumul[t2] = cumul[t1] + 1,
- if a changeover happens, cumul[t2] = 0 and end[t1] + changeover <= start[t2].
A reach_end[t] boolean fires when cumul[t] == campaign_size - 1, forcing a
reset and changeover. add_max_equality(max_value, [0, cumul[t1] + 1 - reach_end[t1] * campaign_size]) is a useful trick to compute the next rank
under an only_enforce_if.
Pros: fewer variables, scales better. Cons: trickier to explain.
Examples: example_24_campaigning_with_cumul.py (base),
example_27_campaigning_products.py (multi-product),
example_28_campaigning_products_machines.py (multi-machine).
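The rank rule of approach 2 can be replayed on a fixed order in plain Python, which is handy for checking a solver result. This is a sketch under the forced-reset policy (a campaign always ends at the cap); the function name and data are hypothetical.

```python
def campaign_ranks(order, product_of, campaign_size):
    """Replay the cumulative-rank rule on a fixed task order.

    Returns (ranks, changeovers): the rank of each task within its campaign,
    and the list of arcs (t1, t2) on which a changeover is triggered.
    """
    ranks, changeovers = {}, []
    previous = None
    for task in order:
        continues = (
            previous is not None
            and product_of[previous] == product_of[task]
            and ranks[previous] < campaign_size - 1
        )
        ranks[task] = ranks[previous] + 1 if continues else 0
        if previous is not None and not continues:
            changeovers.append((previous, task))
        previous = task
    return ranks, changeovers


product_of = {1: "A", 2: "A", 3: "A", 4: "B"}
ranks, changeovers = campaign_ranks([1, 2, 3, 4], product_of, campaign_size=2)
print(ranks, changeovers)
```

With a cap of 2, the third A task opens a new campaign even though the product is unchanged, and the switch to B triggers the second changeover.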
Locking the task order
When tasks have deadlines that align with their index, locking
start[t-1] <= start[t] (or the stricter end[t-1] <= start[t]) is a cheap
heuristic that often gives a 10x+ solve-time improvement. See
example_25_campaigning_with_locked_seq.py and the two
example_26_campaigning_locked_seq_improved*.py variants.
Flexible campaign ends
If the model should be free to end a campaign early (not just at the cap),
drop the "force reach_end when cumul hits max" implication and let the
solver choose. This usually gives better objective values at a small solve-time
cost. See the two example_26_*_improved*.py files for the comparison.
Solver techniques
Beyond modeling, CP-SAT exposes a few knobs that help on hard instances.
Decision strategies
Tell the solver which variables to branch on first, and which value to try first. Often needed when a symmetric model returns a "correct but ugly" schedule.
model.add_decision_strategy(
    starts.values(),
    cp_model.CHOOSE_FIRST,
    cp_model.SELECT_MIN_VALUE,
)
Example: example_07_break_without_changeover.py applies two strategies
(one on starts, one on sequence literals) to get a canonical output.
Parallel workers
solver.parameters.num_search_workers = 8
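Uses 8 worker threads here; pick a value that matches the cores you can spare
(recent OR-Tools releases also expose the same setting as num_workers).
Examples example_03_seq_scale*.py benchmark the resulting speedup.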
Warm-starting with hints
You can seed the search with values for any subset of variables:
model.add_hint(var, value) # one call per hinted variable
Or clear and re-set them between solves:
model.clear_hints()
add_hints(model, previous_solution)
This is the basis of phase solving: build the full model once, then run it
repeatedly with an increasing max_time, feeding each phase's solution in
as hints to the next. Example: example_32_solving_by_phases.py.
Reading back values
status = solver.solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(solver.value(var))
    print(solver.objective_value)  # a property, not a method
MODEL_INVALID almost always means a constraint references a variable that
was never bound to the right model instance, or an only_enforce_if was
attached to something that is not a literal.
Examples overview
Each example in this section corresponds to one Python file under
scheduling/. Chapters are kept short: a brief description, the concepts it
demonstrates (linked back to the Concepts
section), and the source file inlined at the bottom.
Examples are grouped by topic in the sidebar:
- Basics - CP-SAT primitives and small modeling tricks.
- Sequencing - ordering tasks on one or more machines.
- Changeover and intervals - different ways to model switches between products and the move from manual durations to interval variables.
- Breaks - unavailable time windows, including breaks that extend a task's duration and automatic jobs that only need an operator for setup.
- Shifts - preventing tasks from crossing shift boundaries.
- Multi-stage jobs - jobs with ordered stages and per-stage capacity.
- Resources - flexible resource/headcount modes and time-varying demand tracking.
- Campaigning - grouping same-product tasks between changeovers, with multiple modelling approaches.
- Solver techniques - warm-starting CP-SAT across phases with hints.
A few files (example_11, example_18, example_19, example_30) are
empty placeholders kept for numbering; their chapters only note what they
would have covered.
Unit tests
Source: scheduling/example_00_unit_tests.py
Before modeling a scheduling problem you have to be fluent in the constraint
primitives CP-SAT actually speaks. This file is a bench of tiny self-contained
models, each exercising one feature: boolean combinators, reified equalities
with only_enforce_if, combining conditions via add_multiplication_equality,
and domain constraints for non-contiguous value sets.
It is the only chapter with no scheduling content. Everything later assumes you are comfortable with what lives here. Useful as a cheat sheet when you want to remember how to express, for example, "b = (5 <= x <= 10)". Several snippets are commented-out alternatives kept for comparison.
Concepts
Source
from ortools.sat.python import cp_model
model = cp_model.CpModel()
def get(x):
    return solver.value(x)
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_or(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_and(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_xor(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 2)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 1)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 0)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
model = cp_model.CpModel()
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x==1).only_enforce_if(~x)
#model.add(x==0).only_enforce_if(~x)
# model.add(x==1).only_enforce_if(x)
model.add(x==0).only_enforce_if(x)
#model.add(y==1).only_enforce_if(x)
model.minimize(x)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
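# Each enforced constraint contradicts its own literal, so this model is
# infeasible; read x only when a solution exists.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(get(x))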
#
model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
# Invalid: only_enforce_if takes Boolean literals, not comparisons such as 5 <= x.
# model.add(x_is_between_5_and_10 == 1).only_enforce_if(5 <= x).only_enforce_if(x <= 10)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', get(x))
print('x_is_between_5_and_10', get(x_is_between_5_and_10))
model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
x_is_no_less_than_5 = model.new_bool_var('x_is_no_less_than_5')
x_is_no_more_than_10 = model.new_bool_var('x_is_no_more_than_10')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
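# "b == (x >= 5)" cannot be written as a chained comparison; reify in both directions.
model.add(x >= 5).only_enforce_if(x_is_no_less_than_5)
model.add(x < 5).only_enforce_if(~x_is_no_less_than_5)
model.add(x <= 10).only_enforce_if(x_is_no_more_than_10)
model.add(x > 10).only_enforce_if(~x_is_no_more_than_10)
# The conjunction is the product of the two literals.
model.add_multiplication_equality(x_is_between_5_and_10, [x_is_no_less_than_5, x_is_no_more_than_10])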
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', get(x))
print('x_is_between_5_and_10', get(x_is_between_5_and_10))
##########################################
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x_is_greater_than_5 = model.new_bool_var('x_is_greater_than_5')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
model.add(x >= 5).only_enforce_if(x_is_greater_than_5)
model.add(x < 5).only_enforce_if(~x_is_greater_than_5)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', solver.value(x))
print('x_is_greater_than_5', solver.value(x_is_greater_than_5))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
#model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 10).only_enforce_if(~x_is_between_5_and_10)
#model.add(x >10).only_enforce_if(~x_is_greater_than_5)
# x == 3 contradicts the forced literal, so the solver reports INFEASIBLE
model.add(x == 3)
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
#model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 10).only_enforce_if(~x_is_between_5_and_10)
#model.add(x >10).only_enforce_if(~x_is_greater_than_5)
#model.add(x == 3)
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 5).only_enforce_if(~x_is_between_5_and_10)
model.add(x >10).only_enforce_if(~x_is_between_5_and_10)
model.add(x == 3)
# model.add(x_is_between_5_and_10 == 0)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x_greater_than_5 = model.new_bool_var('5<x')
x_less_than_10 = model.new_bool_var('x<10')
model.add(x > 5).only_enforce_if(x_greater_than_5)
model.add(x <= 5).only_enforce_if(~x_greater_than_5)
model.add(x < 10).only_enforce_if(x_less_than_10)
model.add(x >= 10).only_enforce_if(~x_less_than_10)
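# A product of two variables is not linear, so it cannot go through model.add:
# model.add(x_is_between_5_and_10 == x_greater_than_5 * x_less_than_10)
model.add_multiplication_equality(x_is_between_5_and_10, [x_greater_than_5, x_less_than_10])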
model.add(x == 3)
# model.add(x_is_between_5_and_10 == 0)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x = model.new_int_var(0, 100, 'x')
model.add_linear_constraint(x, 5, 10).only_enforce_if(x_is_between_5_and_10)
model.add_linear_expression_in_domain(
x,
cp_model.Domain.from_intervals([[0, 4], [11, 100]])
).only_enforce_if(~x_is_between_5_and_10)
model.add(x == 3)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x_greater_than_5 = model.new_bool_var('5<x')
x_less_than_10 = model.new_bool_var('x<10')
model.add(x > 5).only_enforce_if(x_greater_than_5)
model.add(x <= 5).only_enforce_if(~x_greater_than_5)
model.add(x < 10).only_enforce_if(x_less_than_10)
model.add(x >= 10).only_enforce_if(~x_less_than_10)
model.add_multiplication_equality(x_is_between_5_and_10, [x_greater_than_5, x_less_than_10])
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
Events overlapping
Source: scheduling/example_15_events_overlapping.py
Utility chapter. Given two intervals with fixed times, how do you express "do they overlap, and by how much"?
Two reified booleans - t1 starts before t2 AND t1 ends after t2 starts -
combined with add_multiplication_equality give the overlap indicator.
The overlap duration is then a conditional end[t1] - start[t2], which is exact as long as t2 does not finish before t1 does:
model.add(duration[t1, t2] == end[t1] - start[t2]).only_enforce_if(overlap[t1, t2])
model.add(duration[t1, t2] == 0).only_enforce_if(~overlap[t1, t2])
The pattern recurs whenever you need to measure overlap rather than forbid it. The duration-stretching break model already used a close relative; more will show up in the headcount-tracking chapter.
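For fixed times, the quantity being modeled has a simple closed form that is useful for checking solver output. The sketch below is plain Python and the function name is hypothetical; it also covers the case where one interval sits entirely inside the other, which the end[t1] - start[t2] expression overstates.

```python
def overlap_duration(a, b):
    """Length of the intersection of two half-open intervals (start, end)."""
    return max(0, min(a[1], b[1]) - max(a[0], b[0]))


partial = overlap_duration((1, 5), (3, 7))    # t1 ends inside t2
disjoint = overlap_duration((1, 3), (5, 7))   # no overlap at all
contained = overlap_duration((1, 9), (3, 5))  # t2 lies inside t1
print(partial, disjoint, contained)
```

In the contained case end[t1] - start[t2] would give 6, while the true overlap is the full length of the inner interval.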
Concepts
- CP-SAT basics (reification,
add_multiplication_equality)
Source
from ortools.sat.python import cp_model
model = cp_model.CpModel()
max_time = 10
tasks = {1, 2}
# 1. Data
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
# overlap
# model.add(var_task_starts[1] == 1)
# model.add(var_task_ends[1] == 5)
# model.add(var_task_starts[2] == 3)
# model.add(var_task_ends[2] == 7)
# No overlap
model.add(var_task_starts[1] == 1)
model.add(var_task_ends[1] == 3)
model.add(var_task_starts[2] == 5)
model.add(var_task_ends[2] == 7)
var_overlap = {
(t1, t2): model.new_bool_var('')
for t1 in tasks
for t2 in tasks
if t1 != t2
}
var_overlap_duration = {
(t1, t2): model.new_int_var(0, max_time, '')
for t1 in tasks
for t2 in tasks
if t1 != t2
}
var_start_earlier_than_start = {
(t1, t2): model.new_bool_var('')
for t1 in tasks
for t2 in tasks
if t1 != t2
}
var_end_later_than_start = {
(t1, t2): model.new_bool_var('')
for t1 in tasks
for t2 in tasks
if t1 != t2
}
for t1 in tasks:
    for t2 in tasks:
        if t1 == t2:
            continue
        # Reify "t1 starts no later than t2" in both directions.
        model.add(var_task_starts[t1] <= var_task_starts[t2]).only_enforce_if(var_start_earlier_than_start[t1, t2])
        model.add(var_task_starts[t1] > var_task_starts[t2]).only_enforce_if(~var_start_earlier_than_start[t1, t2])
        # Reify "t1 ends after t2 starts" in both directions.
        model.add(var_task_ends[t1] > var_task_starts[t2]).only_enforce_if(var_end_later_than_start[t1, t2])
        model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(~var_end_later_than_start[t1, t2])
        # overlap = AND of the two literals
        model.add_multiplication_equality(
            var_overlap[t1, t2],
            [var_start_earlier_than_start[t1, t2], var_end_later_than_start[t1, t2]]
        )
        model.add(var_overlap_duration[t1, t2] == var_task_ends[t1] - var_task_starts[t2]).only_enforce_if(var_overlap[t1, t2])
        model.add(var_overlap_duration[t1, t2] == 0).only_enforce_if(~var_overlap[t1, t2])
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    count = 0
    for t1 in tasks:
        for t2 in tasks:
            if t1 == t2:
                continue
            if solver.value(var_overlap[t1, t2]):
                count = count + 1
                print(f'Task{t1} starts earlier and overlaps Task{t2} for a duration of '
                      f'{solver.value(var_overlap_duration[t1, t2])} units')
    if count == 0:
        print("No overlapped tasks at all")
    print('=========================== TASKS SUMMARY ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )
    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)
Simple sequence
Source: scheduling/example_01_simple_sequence.py
Three tasks, two products, one machine. Switching between products costs time. What is the cheapest order?
The smallest useful scheduling model. A dummy task 0 represents "machine
idle" so the circuit has a start and an end; sequencing booleans
seq[t1, t2] are stitched together with add_circuit; when an arc is
chosen, end[t1] <= start[t2] is enforced. Changeover lives in the
objective as a cost-weighted sum over the selected arcs, and task duration
is the manual end - start == duration.
Everything after this chapter is a variation: more machines, different changeover semantics, extra constraints for breaks or shifts. Understanding where the circuit, the presence booleans, and the order-to-time link go is the hard part.
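With only three tasks, the answer can be cross-checked by brute force. The sketch below reuses the task-to-product mapping and changeover times from the source file and charges, as the model does, the changeover time of the product being switched to; the helper names are made up.

```python
from itertools import permutations

task_to_product = {1: "A", 2: "B", 3: "A"}
changeover_time = {"A": 1, "B": 1}


def order_cost(order):
    """Sum of changeover costs over consecutive tasks with different products."""
    return sum(
        changeover_time[task_to_product[t2]]
        for t1, t2 in zip(order, order[1:])
        if task_to_product[t1] != task_to_product[t2]
    )


costs = {order: order_cost(order) for order in permutations(task_to_product)}
best_cost = min(costs.values())
best_orders = sorted(order for order, cost in costs.items() if cost == best_cost)
print(best_cost, best_orders)
```

Any order that keeps the two A tasks adjacent pays a single changeover, which is the value the CP-SAT model should report as its optimum.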
Concepts
- Circuit and sequencing
- Changeover (approach 1: cost in objective)
Source
# circuit arcs
# tuples
# add arc : arcs.append([job1_index, job2_index, binary]) |
# add link: Use only_enforce_if for binary <---> job1.end_time <= job2.start_time | -> three things connected
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
M = 99999
# 1. Data
'''
task product
1 A
2 B
3 A
'''
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
m = {
(t1, t2)
for t1 in tasks
for t2 in tasks
if t1 != t2
}
m_cost = {
(t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or task_to_product[t1] == 'dummy' or task_to_product[t2] == 'dummy'
else changeover_time[task_to_product[t2]]
for (t1, t2) in m
}
# 2. Decision variables
'''
(1, 2): 0/1
(1, 3)
(2, 1)
(2, 3)
(3, 1)
(3, 2)
'''
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_sequence = {
(t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
for (t1, t2) in m
}
# 3. Objectives
# A linear expression over the selected arcs; no separate IntVar is needed.
total_changeover_time = sum(
    [variables_sequence[(t1, t2)]*m_cost[(t1, t2)] for (t1, t2) in m]
)
model.minimize(total_changeover_time)
#model.maximize(total_changeover_time)
# 4. Constraints
# Add duration
for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )
# add_circuits
arcs = list()
'''
arcs.append([0, 1, model.new_bool_var("dummy0" + "_to_1")])
arcs.append([0, 2, model.new_bool_var("dummy0" + "_to_2")])
arcs.append([0, 3, model.new_bool_var("dummy0" + "_to_3")])
arcs.append([1, 0, model.new_bool_var("1_to_" + "dummy0")])
arcs.append([2, 0, model.new_bool_var("2_to_" + "dummy0")])
arcs.append([3, 0, model.new_bool_var("3_to_" + "dummy0")])
'''
for (from_task, to_task) in m:
    arcs.append(
        [
            from_task,
            to_task,
            variables_sequence[(from_task, to_task)]
        ]
    )
    if from_task != 0 and to_task != 0:
        model.add(
            variables_task_ends[from_task] <= variables_task_starts[to_task]
        ).only_enforce_if(variables_sequence[(from_task, to_task)])
model.add_circuit(arcs)
#model.add(variables_task_starts[0] == 8)
# Solve
# https://github.com/d-krupke/cpsat-primer
#model.add_decision_strategy(variables_task_starts, cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    for (t1, t2) in m:
        print(f'{t1} --> {t2}: {solver.value(variables_sequence[(t1, t2)])}')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
Sequence with locked starting product
Source: scheduling/example_02_seq_lock_starting_product.py
A machine rarely starts empty. Usually some product is already loaded, and if the first task happens to be that same product no changeover is needed.
The model is identical to the previous chapter with one change: the changeover table depends on the machine's starting product. From the dummy task, going to a task of the starting product is free; going to any other product costs the regular changeover.
m_cost = {
(t1, t2): 0 if task_to_product[t1] == task_to_product[t2]
or (task_to_product[t1] == 'dummy'
and task_to_product[t2] == starting_product)
else changeover_time[task_to_product[t2]]
for (t1, t2) in m
}
This small tweak is the template for every "initial state" extension later in the book.
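The cost table can be checked without a solver. The sketch below wraps the same rule in a plain function; the name build_changeover_costs is hypothetical, and the data mirrors the three-task example used in this chapter.

```python
def build_changeover_costs(task_to_product, changeover_time, starting_product):
    """Cost of going from task t1 to task t2, given the machine's loaded product."""
    costs = {}
    for t1, p1 in task_to_product.items():
        for t2, p2 in task_to_product.items():
            if t1 == t2:
                continue
            if p1 == p2:
                costs[t1, t2] = 0  # same product, nothing to change
            elif p1 == 'dummy' and p2 == starting_product:
                costs[t1, t2] = 0  # first task matches what is already loaded
            else:
                costs[t1, t2] = changeover_time[p2]  # pay to set up product p2
    return costs


task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
costs = build_changeover_costs(task_to_product, changeover_time, starting_product='A')
```

With starting product A, the arc from the dummy to task 1 is free while the arc from the dummy to task 2 costs 1. Arcs back to the dummy stay free because changeover_time['dummy'] is 0.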
Concepts
- Changeover (starting product)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
M = 99999
# 1. Data
'''
task product
1 A
2 B
3 A
'''
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
starting_product = 'A'
m = {
(t1, t2)
for t1 in tasks
for t2 in tasks
if t1 != t2
}
# A -> A, B --> B: 0
# dummy A -> A: 0
# dummy A -> B: 1
# A -> B, B -> A: 1
m_cost = {
(t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == starting_product
)
else changeover_time[task_to_product[t2]]
for (t1, t2) in m
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_sequence = {
(t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
for (t1, t2) in m
}
# 3. Objectives
# total_changeover_time is a linear expression over the selected arcs; no separate variable is needed
total_changeover_time = sum(
[variables_sequence[(t1, t2)]*m_cost[(t1, t2)] for (t1, t2) in m]
)
model.minimize(total_changeover_time)
# 4. Constraints
# Add duration
for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )
# add_circuits
arcs = list()
for (from_task, to_task) in m:
    arcs.append(
        [
            from_task,
            to_task,
            variables_sequence[(from_task, to_task)]
        ]
    )
    if from_task != 0 and to_task != 0:
        model.add(
            variables_task_ends[from_task] <= variables_task_starts[to_task]
        ).only_enforce_if(variables_sequence[(from_task, to_task)])
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    for (t1, t2) in m:
        print(f'{t1} --> {t2}: {solver.value(variables_sequence[(t1, t2)])}')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
Multi-station sequence
Source: scheduling/example_03_seq_multi_stations.py
Scale up to two machines in parallel. The solver now decides both which machine runs each task and in what order.
Variables split into two levels. presence[m, t] is the machine assignment,
one-hot per task via add_exactly_one. Task-level start[t] and end[t]
mirror the chosen machine via only_enforce_if(presence[m, t]). Each
machine gets its own add_circuit, with self-loop arcs
[t, t, ~presence[m, t]] that let a task skip any machine it is not on.
The objective is now makespan plus changeover, with add_max_equality
computing the former.
The two-level structure (task-level + machine-task-level) recurs almost unchanged in every subsequent multi-machine chapter.
Concepts
- Circuit and sequencing (multi-machine)
- Changeover (cost in objective)
Source
"""
About changeover:
- The changeover cost appears only in the objective function
- The start and end times of tasks do not reflect changeover events
Function s prints a dict indexed by tuples in sorted key order
Task 0 is only used in arcs
The duration constraint is end - start == duration
In Mathieu's approach there is no such constraint; do the new_optional_interval_var constraints take care of it?
We need task-level start and end for convenience when working with more complicated constraints
"""
from ortools.sat.python import cp_model
# Initiate
M = 99999
model = cp_model.CpModel()
def s(x):
    sorted_keys = sorted(x)
    for key in sorted_keys:
        print(f"{key}, {x[key]}")
'''
task product
1 A
2 B
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# 3. Objectives
# total_changeover_time is a linear expression over the selected arcs; no separate variable is needed
total_changeover_time = sum(
[variables_machine_task_sequence[(m, t1, t2)]*m_cost[(m, t1, t2)] for (m, t1, t2) in X]
)
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span + total_changeover_time)
# 4. Constraints
for task in tasks:
    # For this task
    # get all allowed machines
    task_candidate_machines = machines
    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)
    # task level link to machine-task level
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
# The changeover consideration is done here in Mathieu's approach
# This can be replaced by an interval variable?
for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )
# add_circuits
for machine in machines:
    arcs = list()
    tmp = [x for x in X if x[0] == machine]
    for (m, from_task, to_task) in tmp:
        arcs.append(
            [
                from_task,
                to_task,
                variables_machine_task_sequence[(m, from_task, to_task)]
            ]
        )
        if from_task != 0 and to_task != 0:
            model.add(
                variables_task_ends[from_task] <= variables_task_starts[to_task]
            ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(machine, task)]
        ])
    model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    print(solver.value(make_span))
    for (m, t1, t2) in sorted(X):
        value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
        if value == 1:
            print(f'Machine {m}: {t1} --> {t2} ')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
Multi-station scale benchmark
Source: scheduling/example_03_seq_scale.py
How does the previous chapter's model scale? Wrap it in a function over
num_tasks, crank the size up to 12, measure wall time with eight search
workers, plot with matplotlib.
Spoiler: with a manual end - start == duration constraint it gets slow
fast. This chapter sets the baseline; the next one shows the fix. The
objective is makespan only - the changeover term is commented out to keep
the experiment clean.
Concepts
- Solver techniques (parallel workers)
- Pairs with the interval-based rewrite in the next chapter, which handles roughly 80 tasks where this version tops out around 12.
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
def generate_data(num_tasks):
tasks = {i+1 for i in range(num_tasks)}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy'}
task_to_product.update({x+1: 'A' for x in range(int(num_tasks/2))})
task_to_product.update({x+1: 'B' for x in range(int(num_tasks/2), int(num_tasks))})
return tasks, tasks_0, task_to_product
def model(num_tasks):
model = cp_model.CpModel()
# 1. Data
tasks, tasks_0, task_to_product = generate_data(num_tasks)
max_time = num_tasks
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# 3. Objectives
# total_changeover_time is a linear expression; it is built here but left out of the objective below
total_changeover_time = sum(
[variables_machine_task_sequence[(m, t1, t2)]*m_cost[(m, t1, t2)] for (m, t1, t2) in X]
)
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)# + total_changeover_time)
# 4. Constraints
for task in tasks:
task_candidate_machines = machines
# find the subset in presence matrix related to this task
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
for task in tasks:
model.add(
variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
)
# add_circuits
for machine in machines:
arcs = list()
tmp = [x for x in X if x[0] == machine]
for (m, from_task, to_task) in tmp:
arcs.append(
[
from_task,
to_task,
variables_machine_task_sequence[(m, from_task, to_task)]
]
)
if from_task != 0 and to_task != 0:
model.add(
variables_task_ends[from_task] <= variables_task_starts[to_task]
).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(machine, task)]
])
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
solver.parameters.num_search_workers = 8
start = time()
status = solver.solve(model=model)
total_time = time() - start
return total_time
if __name__ == '__main__':
    num_tasks = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    seconds = []
    for i in num_tasks:
        print(i)
        # model(i) returns the wall time of the solve in seconds
        elapsed = model(i)
        seconds.append(elapsed)
    plt.plot(num_tasks, seconds)
    plt.show()
Multi-station scale (intervals)
Source: scheduling/example_03_seq_scale_Mathieu.py
Same benchmark as the previous chapter, but the duration constraint is
replaced with new_optional_interval_var plus per-machine
add_no_overlap. CP-SAT's scheduling engine propagates on intervals far
more effectively than on generic integer constraints, and the speed-up is
dramatic: the same hardware that topped out around 12 tasks now handles
~80.
The per-machine circuit is factored into add_circuit_constraints(...)
so the main loop stays readable. That helper is the template for every
later multi-machine model.
Concepts
- Interval variables (optional intervals)
- Circuit and sequencing (factored helper)
- Solver techniques (parallel workers)
Source
# USE INTERVAL !!!
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
def generate_data(num_tasks):
tasks = {i+1 for i in range(num_tasks)}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy'}
task_to_product.update({x+1: 'A' for x in range(int(num_tasks/2))})
task_to_product.update({x+1: 'B' for x in range(int(num_tasks/2), int(num_tasks))})
return tasks, tasks_0, task_to_product
def model(num_tasks):
model = cp_model.CpModel()
# 1. Data
tasks, tasks_0, task_to_product = generate_data(num_tasks)
max_time = num_tasks
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks
for m in machines
}
variables_intervals = {
(m, t): model.new_optional_interval_var(
start=variables_machine_task_starts[m, t],
size=processing_time[task_to_product[t]],
end=variables_machine_task_ends[m, t],
is_present=variables_machine_task_presences[m, t],
name=f"interval_{t}_{m}",
)
for t in tasks
for m in machines
}
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# 3. Objectives
# total_changeover_time is a linear expression over the sequence booleans; no separate variable is needed
total_changeover_time = sum(
[variables_machine_task_sequence[(m, t1, t2)]*m_cost[(m, t1, t2)] for (m, t1, t2) in X]
)
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
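# variables_machine_task_sequence is not tied to the arcs created in add_circuit_constraints,
# so the solver can set the changeover term to 0 and the objective is effectively the makespan
model.minimize(make_span + total_changeover_time)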
# 4. Constraints
for task in tasks:
task_candidate_machines = machines
# find the subset in presence matrix related to this task
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
for machine in machines:
tmp = {(m, t) for (m, t) in variables_intervals if m == machine}
intervals = [variables_intervals[x] for x in tmp]
model.add_no_overlap(intervals)
# Create circuit constraints
add_circuit_constraints(
model,
machines,
tasks,
variables_task_starts,
variables_task_ends,
variables_machine_task_presences,
)
# Solve
solver = cp_model.CpSolver()
solver.parameters.num_search_workers = 8
start = time()
status = solver.solve(model=model)
total_time = time() - start
return total_time
def add_circuit_constraints(
    model,
    machines,
    tasks,
    variables_task_starts,
    variables_task_ends,
    variables_machine_task_presences,
):
    for machine in machines:
        # List of all feasible arcs within a machine. Each arc carries a boolean
        # that is true when the circuit goes from one node to the next.
        arcs = []
        machine_tasks = tasks
        for node_1, task_1 in enumerate(machine_tasks):
            mt_1 = str(task_1) + "_" + str(machine)
            # Initial arc from the dummy node (0) to a task.
            arcs.append(
                [0, node_1 + 1, model.new_bool_var("first" + "_" + mt_1)]
            )  # if mt_1 follows dummy node 0
            # Final arc from a task to the dummy node (0).
            arcs.append(
                [node_1 + 1, 0, model.new_bool_var("last" + "_" + mt_1)]
            )  # if dummy node 0 follows mt_1
            # For an optional task on this machine (i.e. assigned to another machine):
            # self-looping arc on the node that corresponds to this task.
            arcs.append(
                [
                    node_1 + 1,
                    node_1 + 1,
                    ~variables_machine_task_presences[(machine, task_1)],
                ]
            )
            for node_2, task_2 in enumerate(machine_tasks):
                if node_1 == node_2:
                    continue
                mt_2 = str(task_2) + "_" + str(machine)
                # Add sequential boolean constraint: mt_2 follows mt_1
                mt2_after_mt1 = model.new_bool_var(f"{mt_2} follows {mt_1}")
                arcs.append([node_1 + 1, node_2 + 1, mt2_after_mt1])
                # We add the reified precedence to link the literal with the
                # times of the two tasks.
                min_distance = 0
                model.add(
                    variables_task_starts[task_2] >= variables_task_ends[task_1] + min_distance
                ).only_enforce_if(mt2_after_mt1)
        model.add_circuit(arcs)
if __name__ == '__main__':
    num_tasks = [x+2 for x in range(80)]
    seconds = []
    for i in num_tasks:
        print(i)
        # model(i) returns the wall time of the solve in seconds
        elapsed = model(i)
        seconds.append(elapsed)
    plt.plot(num_tasks, seconds)
    plt.show()
Changeover in constraint
Source: scheduling/example_04_seq_with_changeover_in_constraint.py
There is a subtle bug in the earlier models. We charged changeover cost in the objective, but the schedule itself was free to place the next task right after the previous one - with no physical gap for the changeover to happen. Cost and schedule disagreed.
The fix is to put the changeover into the precedence constraint:
model.add(end[t1] + distance <= start[t2]).only_enforce_if(seq[m, t1, t2])
Now a changeover actually pushes the next task later. The objective can go back to being just the makespan, and the resulting schedule is physically executable.
Concepts
- Changeover (approach 2: in constraint)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 B
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}
'''
M1 A -> A -> A 1+1
M2 A -> B -> B 1+1+1 --> 3
'''
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# Now this is used in constraints, not in the objective function anymore
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks_0
for m in machines
}
# This includes task 0
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# Duration - This can be replaced by interval variable ?
for task in tasks_0:
model.add(
variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
)
# One task to one machine. Link across level.
for task in tasks:
# For this task
# get all allowed machines
task_candidate_machines = machines
# find the subset in presence matrix related to this task
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
arcs = list()
for from_task in tasks_0:
for to_task in tasks_0:
# arcs
if from_task != to_task:
arcs.append([
from_task,
to_task,
variables_machine_task_sequence[(m, from_task, to_task)]
])
distance = m_cost[m, from_task, to_task]
# cannot require the time index of task 0 to represent the first and the last position
if to_task != 0:
model.add(
variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(m, task)]
])
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('Make-span:', solver.value(make_span))
for m in machines:
print(f'------------\nMachine {m}')
print(f'Starting dummy product: {machines_starting_products[m]}')
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Sequence with intervals
Source: scheduling/example_05_seq_with_intervals.py
Cleanup chapter. Replace the hand-written model.add(end - start == duration) with a proper new_optional_interval_var. Behavior is
identical; the model speaks CP-SAT's native scheduling vocabulary instead.
intervals[m, t] = model.new_optional_interval_var(
start[m, t], processing_time[product_of(t)], end[m, t],
presence[m, t], name=f"interval_{m}_{t}",
)
A small refactor with big downstream payoff: intervals plug directly into
add_no_overlap, add_cumulative, and the break machinery you will see
next.
Concepts
- Interval variables (first introduction)
- Changeover
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 B
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# Now this is used in constraints, not in the objective function anymore
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks_0
for m in machines
}
# This includes task 0
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# intervals
variables_machine_task_intervals = {
(m, task): model.new_optional_interval_var(
variables_machine_task_starts[m, task],
processing_time[task_to_product[task]],
variables_machine_task_ends[m, task],
variables_machine_task_presences[m, task],
name=f"interval_{m}_{task}"
)
for task in tasks_0
for m in machines
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# Duration - now enforced by the interval variables above, so the manual constraint is disabled
# for task in tasks_0:
# model.add(
# variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
# )
# One task to one machine. Link across level.
for task in tasks:
# For this task
# get all allowed machines
task_candidate_machines = machines
# find the subset in presence matrix related to this task
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
arcs = list()
for from_task in tasks_0:
for to_task in tasks_0:
# arcs
if from_task != to_task:
arcs.append([
from_task,
to_task,
variables_machine_task_sequence[(m, from_task, to_task)]
])
distance = m_cost[m, from_task, to_task]
# cannot require the time index of task 0 to represent the first and the last position
if to_task != 0:
model.add(
variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(m, task)]
])
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('Make-span:', solver.value(make_span))
for m in machines:
print(f'------------\nMachine {m}')
print(f'Starting dummy product: {machines_starting_products[m]}')
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
Sequence with shared resource
Source: scheduling/example_06_seq_with_intervals_resource.py
Two machines, one operator. Even though the machines themselves could run in parallel, the human can only be in one place at a time.
Now that tasks are intervals, one line expresses the constraint:
intervals = list(variables_machine_task_intervals.values())
model.add_cumulative(intervals, [1] * len(intervals), 1)
With capacity 1 across all machine intervals, CP-SAT can no longer
schedule two tasks at the same time, even on different machines. This is
the first time we see add_cumulative doing real work, and it is the
pattern that all later resource and break constraints build on.
Concepts
- Resources and cumulative (shared operator)
- Interval variables
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 B
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'B'}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# m_cost is now used in the constraints, not in the objective function
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks_0
for m in machines
}
# This includes task 0
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# intervals
variables_machine_task_intervals = {
(m, task): model.new_optional_interval_var(
variables_machine_task_starts[m, task],
processing_time[task_to_product[task]],
variables_machine_task_ends[m, task],
variables_machine_task_presences[m, task],
name=f"interval_{m}_{task}"
)
for task in tasks_0
for m in machines
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# Duration: now enforced by the interval variables, so the explicit constraint below stays commented out
# for task in tasks_0:
# model.add(
# variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
# )
# One task to one machine. Link across level.
for task in tasks:
# For this task
# get all allowed machines
task_candidate_machines = machines
# find the subset in presence matrix related to this task
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
arcs = list()
for from_task in tasks_0:
for to_task in tasks_0:
# arcs
if from_task != to_task:
arcs.append([
from_task,
to_task,
variables_machine_task_sequence[(m, from_task, to_task)]
])
distance = m_cost[m, from_task, to_task]
# task 0 is both the first and the last node of the circuit, so arcs returning to it carry no timing constraint
if to_task != 0:
model.add(
variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(m, task)]
])
model.add_circuit(arcs)
# Resource constraint: there is only one operator, so no two tasks may run in parallel
intervals = list(variables_machine_task_intervals.values())
model.add_cumulative(intervals, [1]*len(intervals), 1)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('Make-span:', solver.value(make_span))
for m in machines:
print(f'------------\nMachine {m}')
print(f'Starting dummy product: {machines_starting_products[m]}')
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Changeover as event
Source: scheduling/example_08_changeover_as_event.py
The two earlier changeover approaches - cost in objective, gap in
precedence - both treat the changeover as a number. This chapter promotes
it to a first-class scheduled event: every ordered pair (t1, t2) gets
its own optional interval with presence, start, end, and duration.
co_iv[m, t1, t2] = model.new_optional_interval_var(
    co_start[m, t1, t2],
    changeover_time[product_of(t2)],
    co_end[m, t1, t2],
    co_present[m, t1, t2],
    ...
)
When seq[m, t1, t2] is chosen, the model forces the interval to be
present, sit between the two tasks, and have the right size. Here
co_start[t1, t2] and co_end[t1, t2] are task-level copies that are tied to
the machine-level co_start[m, t1, t2] and co_end[m, t1, t2] whenever the
interval is present:
model.add(end[t1] <= co_start[t1, t2]).only_enforce_if(seq[m, t1, t2])
model.add(co_end[t1, t2] <= start[t2]).only_enforce_if(seq[m, t1, t2])
model.add(co_end[t1, t2] - co_start[t1, t2] == distance).only_enforce_if(seq[m, t1, t2])
model.add(co_present[m, t1, t2] == 1).only_enforce_if(seq[m, t1, t2])
model.add(co_present[m, t1, t2] == 0).only_enforce_if(~seq[m, t1, t2])
Why bother with all this? Because once the changeover is an interval, it
can live inside add_cumulative just like a task. Need a cleaner that
can only do one changeover at a time across multiple machines? Put the
changeover intervals under a shared resource. The simpler formulations
cannot express this, because they have no interval to constrain.
Concepts
- Changeover (approach 3: as event)
- Interval variables
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 B
'''
# 1. Data
tasks = {1, 2}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 2, 'B': 2}
machines = {0}
machines_starting_products = {0: 'A'}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# Changeover durations; used as the distance in the sequence constraints below
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# intervals
# The interval replaces the explicit end - start == duration constraint
variables_machine_task_intervals = {
(m, task): model.new_optional_interval_var(
variables_machine_task_starts[m, task],
processing_time[task_to_product[task]],
variables_machine_task_ends[m, task],
variables_machine_task_presences[m, task],
name=f"t_interval_{m}_{task}"
)
for task in tasks_0
for m in machines
}
# Changeover-related variables
variables_co_starts = {
(t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_start") for t1 in tasks_0 for t2 in tasks_0 if t1 != t2
}
variables_co_ends = {
(t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_end") for t1 in tasks_0 for t2 in tasks_0 if t1 != t2
}
variables_machine_co_starts = {
(m, t1, t2): model.new_int_var(0, max_time, f"m{m}_co_t{t1}_to_t{t2}_start")
for t1 in tasks_0 for t2 in tasks_0 for m in machines if t1 != t2
}
variables_machine_co_ends = {
(m, t1, t2): model.new_int_var(0, max_time, f"m{m}_co_t{t1}_to_t{t2}_end")
for t1 in tasks_0 for t2 in tasks_0 for m in machines if t1 != t2
}
variables_machine_co_presences = {
(m, t1, t2): model.new_bool_var(f"co_presence_m{m}_t{t1}_t{t2}")
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
variables_machine_co_intervals = {
(m, t1, t2): model.new_optional_interval_var(
variables_machine_co_starts[m, t1, t2],
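# size taken from m_cost so it matches the distance enforced in the sequence constraints (0 when no changeover is needed)
m_cost[m, t1, t2],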
variables_machine_co_ends[m, t1, t2],
variables_machine_co_presences[m, t1, t2],
name=f"co_interval_m{m}_t{t1}_t{t2}"
)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# One task to one machine.
for task in tasks:
task_candidate_machines = machines
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# co level link to machine-co level
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
for m in machines:
model.add(
variables_co_starts[t1, t2] == variables_machine_co_starts[m, t1, t2]
).only_enforce_if(variables_machine_co_presences[m, t1, t2])
model.add(
variables_co_ends[t1, t2] == variables_machine_co_ends[m, t1, t2]
).only_enforce_if(variables_machine_co_presences[m, t1, t2])
# Dummy task: force task 0 to start at 0 and be present on all machines
model.add(variables_task_starts[0] == 0)
for m in machines:
model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
arcs = list()
for t1 in tasks_0:
for t2 in tasks_0:
# arcs
if t1 != t2:
arcs.append([
t1,
t2,
variables_machine_task_sequence[(m, t1, t2)]
])
distance = m_cost[m, t1, t2]
# task 0 is both the first and the last node of the circuit, so arcs returning to it carry no timing constraint
if t2 != 0:
# to schedule tasks and c/o
model.add(
variables_task_ends[t1] <= variables_co_starts[t1, t2]
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
model.add(
variables_co_ends[t1, t2] <= variables_task_starts[t2]
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
model.add(
variables_co_ends[t1, t2] - variables_co_starts[t1, t2] == distance
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
# ensure intervals are consistent so we can apply resource constraints later
model.add(
variables_machine_co_presences[m, t1, t2] == 1
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
model.add(
variables_machine_co_presences[m, t1, t2] == 0
).only_enforce_if(~variables_machine_task_sequence[(m, t1, t2)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(m, task)]
])
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('Make-span:', solver.value(make_span))
for m in machines:
print(f'------------\nMachine {m}')
print(f'Starting dummy product: {machines_starting_products[m]}')
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
print('variables_machine_task_sequence[t1, t2]', solver.value(variables_machine_task_sequence[m, t1, t2]))
print('variables_co_starts[t1, t2]', solver.value(variables_co_starts[t1, t2]))
print('variables_co_ends[t1, t2]', solver.value(variables_co_ends[t1, t2]))
print('variables_machine_co_presences[m, t1, t2]', solver.value(variables_machine_co_presences[m, t1, t2]))
print('variables_machine_co_starts[m, t1, t2]', solver.value(variables_machine_co_starts[m, t1, t2]))
print('variables_machine_co_ends[m, t1, t2]', solver.value(variables_machine_co_ends[m, t1, t2]))
#print('variables_machine_co_intervals[m, t1, t2]', variables_machine_co_intervals[m, t1, t2])
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Break without changeover
Source: scheduling/example_07_break_without_changeover.py
Time to introduce breaks. Single machine, four identical tasks, and a
fixed break window at (2, 3) during which the machine is unavailable.
The trick is to treat the break as just another interval. A
new_fixed_size_interval_var is placed at the break window and dropped
into an add_cumulative(intervals, capacity=1) alongside the task
intervals. Tasks have nowhere to go during the break and spread around it.
Because all tasks share one product, no changeover logic is needed - a
deliberate simplification to isolate the break mechanic. Two
add_decision_strategy calls are added to steer the solver toward the
canonical order 0 1 2 3 4 0; without them it returns a correct but
oddly permuted sequence.
Concepts
- Breaks (approach 1: fixed interval in cumulative)
- Solver techniques (decision strategies)
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 A
3 A
4 A
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'A', 3: 'A', 4: 'A'}
processing_time = {'dummy': 0, 'A': 1}
changeover_time = {'dummy': 0, 'A': 1}
machines = {0}
machines_starting_products = {0: 'A'}
breaks = {(2, 3)}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# m_cost is now used in the constraints, not in the objective function
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# intervals
variables_machine_task_intervals = {
(m, task): model.new_optional_interval_var(
variables_machine_task_starts[m, task],
processing_time[task_to_product[task]],
variables_machine_task_ends[m, task],
variables_machine_task_presences[m, task],
name=f"interval_{m}_{task}"
)
for task in tasks_0
for m in machines
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# One task to one machine.
for task in tasks:
# For this task
# get all allowed machines
task_candidate_machines = machines
# find the subset in presence matrix related to this task
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# Dummy task: force task 0 to start at 0 and be present on all machines
model.add(variables_task_starts[0] == 0)
for m in machines:
model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
arcs = list()
for from_task in tasks_0:
for to_task in tasks_0:
# arcs
if from_task != to_task:
arcs.append([
from_task,
to_task,
variables_machine_task_sequence[(m, from_task, to_task)]
])
distance = m_cost[m, from_task, to_task]
# task 0 is both the first and the last node of the circuit, so arcs returning to it carry no timing constraint
if to_task != 0:
model.add(
variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(m, task)]
])
model.add_circuit(arcs)
# Add break time
variables_breaks = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break') for (start, end) in breaks
}
# Add resource control with break
intervals = list(variables_machine_task_intervals.values()) + list(variables_breaks.values())
model.add_cumulative(intervals, [1]*len(intervals), 1)
# Solve
# Decision strategies; see https://github.com/d-krupke/cpsat-primer
# The sequence observed with each variant is noted on the line after it.
# Default, no strategy:
# -> 0 1 4 2 3 0
# model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
# -> 0 4 3 2 1 0
# model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# -> 0 1 4 2 3 0 (strange)
# model.add_decision_strategy(variables_machine_task_sequence.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# -> 0 1 4 2 3 0
# Both of the following are needed to get the expected sequence:
model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
model.add_decision_strategy(variables_machine_task_sequence.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# -> 0 1 2 3 4 0
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('Make-span:', solver.value(make_span))
for m in machines:
print(f'------------\nMachine {m}')
print(f'Starting dummy product: {machines_starting_products[m]}')
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
One automatic job
Source: scheduling/example_12_an_automatic_job.py
Machines differ in how they consume operator time. Some need the operator for the whole run; others only for setup, after which the machine runs itself and the operator can go handle breaks or other jobs.
The model uses two intervals per task. The full interval is what shows up on the Gantt chart. The setup interval - a size-1 stub at the task's start - is what goes into the cumulative alongside breaks:
model.add_cumulative(
    intervals=setup_intervals + break_intervals,
    demands=[1] * (len(tasks) + len(breaks)),
    capacity=1,
)
Now breaks can push task starts, but once an automatic task has begun it runs through the break uninterrupted.
Concepts
- Breaks (automatic jobs)
- Resources and cumulative
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product type
1 A TYPE_3
'''
# 1. Data
tasks = {1}
products = {'A', 'B'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_3'}
processing_time = {'A': 3, 'B': 1}
max_time = 10
breaks = {(0, 1), (2, 10)}
# 2. Decision Variables
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task],
processing_time[task_to_product[task]],
var_task_ends[task],
name=f"interval_t{task}"
)
for task in tasks
}
var_task_intervals_autojobs = {
task: model.new_interval_var(
var_task_starts[task],
1,
var_task_starts[task] + 1,
name=f"interval_auto_t{task}"
)
for task in tasks
if task_to_type[task] == 'TYPE_3'
}
# Add break time
variables_breaks = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in breaks
}
intervals = list(var_task_intervals_autojobs.values()) + list(variables_breaks.values())
# each setup interval and each break takes the single unit of operator capacity
demands = [1]*len(intervals)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Automatic jobs
Source: scheduling/example_13_automatic_jobs.py
Two automatic jobs need an order. The two-interval trick from the previous chapter carries over: each task has a full interval and a size-1 setup interval.
A small add_circuit with seq[t1, t2] booleans picks the order; each
selected arc enforces end[t1] <= start[t2]. The cumulative still uses
only the setup intervals and the breaks, so once a task is running no
break can interrupt it.
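The listing below prints the raw arc booleans, which gets hard to read beyond two tasks. As a small post-processing sketch, the chosen arcs can be walked from the dummy node 0 to recover the order. The helper order_from_arcs is hypothetical, and the selected arcs are hard-coded here; in a real run they would be the (t1, t2) pairs whose literal is true in the solution.

```python
def order_from_arcs(selected_arcs, depot=0):
    """Walk the chosen arcs from the dummy node and return the tasks in visiting order."""
    # Self-loops mark skipped nodes in add_circuit, so they are ignored here.
    successor = {tail: head for tail, head in selected_arcs if tail != head}
    order = []
    node = successor[depot]
    while node != depot:
        order.append(node)
        node = successor[node]
    return order


# Hypothetical solution: dummy -> task 2 -> task 1 -> dummy.
selected = [(0, 2), (2, 1), (1, 0)]
order = order_from_arcs(selected)
print(order)  # [2, 1]
```

Since add_circuit guarantees exactly one outgoing arc per visited node, the successor dictionary is well defined and the walk always returns to the dummy node.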
Concepts
- Breaks (automatic jobs)
- Circuit and sequencing
- Resources and cumulative
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product type
1 A TYPE_3
2 B TYPE_3
'''
# 1. Data
tasks = {1, 2}
products = {'A', 'B'}
task_to_product = {1: 'A', 2: 'B'}
task_to_type = {1: 'TYPE_3', 2: 'TYPE_3'}
processing_time = {'A': 4, 'B': 3}
max_time = 10
breaks = {(0, 1), (2, 3), (4, 6), (7, 10)}
# 2. Decision Variables
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task],
processing_time[task_to_product[task]],
var_task_ends[task],
name=f"interval_t{task}"
)
for task in tasks
}
var_task_intervals_auto = {
task: model.new_interval_var(
var_task_starts[task],
1,
var_task_starts[task] + 1,
name=f"interval_auto_t{task}"
)
for task in tasks
if task_to_type[task] == 'TYPE_3'
}
var_task_seq = {
(t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
for t1 in tasks
for t2 in tasks
if t1 != t2
}
arcs = []
for t1 in tasks:
tmp_1 = model.new_bool_var(f'first_to_{t1}')
arcs.append([0, t1, tmp_1])
tmp_2 = model.new_bool_var(f'{t1}_to_last')
arcs.append([t1, 0, tmp_2])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, var_task_seq[t1, t2]])
model.add(
var_task_ends[t1] <= var_task_starts[t2]
).only_enforce_if(var_task_seq[t1, t2])
model.add_circuit(arcs)
# Add break time
variables_breaks = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in breaks
}
intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())
# task, resource reduction for breaks
demands = [1]*len(tasks) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
)
print('Make-span:', solver.value(make_span))
print('======================= ALLOCATION & SEQUENCE =======================')
if True:
for t1 in tasks:
for t2 in tasks:
if t1 != t2:
value = solver.value(var_task_seq[(t1, t2)])
print(f'{t1} --> {t2} {value}')
# if value == 1 and t2 != 0:
# print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]}')# cost: {m_cost[m, t1, t2]}')
# if value == 1 and t2 == 0:
# print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Task delaying a break
Source: scheduling/example_14_task_delaying_break.py
A different way of mixing tasks and breaks. Here the task is not automatic; if it runs through a break, its effective machine time grows by the length of the break.
The encoding is per time slot. For every slot i, a boolean
uses[t, i] is true iff task t covers slot i. It is the AND of two reified
conditions, "started by i" and "still running after i", and the AND is
computed as their product via add_multiplication_equality. Then
duration[t] = base + sum(is_break[i] * uses[t, i] for i)
adds back the length of every break the task straddles. The task interval uses this stretched duration directly.
This is heavy on per-slot booleans, but the same pattern applies whenever a duration depends on a time-dependent condition.
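The duration formula is easy to check by hand outside the solver. The sketch below uses the chapter's data (base duration 3, break on slots 2 and 3) and a hypothetical helper stretched_end that simulates a task pausing during break slots; it then confirms that end - start equals the base plus the number of break slots the task covers.

```python
def stretched_end(start, base, is_break):
    """Simulate a task that only makes progress on non-break slots."""
    remaining = base
    slot = start
    while remaining > 0:
        if not is_break.get(slot, 0):
            remaining -= 1
        slot += 1
    return slot


max_time = 10
base = 3
is_break = {i: 1 if 2 <= i < 4 else 0 for i in range(max_time)}

results = {}
for start in (0, 1, 4):
    end = stretched_end(start, base, is_break)
    # uses[i] mirrors the model: started by slot i AND still running after it.
    uses = {i: int(start <= i and end >= i + 1) for i in range(max_time)}
    duration = base + sum(is_break[i] * uses[i] for i in range(max_time))
    assert end - start == duration
    results[start] = end

print(results)  # {0: 5, 1: 6, 4: 7}
```

A task starting at 0 or 1 straddles the break and grows by two slots; a task starting at 4 is unaffected.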
Concepts
- Breaks (approach 2: duration stretched)
- CP-SAT basics (reification, add_multiplication_equality)
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product type
1 A TYPE_4
'''
# 1. Data
tasks = {1}
products = {'A'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_4'}
processing_time = {'A': 3}
max_time = 10
breaks = {(2, 4)}
is_break = {i: 1 if 2<=i<4 else 0 for i in range(max_time)}
# 2. Decision Variables
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_task_new_duration = {
task: model.new_int_var(0, max_time, f"task_{task}_duration") for task in tasks
}
var_task_duration_timeslots = {
(task, i): model.new_bool_var(f'task {task} uses interval {i}')
for task in tasks
for i in range(max_time)
}
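# AND pair: slot i is used when the task has started by slot i and is still running after it
var_task_start_earlier_than_start = {
(task, i): model.new_bool_var(f'task_{task}_started_by_{i}')
for task in tasks
for i in range(max_time)
}
var_task_end_later_than_end = {
(task, i): model.new_bool_var(f'task_{task}_ends_after_{i}')
for task in tasks
for i in range(max_time)
}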
# # OR pair
# var_task_start_later_than_end = {
# (task, i): model.new_bool_var(f'')
# for task in tasks
# for i in range(max_time)
# }
# var_task_end_earlier_than_start = {
# (task, i): model.new_bool_var(f'')
# for task in tasks
# for i in range(max_time)
# }
for task in tasks:
for i in range(max_time):
model.add(var_task_starts[task] <= i).only_enforce_if(var_task_start_earlier_than_start[task, i])
model.add(var_task_starts[task] > i).only_enforce_if(~var_task_start_earlier_than_start[task, i])
model.add(var_task_ends[task] >= i + 1).only_enforce_if(var_task_end_later_than_end[task, i])
model.add(var_task_ends[task] < i + 1).only_enforce_if(~var_task_end_later_than_end[task, i])
# model.add(var_task_starts[task] >= i+1).only_enforce_if(var_task_start_later_than_end[task, i])
# model.add(var_task_starts[task] < i+1).only_enforce_if(~var_task_start_later_than_end[task, i])
#
# model.add(var_task_ends[task] <= i).only_enforce_if(var_task_end_earlier_than_start[task, i])
# model.add(var_task_ends[task] > i).only_enforce_if(~var_task_end_earlier_than_start[task, i])
# And
model.add_multiplication_equality(
var_task_duration_timeslots[task, i],
[var_task_start_earlier_than_start[task, i], var_task_end_later_than_end[task, i]]
)
# model.add_min_equality(
# var_task_duration_timeslots[task, i],
# [var_task_start_later_than_end[task, i], var_task_end_earlier_than_start[task, i]]
# )
# for task in tasks:
# for i in range(max_time):
# start_index = i
# end_index = i + 1
#
# # TRUE
# model.add_linear_constraint(var_task_starts[task], 0, start_index).only_enforce_if(var_task_timeslots[task, i])
# model.add_linear_constraint(var_task_ends[task], end_index, max_time).only_enforce_if(var_task_timeslots[task, i])
#
# # FALSE
# if i == 0:
# # first time slot
# model.add_linear_expression_in_domain(
# var_task_starts[task],
# cp_model.Domain.from_intervals([[1, max_time]])
# ).only_enforce_if(~var_task_timeslots[task, i])
#
# elif i == max_time - 1:
# # last time slot
# model.add_linear_expression_in_domain(
# var_task_ends[task],
# cp_model.Domain.from_intervals([[0, max_time-1]])
# ).only_enforce_if(~var_task_timeslots[task, i])
# else:
# model.add_linear_expression_in_domain(
# x,
# cp_model.Domain.from_intervals([[1, start_index], [end_index+1, max_time]])
# ).only_enforce_if(~var_task_timeslots[task, i])
#
# task_1_start < task_2_start < task_1_end
# Variables:
# 1: Overlap
# 2: (task_2_start < task_1_end)
# 3: (task_1_start < task_2_start)
# Constraints:
# 1: (task_1_start < task_2_start) -> task_1_start
# 2:
for task in tasks:
model.add(
var_task_new_duration[task] == processing_time[task_to_product[task]] +
sum(is_break[i]*var_task_duration_timeslots[task, i] for i in range(max_time))
)
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task],
var_task_new_duration[task],
#processing_time[task_to_product[task]],
var_task_ends[task],
name=f"interval_t{task}"
)
for task in tasks
}
var_task_intervals_auto = {
task: model.new_interval_var(
var_task_starts[task],
1,
var_task_starts[task] + 1,
name=f"interval_auto_t{task}"
)
for task in tasks
if task_to_type[task] == 'TYPE_4'
}
# Add break time
variables_breaks = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in breaks
}
intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())
# task, resource reduction for breaks
demands = [1]*len(tasks) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'task {task}')
for i in range(max_time):
print(f'{i} {solver.value(var_task_duration_timeslots[task, i])}')
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
)
print(solver.value(var_task_new_duration[task]))
print('Make-span:', solver.value(make_span))
# print('======================= ALLOCATION & SEQUENCE =======================')
# if True:
# for t1 in tasks:
# for t2 in tasks:
# if t1 != t2:
# value = solver.value(var_task_seq[(t1, t2)])
# print(f'{t1} --> {t2} {value}')
# # if value == 1 and t2 != 0:
# # print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]}')# cost: {m_cost[m, t1, t2]}')
# # if value == 1 and t2 == 0:
# # print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Linear domain for breaks
Source: scheduling/example_29_linear_domain_for_breaks.py
When breaks are periodic - a fixed pattern every shift, every day -
modeling each break as an interval inflates the model. An alternative
is to precompute the set of valid task start times (every position that
doesn't overlap a break) and restrict starts to that set with
add_linear_expression_in_domain.
The file defines two model variants and benchmarks them:
- Method 1: breaks as fixed intervals in add_cumulative or add_no_overlap.
- Method 2: breaks disappear; start domains are constrained.
For highly periodic schedules the domain method wins: CP-SAT propagates on a small set of start values directly and avoids all the break intervals. The trade-off is loss of generality - arbitrary break patterns don't translate to clean domains.
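The precomputation step can be sketched in plain Python. This is a minimal illustration, not code from the file: the helper name valid_start_intervals and its horizon argument are assumptions, and breaks are taken to be half-open (start, end) pairs as in the listing.

```python
def valid_start_intervals(breaks, processing_time, horizon):
    """Return closed [lo, hi] ranges of start times whose task avoids every break.

    A task started at s occupies [s, s + processing_time) and must end by horizon.
    """
    ok = [
        s
        for s in range(horizon - processing_time + 1)
        if all(s + processing_time <= b or s >= e for b, e in breaks)
    ]
    intervals = []
    for s in ok:
        if intervals and s == intervals[-1][1] + 1:
            intervals[-1][1] = s        # extend the current run of valid starts
        else:
            intervals.append([s, s])    # open a new run
    return intervals


# Same break pattern as the listing with number_of_tasks = 4
breaks = [(4, 8), (12, 16)]
valid_periods = valid_start_intervals(breaks, processing_time=2, horizon=16)
print(valid_periods)  # [[0, 2], [8, 10]]
```

The result has the list-of-pairs shape that cp_model.Domain.from_intervals accepts, so it could stand in for the hand-written valid_periods of Method 2 when the break pattern changes.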
Concepts
- Breaks (approach 3: start-domain restriction)
- Solver techniques
Source
from ortools.sat.python import cp_model
from time import time
from tqdm import tqdm
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
# method 1: No overlapping
def run_model_1(number_of_tasks):
#number_of_tasks = 200
tasks = {x for x in range(number_of_tasks)}
# 2 tasks --> 8 time units
processing_time = 2
max_time = number_of_tasks * 4 + 100
breaks = {(4 * i, 4 * i + 4) for i in range(int(number_of_tasks)) if i % 2 != 0}
valid_start_times = [[0 + i * 8, 1 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
valid_start_times = [item for sublist in valid_start_times for item in sublist]
model = cp_model.CpModel()
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task],
processing_time,
var_task_ends[task],
name=f"interval_{task}"
)
for task in tasks
}
for task in tasks:
if task != 0:
model.add(var_task_starts[task-1] < var_task_starts[task])
var_break_intervals = {
break_id: model.new_fixed_size_interval_var(
break_start, break_end-break_start, f"break_{break_id}"
) for break_id, (break_start, break_end) in enumerate(breaks)
}
all_intervals = [x for x in var_task_intervals.values()] + [x for x in var_break_intervals.values()]
model.add_no_overlap(all_intervals)
# obj
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
# if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
# for task in tasks:
# print(f'Task {task} ',
# solver.value(var_task_starts[task]), solver.value(var_task_ends[task])
# )
#
# print('Make-span:', solver.value(make_span))
if status == cp_model.OPTIMAL:
return total_time
else:
return -999
# method 2: Use linear domain
def run_model_2(number_of_tasks):
#number_of_tasks = 4
tasks = {x for x in range(number_of_tasks)}
# 2 tasks --> 8 time units
processing_time = 2
max_time = number_of_tasks * 4 + 100
breaks = {(4 * i, 4 * i + 4) for i in range(int(number_of_tasks)) if i % 2 != 0}
valid_start_times = [[0 + i * 8, 1 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
valid_periods = [[0 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
valid_start_times = [item for sublist in valid_start_times for item in sublist]
model = cp_model.CpModel()
#model.new_int_var_from_domain(cp_model.Domain.from_intervals([[1, 2], [4, 6]]), 'x')
var_task_starts = {task: model.new_int_var_from_domain(
cp_model.Domain.from_intervals(valid_periods),
f"task_{task}_start")
for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task],
processing_time,
var_task_ends[task],
name=f"interval_{task}"
)
for task in tasks
}
for task in tasks:
if task != 0:
model.add(var_task_starts[task-1] < var_task_starts[task])
all_intervals = [x for x in var_task_intervals.values()]
model.add_no_overlap(all_intervals)
# obj
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
# if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
# for task in tasks:
# print(f'Task {task} ',
# solver.value(var_task_starts[task]), solver.value(var_task_ends[task])
# )
#
# print('Make-span:', solver.value(make_span))
if status == cp_model.OPTIMAL:
return total_time
else:
return -999
if __name__ == '__main__':
N = 200
sizes_1 = list(range(2, N, 10))
sizes_2 = list(range(2, N, 10))
model_1_times = []
model_2_times = []
print("\nNoOverlap\n")
for size in tqdm(sizes_1):
model_1_times.append(run_model_1(size))
print("\nLinear Domain\n")
for size in tqdm(sizes_2):
model_2_times.append(run_model_2(size))
ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(sizes_1, model_1_times, marker='o', label='NoOverLapping')
plt.plot(sizes_2, model_2_times, '-.', marker='o', label='Linear Domain')
plt.legend()
plt.title('Performance benchmarking')
plt.xlabel('The number of tasks')
plt.ylabel('Seconds')
plt.show()
Conditional duration via linear domain
Source: scheduling/example_33_conditional_duration_linear_domain.py
Combines the ideas of two earlier chapters. Like Task delaying a break, a task that hits a break takes longer. Like Linear domain for breaks, the break pattern is periodic enough to encode as start-time domains.
Two start domains are precomputed: domain_break (starts that would
overlap a break) and domain_no_break (safe starts). A reified bool
overlap_break[t] toggles which domain the start belongs to, and the
duration switches by one time unit accordingly:
model.add(duration[t] == processing_time + 1).only_enforce_if(overlap_break[t])
model.add(duration[t] == processing_time ).only_enforce_if(~overlap_break[t])
Because the task interval is built from start/duration/end, everything downstream - no-overlap, cumulative, makespan - sees the correct effective duration with no extra bookkeeping.
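The two start domains, and the claim that one extra time unit is enough, can be checked without the solver. The sketch below is an assumption-laden illustration: break_units_inside is a hypothetical helper, and the breaks are the unit-length pattern for break_offset = 1 with period 3.

```python
def break_units_inside(start, length, breaks):
    """Count break time units covered by the window [start, start + length)."""
    return sum(max(0, min(e, start + length) - max(b, start)) for b, e in breaks)


processing_time = 2
breaks = [(1, 2), (4, 5), (7, 8)]   # break_offset = 1, repeating every 3 units
horizon = 9

# Safe starts: the unstretched task touches no break at all.
starts_no_break = [
    s for s in range(horizon) if break_units_inside(s, processing_time, breaks) == 0
]
starts_break = [s for s in range(horizon) if s not in starts_no_break]

# For every "break" start, a task stretched by one unit should still contain
# exactly processing_time working units, i.e. it crosses exactly one break unit.
stretch_ok = all(
    (processing_time + 1) - break_units_inside(s, processing_time + 1, breaks)
    == processing_time
    for s in starts_break
)

print(starts_no_break, starts_break, stretch_ok)
```

If stretch_ok came out False for some other break pattern, the fixed "+1" in the duration constraint would no longer be valid and the duration would need one case per number of breaks crossed.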
Concepts
- Breaks (conditional duration)
- CP-SAT basics (add_linear_expression_in_domain)
- Interval variables
Source
from ortools.sat.python import cp_model
from time import time
import pandas as pd
if __name__ == '__main__':
"""
Offset = 0:
| x | | | x | | | x | | |
Offset = 1:
| | x | | | x | | | x | |
Offset = 2:
| | | x | | | x | | | x |
where x represent a unit duration break period
"""
break_offset = 1
num_of_tasks = 3
max_time = num_of_tasks*3
processing_time = 2
if break_offset == 0:
help_text = "| x | | | x | | | x | | |"
elif break_offset == 1:
help_text = "| | x | | | x | | | x | |"
elif break_offset == 2:
help_text = "| | | x | | | x | | | x |"
else:
print("offset wrong")
exit()
breaks = [(i*3 + break_offset, i*3 + break_offset + 1) for i in range(num_of_tasks)]  # breaks repeat every 3 time units
tasks = {x for x in range(num_of_tasks)}
starts_no_break = [x*3+break_offset+1 for x in range(-1, num_of_tasks) if x*3+break_offset+1>= 0]
starts_break = list(set(range(max_time)).difference(starts_no_break))
domain_no_break = cp_model.Domain.from_intervals([[x] for x in starts_no_break])
domain_break = cp_model.Domain.from_intervals([[x] for x in starts_break])
model = cp_model.CpModel()
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_durations = {task: model.new_int_var(2, 3, f"task_{task}_duration") for task in tasks}
var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}
# print("Heuristic 1: Apply the tasks sequence heuristics")
for task in tasks:
if task == 0:
continue
model.add(var_task_ends[task-1] <= var_task_starts[task])
for task in tasks:
model.add_linear_expression_in_domain(var_task_starts[task], domain_break).only_enforce_if(
var_task_overlap_break[task]
)
model.add_linear_expression_in_domain(var_task_starts[task], domain_no_break).only_enforce_if(
~var_task_overlap_break[task]
)
model.add(var_task_durations[task] == processing_time+1).only_enforce_if(
var_task_overlap_break[task]
)
model.add(var_task_durations[task] == processing_time).only_enforce_if(
~var_task_overlap_break[task]
)
# intervals
var_intervals = {
task: model.new_interval_var(
var_task_starts[task],
var_task_durations[task],
var_task_ends[task],
name=f"interval_{task}"
)
for task in tasks
}
model.add_no_overlap(var_intervals.values())
# Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span + sum(var_task_durations.values()))
# model.minimize(sum(var_task_durations))
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
print_result = True
# show the result if getting the optimal one
if print_result:
print("-----------------------------------------")
print(help_text)
print("breaks periods:", breaks)
print("break starts:", starts_break)
print("no break starts:", starts_no_break)
if status == cp_model.OPTIMAL:
big_list = []
for task in tasks:
tmp = [
f"task {task}",
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_overlap_break[task]),
solver.value(var_task_durations[task]),
]
big_list.append(tmp)
df = pd.DataFrame(big_list)
df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
df = df.sort_values(['start'])
print(df)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Shift crossing (fake time unit)
Source: scheduling/example_16_shift_crossing_fake_time_unit.py
The working day is split into shifts. A task is allowed to run entirely inside one shift, but not straddle a boundary.
The first approach borrows the break mechanic. Place a one-unit "fake break" interval at each shift boundary, then forbid every task from overlapping it:
for s, e in synthetic_shift_breaks:
    br = model.new_fixed_size_interval_var(start=s, size=e - s, name="shift_edge")
    for t in tasks:
        model.add_no_overlap([task_interval[t], br])
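The model stays concise: one fixed interval per shift boundary and one pairwise add_no_overlap per task and boundary. Real breaks continue to use add_cumulative.
Compare with the explicit shift-assignment alternative in the next chapter.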
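One way to read the listing's data is that each fake unit is an extra slot inserted into the model's time axis after every shift, so model times are stretched relative to real times. Under that assumption (it is an interpretation, not something the file states), the boundaries and the mapping back to real time can be sketched as follows; both function names are hypothetical.

```python
def fake_shift_edges(shift_length, num_shifts):
    """One-unit synthetic intervals placed after each shift on the model time axis."""
    edges = []
    for k in range(num_shifts):
        start = (k + 1) * shift_length + k   # k earlier fake units have shifted the axis
        edges.append((start, start + 1))
    return edges


def to_real_time(model_time, shift_length):
    """Map a model time (with fake units) back to real time (without them)."""
    return model_time - model_time // (shift_length + 1)


edges = fake_shift_edges(shift_length=4, num_shifts=2)
print(edges)                 # [(4, 5), (9, 10)]
print(to_real_time(5, 4))    # 4: the second shift starts at real time 4
print(to_real_time(8, 4))    # 7
```

With shifts of four units this reproduces the listing's synthetic_shift_breaks, and a task scheduled at model time 5 to 8 corresponds to real time 4 to 7.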
Concepts
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
# 1. Data
synthetic_shift_breaks = {(4, 5), (9, 10)}
breaks = {(0, 2)}
tasks = {1}
processing_time = {1: 3}
max_time = 10
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task], processing_time[task], var_task_ends[task], name=f"interval_t{task}"
) for task in tasks
}
# Add break time
var_break_intervals = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in breaks
}
intervals = list(var_task_intervals.values()) + list(var_break_intervals.values())
demands = [1]*len(tasks) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# THE CONSTRAINT for synthetic shift break time unit
var_shift_break_intervals = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in synthetic_shift_breaks
}
for start, end in synthetic_shift_breaks:
print(start, end)
for task in tasks:
model.add_no_overlap([var_task_intervals[task], var_shift_break_intervals[start, end]])
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Shift crossing (Mathieu)
Source: scheduling/example_17_shift_crossing_mathieu.py
Second take on the same shift problem. Instead of a synthetic break at
every boundary, assign each task explicitly to one shift via a one-hot
presence[shift, task]. When that presence is true, the task must stay
inside the shift window.
for t in tasks:
    model.add_exactly_one(presence[s, t] for s in shifts)
    for s in shifts:
        model.add(start[t] >= shift_start[s]).only_enforce_if(presence[s, t])
        model.add(end[t] <= shift_end[s]).only_enforce_if(presence[s, t])
More variables than the synthetic-break approach, but the assignment is visible in the solution and easy to extend - per-shift headcount caps, operator preferences, shift-specific processing times all drop in naturally.
Concepts
- Shifts (explicit shift assignment)
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
tasks = {1}
processing_times = {1: 3}
max_time = 10
breaks = {(0, 2)}
shifts = {1, 2}
shift_starts = {1:0, 2:4}
shift_ends = {1:4, 2:8}
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_shift_task_presence = {
(shift, task): model.new_bool_var(f'task_{task}_is_in_shift_{shift}')
for shift in shifts for task in tasks
}
for task in tasks:
model.add(sum(var_shift_task_presence[shift, task] for shift in shifts) == 1 )
for shift in shifts:
model.add(var_task_starts[task] >= shift_starts[shift]).only_enforce_if(
var_shift_task_presence[shift, task]
)
model.add(var_task_ends[task] <= shift_ends[shift]).only_enforce_if(
var_shift_task_presence[shift, task]
)
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task], processing_times[task], var_task_ends[task], name=f"interval_t{task}"
) for task in tasks
}
# Add break time
var_break_intervals = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in breaks
}
intervals = list(var_task_intervals.values()) + list(var_break_intervals.values())
demands = [1]*len(tasks) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Stages, one job
Source: scheduling/example_21_stages_one_job.py
A job can consist of ordered stages. Think of a bakery batch: prepare, bake, pack. Each stage uses its own equipment and must follow the previous one.
Model: tasks are indexed (job, stage). The job's start and end are the
min and max over its stage starts and ends:
model.add_min_equality(job_start[job], [start[job, s] for s in stages])
model.add_max_equality(job_end[job], [end[job, s] for s in stages])
Stage precedence is a chain of end[s] <= start[s + 1]. With only one
job there is nothing for a stage to conflict with, so the listing builds
the stage intervals but adds no add_no_overlap; the scaffolding
is ready for the next chapter.
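For a single job the optimum can be worked out by hand, which gives a quick cross-check on what the solver should report. The forward pass below is a plain-Python sketch using the listing's data (three stages, processing time 3); the variable names are illustrative only.

```python
processing_time = 3
stages = [1, 2, 3]

start, end = {}, {}
clock = 0
for s in stages:
    start[s] = clock                 # a stage starts as soon as the previous one ends
    end[s] = clock + processing_time
    clock = end[s]

job_start = min(start.values())      # what add_min_equality pins job_start to
job_end = max(end.values())          # what add_max_equality pins job_end to
print(start, end, job_start, job_end)
```

The job spans 0 to 9, which fits inside the listing's max_time of 10.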
Concepts
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
jobs = {1}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}
processing_time = 3
max_time = 10
# 1. Jobs
var_job_starts = {
job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}
var_job_ends = {
job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}
# 2. Tasks
var_task_starts = {
(job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}
var_task_ends = {
(job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}
for job in jobs:
model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])
for stage in stages:
if stage == len(stages):
continue
model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])
var_task_intervals = {
(job, stage): model.new_interval_var(
var_task_starts[job, stage],
processing_time,
var_task_ends[job, stage],
name=f"interval_job_{job}_stage_{stage}"
)
for job in jobs
for stage in stages
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for job in jobs:
print(f"job_{job} start: {solver.value(var_job_starts[job])} end:{solver.value(var_job_ends[job])}")
for stage in stages:
print(f'Stage {stage} ',
solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Stages, two jobs
Source: scheduling/example_22_stages_two_jobs.py
Add a second job and the previous chapter's scaffolding earns its keep. Each stage has only one instance of its equipment, so two jobs cannot sit in the same stage at the same time:
for s in stages:
    model.add_no_overlap([intervals[j, s] for j in jobs])
This is the classic flow-shop pattern. The solver now has to interleave the two jobs across stages such that no stage is double-booked and the makespan is minimised.
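A small recurrence gives the makespan of a flow shop when every stage processes the jobs in the same order, which is handy for sanity-checking the solver's answer. This is a sketch under that same-order assumption; flow_shop_makespan is a hypothetical helper, not part of the file.

```python
def flow_shop_makespan(job_order, stages, processing_time):
    """Finish time of the last job when all stages process jobs in job_order."""
    finish = {}
    for j_idx, job in enumerate(job_order):
        for s_idx, stage in enumerate(stages):
            # A task waits for the previous job on this stage and for its own previous stage.
            prev_job_done = finish[job_order[j_idx - 1], stage] if j_idx else 0
            prev_stage_done = finish[job, stages[s_idx - 1]] if s_idx else 0
            finish[job, stage] = (
                max(prev_job_done, prev_stage_done) + processing_time[job, stage]
            )
    return finish[job_order[-1], stages[-1]]


stages = [1, 2, 3]
two_jobs = [1, 2]
times = {(j, s): 3 for j in two_jobs for s in stages}
print(flow_shop_makespan(two_jobs, stages, times))  # 12
```

With identical processing times p, n jobs and m stages, the recurrence gives (n + m - 1) * p, which is also a lower bound: the last job to leave stage 1 cannot do so before n * p and still has m - 1 stages to go. For this chapter's data that is 12, comfortably below max_time = 20.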
Concepts
- Multi-stage jobs (stage-level no-overlap)
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
jobs = {1, 2}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}
processing_time = 3
max_time = 20
# 1. Jobs
var_job_starts = {
job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}
var_job_ends = {
job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}
# 2. Tasks
var_task_starts = {
(job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}
var_task_ends = {
(job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}
for job in jobs:
model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])
for stage in stages:
if stage == len(stages):
continue
model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])
var_task_intervals = {
(job, stage): model.new_interval_var(
var_task_starts[job, stage],
processing_time,
var_task_ends[job, stage],
name=f"interval_job_{job}_stage_{stage}"
)
for job in jobs
for stage in stages
}
for stage in stages:
model.add_no_overlap(var_task_intervals[job, stage] for job in jobs)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for job in jobs:
print(f"job_{job} start: {solver.value(var_job_starts[job])} end:{solver.value(var_job_ends[job])}")
for stage in stages:
print(f'Stage {stage} ',
solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Multistage, two jobs
Source: scheduling/example_23_multistage_two_jobs_co.py
Same flow-shop model scaled to 6 jobs over 3 stages. No new mechanics - stage precedence, per-stage no-overlap, makespan objective - just more of everything. It exists to check how CP-SAT handles a larger flow-shop instance.
Despite the two_jobs and co parts of the filename, the listing schedules six jobs and adds no explicit changeover; treat it as a scalability exercise.
Concepts
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
jobs = {1, 2, 3, 4, 5, 6}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}
processing_time = 3
max_time = 100
# 1. Jobs
var_job_starts = {
job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}
var_job_ends = {
job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}
# 2. Tasks
var_task_starts = {
(job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}
var_task_ends = {
(job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}
for job in jobs:
model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])
for stage in stages:
if stage == len(stages):
continue
model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])
var_task_intervals = {
(job, stage): model.new_interval_var(
var_task_starts[job, stage],
processing_time,
var_task_ends[job, stage],
name=f"interval_job_{job}_stage_{stage}"
)
for job in jobs
for stage in stages
}
for stage in stages:
model.add_no_overlap(var_task_intervals[job, stage] for job in jobs)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for job in jobs:
print(f"job_{job} start: {solver.value(var_job_starts[job])} end:{solver.value(var_job_ends[job])}")
for stage in stages:
print(f'Stage {stage} ',
solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
People mode
Source: scheduling/example_10_people_mode.py
Not every task has a fixed resource profile. A common pattern: you can run a task with 2 people for 3 time units, or with 3 people for 2. Different modes, different (duration, headcount) pairs, same outcome.
The solver needs to pick the mode. A one-hot mode[t, k] per task drives
the derivation:
model.add(
    proc_time[t] == sum(
        processing_time[product[t], k] * mode[t, k] for k in modes
    )
)
With proc_time[t] now a variable rather than a constant, the rest of
the sequencing machinery (machine presence, per-machine add_circuit,
task-level links) carries over unchanged.
Concepts
- Resources and cumulative (resource modes)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 A
'''
### 1. Data
tasks = {1, 2}
products = {'A'}
task_to_product = {1: 'A', 2: 'A'}
machines = {1,2}
resource_modes = {1, 2}
# product -> people mode -> time duration
processing_time = {
('A', 1): 3,
('A', 2): 2
}
resource_requirement = {
('A', 1): 2,
('A', 2): 3
}
max_time = 20
# (task, people_mode)
# 2. Decision variables
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks
for m in machines
}
# One task to one machine.
for task in tasks:
task_candidate_machines = machines
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# for sequence control
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for t1 in tasks
for t2 in tasks
for m in machines
if t1 != t2
}
# Sequence
for m in machines:
arcs = list()
for node1, t1 in enumerate(tasks):
tmp_1 = model.new_bool_var(f'm_{m}_first_to_{t1}')
arcs.append([0, t1, tmp_1])
tmp_2 = model.new_bool_var(f'm_{m}_{t1}_to_last')
arcs.append([t1, 0, tmp_2])
arcs.append([t1, t1, ~variables_machine_task_presences[m, t1]])
for node_2, t2 in enumerate(tasks):
# arcs
if t1 == t2:
continue
arcs.append([
t1,
t2,
variables_machine_task_sequence[(m, t1, t2)]
])
distance = 0
# cannot require the time index of task 0 to represent the first and the last position
model.add(
variables_task_ends[t1] + distance <= variables_task_starts[t2]
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
model.add_circuit(arcs)
#
variables_task_resource_mode = {
(task, resource_mode): model.new_bool_var(f"task {task} using resource mode {resource_mode}")
for task in tasks for resource_mode in resource_modes
}
# variables_task_resource_modes = {
# task: model.new_int_var(0, len(resource_modes), f"the resource mode of {task}")
# for task in tasks
# }
# one task has to pick one resource mode
for task in tasks:
tmp = sum(variables_task_resource_mode[task, resource_mode] for resource_mode in resource_modes)
model.add(tmp == 1)
# link resource id with decision matrix
# for task in tasks:
# for resource_mode in resource_modes:
# model.add(
# variables_task_resource_modes[task] == resource_mode
# ).only_enforce_if(
# variables_task_resource_mode_matrix[task, resource_mode]
# )
variables_task_processing_time = {
task: model.new_int_var(0, 99, f"processing time for {task}")
for task in tasks
}
for task in tasks:
model.add(
variables_task_processing_time[task] == sum(
processing_time[task_to_product[task], resource_mode]*variables_task_resource_mode[task, resource_mode]
for resource_mode in resource_modes
)
)
# model.new_optional_interval_var(1,1,2,1,'')
# model.new_optional_interval_var(
# model.new_int_var(0,1,''),
# model.new_int_var(0, 1, ''),
# model.new_int_var(0,1,''),
# model.new_int_var(0, 1, ''),
# '')
# variables_machine_task_intervals = {
# (m, task): model.new_optional_interval_var(
# variables_machine_task_starts[m, task],
# #2,
# variables_task_processing_time[task],
# #processing_time[task_to_product[task], variables_task_resource_modes[task]],
# variables_machine_task_ends[m, task],
# variables_machine_task_presences[m, task],
# name=f"t_interval_{m}_{task}"
# )
# for task in tasks
# for m in machines
# }
variables_machine_task_resource_mode_presence = {
(m, task, resource_mode): model.new_bool_var(f'm{m}_task_{task}_resource_mode_{resource_mode}_presence')
for m in machines
for task in tasks
for resource_mode in resource_modes
}
for task in tasks:
for resource_mode in resource_modes:
model.add(
sum(variables_machine_task_resource_mode_presence[m, task, resource_mode] for m in machines) == 1
).only_enforce_if(
variables_task_resource_mode[task, resource_mode]
)
for task in tasks:
for m in machines:
model.add(
sum(variables_machine_task_resource_mode_presence[m, task, resource_mode] for resource_mode in resource_modes) == 1
).only_enforce_if(
variables_machine_task_presences[m, task]
)
for task in tasks:
model.add(
sum(variables_machine_task_resource_mode_presence[m, task, resource_mode]
for resource_mode in resource_modes
for m in machines
) == 1
)
variables_machine_task_mode_intervals = {
(m, task, resource_mode): model.new_optional_interval_var(
variables_machine_task_starts[m, task],
#2,
variables_task_processing_time[task],
#processing_time[task_to_product[task], variables_task_resource_modes[task]],
variables_machine_task_ends[m, task],
variables_machine_task_resource_mode_presence[m, task, resource_mode],
#variables_machine_task_presences[m, task]*variables_task_resource_mode[task, resource_mode],
name=f"int_m{m}_t{task}_mode{resource_mode}"
)
for task in tasks
for m in machines
for resource_mode in resource_modes
}
intervals = [x for x in variables_machine_task_mode_intervals.values()]
demands = [
resource_requirement[task_to_product[task], resource_mode]
for machine, task, resource_mode in variables_machine_task_mode_intervals.keys()
]
model.add_cumulative(intervals=intervals, demands=demands, capacity=7)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# for m in machines:
# for task in tasks:
# for resource_mode in resource_modes:
# print(f'{m} {task} {resource_mode}')
# print(solver.value(variables_machine_task_mode_intervals[m, task, resource_mode]))
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for m in machines:
for task in tasks:
for resource_mode in resource_modes:
print(f'M {m} T {task} R {resource_mode}' ,end = ' ')
print(f' {solver.value(variables_machine_task_resource_mode_presence[m, task, resource_mode])}')
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task]), end=' '
)
for resource_mode in resource_modes:
value = solver.value(variables_task_resource_mode[task, resource_mode])
if value == 1:
print(f'Using resource mode {resource_mode}')
print('Make-span:', solver.value(make_span))
print('======================= ALLOCATION & SEQUENCE =======================')
for m in machines:
print(f'------------\nMachine {m}')
for t1 in tasks:
value = solver.value(variables_machine_task_presences[(m, t1)])
if value == 1:
print(f'task_{t1}_on machine_{m}')
for t2 in tasks:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]}')# cost: {m_cost[m, t1, t2]}')
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
Resource mode (empty)
Source: scheduling/example_11_resource_mode.py
The file is empty. It was likely intended as a twin of the previous chapter, modeling a non-human resource (machine speed, tool type) with the same one-hot mode pattern. See People mode for the fully worked version.
Headcount tracking
Source: scheduling/example_34_headcount_tracking.py
The big-picture chapter on resources. add_cumulative works when each
interval's demand is a known constant. But if the demand depends on
state - the task's mode, whether it overlaps a break, which operator is
assigned - you need something more flexible.
Three approaches are benchmarked:
- Method 0 - native add_cumulative. Fastest when it applies, but assumes fixed durations and demands.
- Method 1 - per-slot start-presence booleans. var_task_starts_presence[t, i] fires when task t starts at slot i. A "did a task starting within the last d slots cover this slot?" indicator (built with add_max_equality) lets the duration vary per task. Per-time resource variables are the sum.
- Method 2 - per-slot overlap booleans. The same information encoded via start <= t < end booleans for every task-slot pair.
Methods 1 and 2 trade model size for flexibility; the chapter also
ships a small Variables dataclass and extract_solution(...) helper
that keeps the boilerplate manageable.
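The two indicator definitions behind Methods 1 and 2 can be checked against each other without a solver. The sketch below is plain Python, not CP-SAT, and is not taken from the repo: the function names, the fixed schedule and the one-head-per-task demand are all assumptions made for illustration. It only shows that "started within the last d slots" and "start <= t < end" mark the same slots as busy.

```python
def busy_from_start_slots(start, duration, horizon):
    """Method 1 view: slot t is busy if the task started within the last `duration` slots."""
    start_presence = [int(t == start) for t in range(horizon)]  # one-hot start slot
    return [
        max(start_presence[s] for s in range(max(0, t - duration + 1), t + 1))
        for t in range(horizon)
    ]


def busy_from_overlap(start, duration, horizon):
    """Method 2 view: slot t is busy if start <= t < end."""
    end = start + duration
    return [int(start <= t < end) for t in range(horizon)]


def headcount(tasks, horizon, busy):
    """Sum the per-task busy indicators slot by slot (one head per task)."""
    per_task = [busy(start, duration, horizon) for start, duration in tasks]
    return [sum(column) for column in zip(*per_task)]


# Hypothetical fixed schedule: (start, duration) per task.
tasks = [(0, 2), (1, 3)]
horizon = 6
method_1 = headcount(tasks, horizon, busy_from_start_slots)
method_2 = headcount(tasks, horizon, busy_from_overlap)
print(method_1)              # [1, 2, 1, 1, 0, 0]
print(method_1 == method_2)  # True
```

In the CP-SAT model the same quantities are decision variables, so the max and the comparisons become add_max_equality and reified constraints rather than direct evaluation.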
Concepts
- Resources and cumulative (time-varying demand)
- Breaks (state-dependent duration)
- Interval variables
Source
from ortools.sat.python import cp_model
from time import time
import pandas as pd
from dataclasses import dataclass
from typing import Dict, Set
import copy
import logging
logger = logging.getLogger(__name__)
@dataclass
class Variables:
"""
All variables of the optimization problem, used for decisions and result values
Type `Any` shorthand for a solver variable object, or a result float/int
"""
var_task_starts: Dict[str, any] = None
var_task_ends: Dict[str, any] = None
var_task_durations: Dict[str, any] = None
var_task_overlap_break: Dict[str, any] = None
var_intervals: Dict[str, any] = None
total_duration: Dict[str, any] = None
make_span: Dict[int, any] = None
def extract_solution(solver: cp_model.CpSolver, variables):
"""Extracts the solution from the variables
Three kinds of variables have been considered
Integer, Boolean and Interval Variable
In case of other types of variables,
this function would require modification
"""
# Initializing solution class
solution = copy.deepcopy(variables)
for varname, vardict in variables.__dict__.items():
setattr(solution, varname, None)
# Assigning variable values
for varname, vardict in variables.__dict__.items():
if vardict is not None:
setattr(
solution,
varname,
{
k: solver.value(v) if type(v) not in [cp_model.IntervalVar] else v
for k, v in vardict.items()
},
)
else:
logger.warning(f"Variable '{varname}' is defined but not used in model")
return solution
if __name__ == '__main__':
"""
Offset = 0:
| x | | | x | | | x | | |
Offset = 1:
| | x | | | x | | | x | |
Offset = 2:
| | | x | | | x | | | x |
where x represent a unit duration break period
"""
variables = Variables()
break_offset = 0
num_of_tasks = 1
max_time = num_of_tasks * 3
processing_time = 2
if break_offset == 0:
help_text = "| x | | | x | | | x | | |"
elif break_offset == 1:
help_text = "| | x | | | x | | | x | |"
elif break_offset == 2:
help_text = "| | | x | | | x | | | x |"
else:
print("offset wrong")
exit()
# Breaks repeat every 3 slots (see the diagram above)
breaks = [(i * 3 + break_offset, i * 3 + break_offset + 1) for i in range(num_of_tasks)]
tasks = {x for x in range(num_of_tasks)}
starts_no_break = [x * 3 + break_offset + 1 for x in range(-1, num_of_tasks) if x * 3 + break_offset + 1 >= 0]
starts_break = list(set(range(max_time)).difference(starts_no_break))
domain_no_break = cp_model.Domain.from_values([x for x in starts_no_break])
domain_break = cp_model.Domain.from_values([x for x in starts_break])
model = cp_model.CpModel()
variables.var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
variables.var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
variables.var_task_durations = {task: model.new_int_var(2, 3, f"task_{task}_duration") for task in tasks}
variables.var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}
for task in tasks:
if task == 0:
continue
model.add(variables.var_task_ends[task - 1] <= variables.var_task_starts[task])
for task in tasks:
model.add_linear_expression_in_domain(variables.var_task_starts[task], domain_break).only_enforce_if(
variables.var_task_overlap_break[task]
)
model.add_linear_expression_in_domain(variables.var_task_starts[task], domain_no_break).only_enforce_if(
~variables.var_task_overlap_break[task]
)
# model.add(variables.var_task_durations[task] == processing_time + variables.var_task_overlap_break[task] * 1)
model.add(variables.var_task_durations[task] == processing_time + 1).only_enforce_if(
variables.var_task_overlap_break[task]
)
model.add(variables.var_task_durations[task] == processing_time).only_enforce_if(
~variables.var_task_overlap_break[task]
)
# intervals
variables.var_intervals = {
task: model.new_interval_var(
variables.var_task_starts[task],
variables.var_task_durations[task],
variables.var_task_ends[task],
name=f"interval_{task}"
)
for task in tasks
}
tracking_method = 1
if tracking_method == 0:
# native cumulative
variables.resource_needs = {'all': model.new_int_var(0, 10, 'resource_needs')}
resource_needs = {task: 1 for task in tasks}
intervals, headcounts = [], []
# 2.1 Add tasks requirements
# 2.1.1 Add demand-based intervals
for task in tasks:
intervals.append(variables.var_intervals[task])
headcounts.append(resource_needs[task])
# 2.2 Create cumulative constraints
model.add_cumulative(intervals, headcounts, variables.resource_needs['all'])
elif tracking_method == 1:
# cumulative_with_start_time
max_r = 10
lb = [variables.var_task_starts[i].proto().domain[0] for i in tasks]
ub = [variables.var_task_starts[i].proto().domain[1] for i in tasks]
times_min = min(lb)
times_max = max(ub)
time_range = range(times_min, times_max + 1)
variables.resource_needs = {t: model.new_int_var(0, max_r, f'resource_{t}')
for t in time_range
}
variables.task_resource_needs = {(task, t): model.new_int_var(0, max_r, f'resource_{task}_{t}')
for task in tasks
for t in time_range
}
variables.duration_task_resource_needs = {(task, t, duration): model.new_int_var(0, max_r, f'resource_{task}_{t}_d{duration}')
for task in tasks
for t in time_range
for duration in [2, 3]
}
variables.var_task_starts_presence = {
(task, t): model.new_bool_var(f'start_{task}_{t}')
for task in tasks
for t in range(times_min, times_max + 1)
}
for task in tasks:
model.add_exactly_one([variables.var_task_starts_presence[task, t] for t in time_range])
# Define min & max duration
min_duration = variables.var_task_durations[task].proto().domain[0]
max_duration = variables.var_task_durations[task].proto().domain[1]
for t in time_range:
# Capture start time
model.add(variables.var_task_starts[task] == t).only_enforce_if(
variables.var_task_starts_presence[task, t])
# Capture based on duration
for duration in [min_duration, max_duration]:
model.add_max_equality(variables.duration_task_resource_needs[task, t, duration],
[variables.var_task_starts_presence[task, t_start]
for t_start in range(t - duration + 1, t + 1)
if (task, t_start) in variables.var_task_starts_presence
]
)
# Capture the actual task duration and the needs
model.add(variables.task_resource_needs[task, t]
== variables.duration_task_resource_needs[task, t, 2]
).only_enforce_if(~variables.var_task_overlap_break[task])
model.add(variables.task_resource_needs[task, t]
== variables.duration_task_resource_needs[task, t, 3]
).only_enforce_if(variables.var_task_overlap_break[task])
# Capture total resource needs
for t in range(times_min, times_max + 1):
model.add(variables.resource_needs[t] == sum([variables.task_resource_needs[task, t] * 1
for task in tasks
]
))
elif tracking_method == 2:
"""
Cumulative with overlap
"""
max_r = 10
lb = [variables.var_task_starts[i].proto().domain[0] for i in tasks]
ub = [variables.var_task_starts[i].proto().domain[1] for i in tasks]
times_min = min(lb)
times_max = max(ub)
resource_needs = {task: 1 for task in tasks}
variables.resource_needs = {t: model.new_int_var(0, max_r, f'resource_{t}')
for t in range(times_min, times_max + 1)
}
variables.task_resource_needs = {(task, t): model.new_int_var(0, max_r, f'resource_{task}_{t}')
for task in tasks
for t in range(times_min, times_max + 1)
}
variables.duration_task_resource_needs = {(task, t, duration): model.new_int_var(0, max_r, f'resource_{task}_{t}_d{duration}')
for task in tasks
for t in range(times_min, times_max + 1)
for duration in [2, 3]
}
variables.var_task_starts_presence = {
(task, t): model.new_bool_var(f'start_{task}_{t}')
for task in tasks
for t in range(times_min, times_max + 1)
}
for task in tasks:
for t in range(times_min, times_max + 1):
# s[i] <= t
b1 = model.new_bool_var("")
model.add(variables.var_task_starts[task] <= t).only_enforce_if(b1)
model.add(variables.var_task_starts[task] > t).only_enforce_if(~b1)
# t < e[i]
b2 = model.new_bool_var("")
model.add(t < variables.var_task_ends[task]).only_enforce_if(b2)
model.add(t >= variables.var_task_ends[task]).only_enforce_if(~b2)
# b1 and b2 (b1 * b2)
b3 = model.new_bool_var("")
model.add_bool_and([b1, b2]).only_enforce_if(b3)
model.add_bool_or([~b1, ~b2]).only_enforce_if(~b3)
# b1 * b2 * r[i]
model.add_multiplication_equality(variables.task_resource_needs[task, t], [b3, resource_needs[task]])
# Capture total resource needs
for t in range(times_min, times_max + 1):
model.add(variables.resource_needs[t] == sum([variables.task_resource_needs[task, t] * 1
for task in tasks
]
))
# Objectives
variables.make_span = {0: model.new_int_var(0, max_time, "make_span")}
model.add_max_equality(
variables.make_span[0],
[variables.var_task_ends[task] for task in tasks]
)
variables.total_duration = {0: model.new_int_var(0, max_time, "total_duration")}
model.add(variables.total_duration[0] == sum(variables.var_task_durations.values()))
model.minimize(variables.make_span[0] + variables.total_duration[0])
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
print("total time:", total_time)
print_result = True
solution: Variables = extract_solution(solver, variables)
# show the result if getting the optimal one
if print_result:
print("-----------------------------------------")
print(help_text)
print("breaks periods:", breaks)
print("break starts:", starts_break)
print("no break starts:", starts_no_break)
if status == cp_model.OPTIMAL:
big_list = []
for task in tasks:
tmp = [
f"task {task}",
solution.var_task_starts[task],
solution.var_task_ends[task],
solution.var_task_overlap_break[task],
solution.var_task_durations[task],
]
big_list.append(tmp)
df = pd.DataFrame(big_list)
df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
df = df.sort_values(['start'])
print(df)
if tracking_method == 0:
print('Using method: cumulative_with_built_in_feature')
print('resource_needs', solution.resource_needs)
elif tracking_method == 1:
print('Using method: cumulative_with_start_time')
print('resource_needs', solution.resource_needs)
print('task_resource_needs', solution.task_resource_needs)
print('duration_task_resource_needs', solution.duration_task_resource_needs)
print('var_task_starts_presence', solution.var_task_starts_presence)
elif tracking_method == 2:
print('Using method: cumulative_with_overlap')
print('resource_needs', solution.resource_needs)
print('task_resource_needs', solution.task_resource_needs)
print('duration_task_resource_needs', solution.duration_task_resource_needs)
print('var_task_starts_presence', solution.var_task_starts_presence)
print('Make-span:', solution.make_span[0])
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Max number of continuous tasks
Source: scheduling/example_09_max_number_of_continuous_tasks.py
First campaigning chapter. In many process industries, tasks of the same
product can run back-to-back with no changeover, but only up to N in a
row - the equipment needs a cleaning cycle after that, even between two
batches of the same product.
This model takes the most direct approach: campaigns are first-class
entities. For every product, enumerate as many candidate campaigns as
there are tasks of that product. Each campaign has start, end,
duration, presence; a task is assigned to exactly one campaign via
var_task_campaign_presences[t, c]; a campaign is present iff at least
one task is assigned (add_max_equality); campaign duration bounded by
max_conti_task_num caps the size.
Campaigns (not tasks) are then sequenced with add_circuit, with a
changeover distance between consecutive campaigns. Expected pattern for
A A A B is A A -> CO -> A -> CO -> B.
This is the "business view" of campaigning. A more compact alternative follows in Campaigning with cumul.
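To make the expected pattern concrete, here is a plain-Python sketch that is not part of the repo. The helper names split_into_campaigns and makespan are hypothetical. It assumes the task order is already fixed, which the CP-SAT model does not, and it places a changeover between every pair of consecutive campaigns, as described above.

```python
def split_into_campaigns(products, max_conti_task_num):
    """Greedily group consecutive same-product tasks, capping each group size."""
    campaigns = []
    for product in products:
        last = campaigns[-1] if campaigns else None
        if last and last[0] == product and len(last) < max_conti_task_num:
            last.append(product)          # campaign continues
        else:
            campaigns.append([product])   # new campaign (cap hit or product switch)
    return campaigns


def makespan(campaigns, processing_time=1, changeover_time=2):
    """Total time: every task plus one changeover between consecutive campaigns."""
    num_tasks = sum(len(c) for c in campaigns)
    num_changeovers = max(len(campaigns) - 1, 0)
    return num_tasks * processing_time + num_changeovers * changeover_time


campaigns = split_into_campaigns(["A", "A", "A", "B"], max_conti_task_num=2)
pattern = " -> CO -> ".join(" ".join(c) for c in campaigns)
print(pattern)              # A A -> CO -> A -> CO -> B
print(makespan(campaigns))  # 8
```

The values processing_time=1 and changeover_time=2 mirror the data section of the example. The greedy split is only a reference for the pattern, not a substitute for the optimization.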
Concepts
- Campaigning (approach 1: campaigns as entities)
- Circuit and sequencing
- Interval variables
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 A
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
products = {'A', 'B'}
task_to_product = {1: 'A', 2: 'A', 3: 'A', 4: 'B'}
processing_time = {'A': 1, 'B': 1}
changeover_time = 2
max_conti_task_num = 2
max_time = 100
m_cost = {
(t1, t2): 0
if task_to_product[t1] == task_to_product[t2] else changeover_time
for t1 in tasks for t2 in tasks
if t1 != t2
}
# Campaign related data pre-processing
max_product_campaigns = {
product: len([task for task in tasks if task_to_product[task]==product]) for product in products
}
product_campaigns = {
(product, f"{product}_{campaign}")
for product in max_product_campaigns
for campaign in list(range(0, max_product_campaigns[product]))
#if max_product_campaigns[product] > 0
}
campaign_to_product = {
campaign: product for product, campaign in product_campaigns
}
campaigns = {campaign for product, campaign in product_campaigns}
product_to_campaigns = {
product: [c for c in campaigns if campaign_to_product[c] == product] for product in products
}
task_to_campaigns = {
task: [
campaign for campaign in campaigns if campaign_to_product[campaign] == task_to_product[task]
]
for task in tasks
}
campaign_size = {campaign: max_conti_task_num for campaign in campaigns}
campaign_duration = {campaign: max_conti_task_num for campaign in campaigns}
campaign_to_tasks = {
campaign:
[
task for task in tasks if campaign in task_to_campaigns[task]
]
for campaign in campaigns
}
# A changeover (cleaning cycle) is needed between any two consecutive campaigns,
# including two campaigns of the same product
m_cost_campaign = {
(c1, c2): changeover_time
for c1 in campaigns for c2 in campaigns
if c1 != c2
}
# Need changeover if we switch from a product to a different product
# We can continue to do tasks of the same product type but there is
# a max number of continuous production of tasks with the same product type
# For A A A, we expect A -> A -> Changeover -> A
# For A A A A A, we expect A -> A -> Changeover -> A A -> Changeover -> A
# For A B, we expect A -> Changeover -> B
# For A A B, we expect A -> A -> Changeover -> B
# 2. Decision variables
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
# variables_task_sequence = {
# (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
# for t1 in tasks
# for t2 in tasks
# if t1 != t2
# }
#
# variables_co_starts = {
# (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_start")
# for t1 in tasks
# for t2 in tasks
# if t1 != t2
# }
#
# variables_co_ends = {
# (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_end")
# for t1 in tasks
# for t2 in tasks
# if t1 != t2
# }
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# Task Duration
# for task in tasks:
# model.add(
# variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
# )
var_task_intervals = {
t: model.new_interval_var(
variables_task_starts[t],
1,
variables_task_ends[t],
f"task_{t}_interval"
)
for t in tasks
}
# Sequence
# arcs = list()
# for t1 in tasks:
# for t2 in tasks:
# # arcs
# if t1 != t2:
# arcs.append([
# t1,
# t2,
# variables_task_sequence[(t1, t2)]
# ])
# distance = m_cost[t1, t2]
# # cannot require the time index of task 0 to represent the first and the last position
# if t2 != 0:
# # to schedule tasks and c/o
# model.add(
# variables_task_ends[t1] <= variables_co_starts[t1, t2]
# ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
# model.add(
# variables_co_ends[t1, t2] <= variables_task_starts[t2]
# ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
# model.add(
# variables_co_ends[t1, t2] - variables_co_starts[t1, t2] == distance
# ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
# model.add_circuit(arcs)
# Campaigning related
var_campaign_starts = {
c: model.new_int_var(0, max_time, f"start_{c}") for c in campaigns
}
var_campaign_ends = {
c: model.new_int_var(0, max_time, f"c_end_{c}") for c in campaigns
}
var_campaign_durations = {
c: model.new_int_var(0, max_time, f"c_duration_{c}") for c in campaigns
}
var_campaign_presences = {
c: model.new_bool_var(f"c_presence_{c}") for c in campaigns
}
# Task Duration
# for c in campaigns:
# model.add(
# var_campaign_ends[c] - var_campaign_starts[c] == var_campaign_durations[c]
# )
# var_campaign_intervals = {
# c: model.new_optional_interval_var(
# var_campaign_starts[c], # campaign start
# var_campaign_durations[c], # campaign duration
# var_campaign_ends[c], # campaign end
# var_campaign_presences[c], # campaign presence
# f"c_interval_{c}",
# )
# for c in campaigns
# }
# Whether a task is allocated to a campaign
var_task_campaign_presences = {
(t, c): model.new_bool_var(f"task_{t}_presence_in_campaign_{c}") for t in tasks for c in task_to_campaigns[t]
}
# add_campaign_definition_constraints
# 1. Campaign definition: Start & duration based on tasks that belongs to the campaign
for c in campaigns:
# 1. Duration definition
model.add(
var_campaign_durations[c] == sum(
processing_time[task_to_product[t]] * var_task_campaign_presences[t, c]
for t in campaign_to_tasks[c]
)
)
# 2. Start-end definition
# TODO: MinEquality ?
for t in campaign_to_tasks[c]:
# var_campaign_starts
# var_campaign_ends
model.add(var_campaign_starts[c] <= variables_task_starts[t]).only_enforce_if(
var_task_campaign_presences[t, c]
)
model.add(variables_task_ends[t] <= var_campaign_ends[c]).only_enforce_if(
var_task_campaign_presences[t, c]
)
# 3. Link c & tc presence: If 1 task is scheduled on a campaign -> presence[t, c] = 1 ==> presence[c] == 1
# as long as there is one task in a campaign, this campaign must be present
model.add_max_equality(
var_campaign_presences[c], [var_task_campaign_presences[t, c] for t in campaign_to_tasks[c]]
)
# 2. Definition of the bool var: if a task belongs to a campaign
for task in tasks:
# One task belongs to exactly 1 campaign
# task_to_campaigns[task]
model.add(
sum(var_task_campaign_presences[task, campaign]
for campaign in campaigns
if campaign in task_to_campaigns[task]
) == 1
)
# 3. No campaign overlap
# Campaigns won't overlap anyway. But we need this for tasks within campaigns
model.add_no_overlap([x for x in var_task_intervals.values()])
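# Cap the campaign size; every task takes one time unit, so duration equals task count
for c in campaigns:
model.add(
var_campaign_durations[c] <= max_conti_task_num
)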
# Add campaign circuit
arcs = []
var_campaign_sequence = {}
for node_1, campaign_1 in enumerate(campaigns):
tmp1 = model.new_bool_var(f'first_to_{campaign_1}')
arcs.append([
0, node_1 + 1, tmp1
])
tmp2 = model.new_bool_var(f'{campaign_1}_to last')
arcs.append([
node_1 + 1, 0, tmp2
])
arcs.append([node_1 + 1, node_1 + 1, ~var_campaign_presences[campaign_1]])
# for outputting
var_campaign_sequence.update({(0, campaign_1): tmp1})
var_campaign_sequence.update({(campaign_1, 0): tmp2})
for node_2, campaign_2 in enumerate(campaigns):
if node_1 == node_2:
continue
indicator_node_1_to_node_2 = model.new_bool_var(f'{campaign_1}_to_{campaign_2}')
var_campaign_sequence.update({(campaign_1, campaign_2): indicator_node_1_to_node_2})
distance = m_cost_campaign[campaign_1, campaign_2]
arcs.append([node_1 + 1, node_2 + 1, indicator_node_1_to_node_2])
model.add(
var_campaign_ends[campaign_1] + distance <= var_campaign_starts[campaign_2]
).only_enforce_if(
indicator_node_1_to_node_2
).only_enforce_if(
var_campaign_presences[campaign_1]
).only_enforce_if(
var_campaign_presences[campaign_2]
)
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
# for t1 in tasks:
# for t2 in tasks:
# if t1 != t2:
# value = solver.value(var[t1, t2])
# if value == 1 and t2 != 0:
# print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[t1, t2]}')
# print('variables_task_sequence[t1, t2]',
# solver.value(variables_task_sequence[t1, t2]))
# print('variables_co_starts[t1, t2]', solver.value(variables_co_starts[t1, t2]))
# print('variables_co_ends[t1, t2]', solver.value(variables_co_ends[t1, t2]))
#
# if value == 1 and t2 == 0:
# print(f'{t1} --> {t2} Closing')
for c1 in list(campaigns) + [0]:
for c2 in list(campaigns) + [0]:
if c1 == c2:
continue
value = solver.value(var_campaign_sequence[c1, c2])
if value == 1 and c2 != 0 and c1 != 0:
c1_tasks = []
c2_tasks = []
for task in campaign_to_tasks[c1]:
if solver.value(var_task_campaign_presences[task, c1]) == 1:
c1_tasks.append(task)
for task in campaign_to_tasks[c2]:
if solver.value(var_task_campaign_presences[task, c2]) == 1:
c2_tasks.append(task)
print(f'{c1} --> {c2} {campaign_to_product[c1]} {c1_tasks} >> {campaign_to_product[c2]} {c2_tasks} cost: {m_cost_campaign[c1, c2]}')
if value == 1 and c1 == 0:
print(f'{c1} --> {c2} Starting')
if value == 1 and c2 == 0:
print(f'{c1} --> {c2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Campaigning time constraint (empty)
Source: scheduling/example_18_campaigning_time_constraint.py
Empty file. Likely intended to cap campaigns by total duration (rather than task count). For the closest working idea, see Campaigning with cumul, which caps by rank; replacing the rank limit with a duration sum is the natural variant.
Cleaning holding time (empty)
Source: scheduling/example_19_cleaning_holding_time.py
Empty file. Likely intended to model "cleaning events with a minimum or
maximum holding time" between tasks - a changeover interval that must
sit inside a specific window. See
Changeover as event; adding an
extra min_hold <= co_duration <= max_hold bound gives you the idea.
Campaigning with cumul
Source: scheduling/example_24_campaigning_with_cumul.py
The campaigns-as-entities
model works, but it creates a lot of variables. This chapter introduces
the compact alternative: keep tasks as the atomic unit and attach a
rank variable cumul[t] in [0, campaign_size - 1].
On each t1 -> t2 arc the model branches on reach_max[t1]:
- Campaign continues (reach_max[t1] false): end[t1] <= start[t2] and cumul[t2] == cumul[t1] + 1.
- Campaign ends (reach_max[t1] true): a changeover is inserted and cumul[t2] == 0 resets the rank.
The circuit uses -1 as the first/last dummy node. The __main__ block
benchmarks solve time at campaign sizes 2 and 3, showing the approach
scales better than the entity model.
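The two arc rules can be traced by hand on a fixed chain of tasks. The sketch below is plain Python rather than CP-SAT, and simulate_rank_schedule is a hypothetical helper, not something from the repo. It assumes tasks are chained in index order, the first task has rank 0, and every task starts as early as the rules allow.

```python
def simulate_rank_schedule(num_tasks, campaign_size, processing_time=1, changeover_time=2):
    """Chain tasks 0..num_tasks-1 and apply the two arc rules in order."""
    schedule = []  # rows of (task, start, end, cumul)
    clock, cumul = 0, 0
    for task in range(num_tasks):
        start, end = clock, clock + processing_time
        schedule.append((task, start, end, cumul))
        reach_max = cumul == campaign_size - 1
        if reach_max:
            # Campaign ends: insert a changeover and reset the rank.
            clock, cumul = end + changeover_time, 0
        else:
            # Campaign continues: the next task may start right away, rank + 1.
            clock, cumul = end, cumul + 1
    return schedule


schedule = simulate_rank_schedule(num_tasks=4, campaign_size=2)
for row in schedule:
    print(row)
# (0, 0, 1, 0)
# (1, 1, 2, 1)
# (2, 4, 5, 0)
# (3, 5, 6, 1)
```

The gap between end 2 and start 4 is the changeover triggered when task 1 reaches rank campaign_size - 1.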
Concepts
- Campaigning (approach 2: cumulative rank)
- Circuit and sequencing
Source
# Inspired by https://stackoverflow.com/questions/75554536
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
model = cp_model.CpModel()
def generate_one_product_data(num_tasks):
""" Generate N tasks of product A """
tasks = {i for i in range(num_tasks)}
task_to_product = ({i: 'A' for i in range(int(num_tasks))})
return tasks, task_to_product
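def run_model(num_tasks, campaign_size, print_result = True):
# Build a fresh model per call; reusing the module-level model would carry
# variables and constraints over from earlier benchmark runs
model = cp_model.CpModel()
# if campaign size is 2, then we need cumul indicator to be 0, 1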
changeover_time = 2
max_time = num_tasks*2
processing_time = 1
tasks, task_to_product = generate_one_product_data(num_tasks)
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
for task in tasks:
model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])
var_task_intervals = {
t: model.new_interval_var(
var_task_starts[t],
processing_time,
var_task_ends[t],
f"task_{t}_interval"
)
for t in tasks
}
model.add_no_overlap(var_task_intervals.values())
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.minimize(make_span)
literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[t1, t2]])
# if from task has not reached MAX, continue the campaign
model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
literals[t1, t2]
).only_enforce_if(~var_task_reach_max[t1])
model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
literals[t1, t2]
).only_enforce_if(~var_task_reach_max[t1])
# if from task has reached MAX, apply changeover and reset its cumulative indicator
model.add(var_task_cumul[t2] == 0).only_enforce_if(
literals[t1, t2]
).only_enforce_if(var_task_reach_max[t1])
model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
literals[t1, t2]
).only_enforce_if(var_task_reach_max[t1])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
if print_result:
if status == cp_model.OPTIMAL:
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_cumul[task]),
)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
return total_time
def print_unit_test_result(x, y1, y2, title=''):
ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(x, y1, marker='o', label = 'Campaign size: 2')
plt.plot(x, y2, marker='o', label = 'Campaign size: 3')
plt.legend()
plt.title(title)
plt.xlabel('The number of tasks')
plt.ylabel('Seconds')
plt.show()
if __name__ == '__main__':
N = 8
sizes = range(2, N+1)
model_times_campaign_2 = []
model_times_campaign_3 = []
for num_task in sizes:
model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
model_times_campaign_3.append(run_model(num_task, campaign_size=3, print_result=False))
print_unit_test_result(sizes, model_times_campaign_2, model_times_campaign_3,
'Scalability of Campaigning with Cumulative Indicator')
Campaigning, locked sequence
Source: scheduling/example_25_campaigning_with_locked_seq.py
Same cumulative-rank campaigning, with a single heuristic bolted on: lock the task order.
for task in tasks:
    if task != 0:
        model.add(var_task_ends[task - 1] <= var_task_starts[task])
In practice, task indices often already reflect priority or deadline order. Telling the solver so upfront prunes a lot of symmetric branches without changing the optimum. The speed-up can be an order of magnitude.
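The symmetry being pruned can be seen with a small brute-force check. This is a plain-Python sketch under the benchmark's assumptions (interchangeable tasks of one product, processing time 1, changeover time 2). makespan_for_order is a hypothetical helper, not part of the repo. With different products or deadlines the orderings would no longer be interchangeable.

```python
from itertools import permutations


def makespan_for_order(order, campaign_size, processing_time=1, changeover_time=2):
    """Makespan when tasks run back-to-back in `order` under the rank rule."""
    clock, cumul, end = 0, 0, 0
    for _task in order:
        end = clock + processing_time
        if cumul == campaign_size - 1:
            clock, cumul = end + changeover_time, 0  # changeover, reset rank
        else:
            clock, cumul = end, cumul + 1            # campaign continues
    return end


tasks = range(4)  # four interchangeable tasks of one product
makespans = [makespan_for_order(order, campaign_size=2) for order in permutations(tasks)]
print(len(makespans))  # 24
print(set(makespans))  # {6}
```

All 24 orderings give the same makespan, so fixing one of them up front removes equivalent branches without losing the best schedule in this setting.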
Concepts
- Campaigning (task-order heuristic)
Source
# Inspired by https://stackoverflow.com/questions/75554536
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
model = cp_model.CpModel()
def generate_one_product_data(num_tasks):
""" Generate N tasks of product A """
tasks = {i for i in range(num_tasks)}
task_to_product = ({i: 'A' for i in range(int(num_tasks))})
return tasks, task_to_product
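def run_model(num_tasks, campaign_size, print_result = True):
# Build a fresh model per call; reusing the module-level model would carry
# variables and constraints over from earlier benchmark runs
model = cp_model.CpModel()
# if campaign size is 2, then we need cumul indicator to be 0, 1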
changeover_time = 2
max_time = num_tasks*2
processing_time = 1
tasks, task_to_product = generate_one_product_data(num_tasks)
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
model.add(var_task_cumul[0]==0)
var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
# Lock the sequence of the tasks (assume the deadlines are in this sequence !)
for task in tasks:
if task != 0:
model.add(var_task_ends[task-1] <= var_task_starts[task])
for task in tasks:
model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])
var_task_intervals = {
t: model.new_interval_var(
var_task_starts[t],
processing_time,
var_task_ends[t],
f"task_{t}_interval"
)
for t in tasks
}
model.add_no_overlap(var_task_intervals.values())
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.minimize(make_span)
literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[t1, t2]])
# if from task has not reached MAX, continue the campaign
model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
literals[t1, t2]
).only_enforce_if(~var_task_reach_max[t1])
model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
literals[t1, t2]
).only_enforce_if(~var_task_reach_max[t1])
# if from task has reached MAX, apply changeover and reset its cumulative indicator
model.add(var_task_cumul[t2] == 0).only_enforce_if(
literals[t1, t2]
).only_enforce_if(var_task_reach_max[t1])
model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
literals[t1, t2]
).only_enforce_if(var_task_reach_max[t1])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
if print_result:
if status == cp_model.OPTIMAL:
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_cumul[task]),
)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
return total_time
def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1, marker='o', label='Campaign size: 2')
    plt.plot(x, y2, marker='o', label='Campaign size: 3')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()
if __name__ == '__main__':
    N = 25
    sizes = range(2, N+1)
    model_times_campaign_2 = []
    model_times_campaign_3 = []
    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_3.append(run_model(num_task, campaign_size=3, print_result=False))
    print_unit_test_result(sizes, model_times_campaign_2, model_times_campaign_3,
                           'Scalability of Campaigning with Cumulative Indicator')
Campaigning, locked sequence improved
Source: scheduling/example_26_campaigning_locked_seq_improved.py
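The previous chapter forced a campaign to end only when the rank hit the cap. Sometimes ending earlier gives a better schedule - for example when a near-deadline task of a different product is waiting. This chapter and its flexible twin work toward letting the solver decide.
This file makes two changes. The order lock loosens to start[t-1] <= start[t] (from the stricter end <= start), which is compatible with flexible campaign ends. The four doubly conditioned constraints per arc also collapse into two linear ones, by multiplying reach_max into the changeover term and the rank-reset term. The link between reach_max and cumul == campaign_size - 1 is still present in this file; the flexible twin in the next chapter drops it so that the solver can end a campaign early.
The source also records a recurring trick: since add_max_equality cannot be wrapped in only_enforce_if (the attempt is left commented out as making the model invalid), compute the next rank outside the literal and then assign it conditionally. The twin file applies it like this: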
model.add_max_equality(
    max_values[t1, t2],
    [0, cumul[t1] + 1 - reach_end[t1] * campaign_size],
)
model.add(cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])
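To see what the two-step expression does, it can help to enumerate it in plain Python, without the solver. The sketch below is only an illustration: the helper names next_rank and allowed_choices are assumptions and do not appear in the source files. It evaluates max(0, cumul + 1 - reach_end * campaign_size) for both values of reach_end and keeps the results that fit the rank domain [0, campaign_size - 1].

```python
def next_rank(cumul: int, reach_end: int, campaign_size: int) -> int:
    """Rank of the successor task: continue the campaign or reset to 0."""
    return max(0, cumul + 1 - reach_end * campaign_size)


def allowed_choices(cumul: int, campaign_size: int) -> dict:
    """Map each reach_end value (0 or 1) to the successor rank, keeping
    only results inside the rank domain [0, campaign_size - 1]."""
    choices = {}
    for reach_end in (0, 1):
        rank = next_rank(cumul, reach_end, campaign_size)
        if rank <= campaign_size - 1:
            choices[reach_end] = rank
    return choices


if __name__ == "__main__":
    # One line per current rank, for a campaign of at most 3 tasks.
    for cumul in range(3):
        print(cumul, allowed_choices(cumul, campaign_size=3))
```

Below the cap both choices survive, so ending the campaign is optional. At the cap only reach_end = 1 keeps the successor rank inside its domain, which is how the domain alone forces the changeover.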
Concepts
- Campaigning (flexible campaign ends)
Source
# Inspired by https://stackoverflow.com/questions/75554536
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd
model = cp_model.CpModel()
def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product
def run_model(num_tasks, campaign_size, print_result=True):
    # Build a fresh model per call: the timing loop calls run_model repeatedly, and a
    # shared module-level model would accumulate variables and constraints across runs
    model = cp_model.CpModel()
    # if campaign size is 2, then we need cumul indicator to be 0, 1
    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1
    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    model.add(var_task_cumul[0] == 0)
    var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
    # Lock the sequence of the tasks (assume the deadlines are in this sequence !)
    # A later task shall not start earlier than an earlier task
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] <= var_task_starts[task])
    # reach_max is still tied to the rank sitting at the cap in this file
    for task in tasks:
        model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
        model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])
    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)
    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])
            # # if from task has not reached MAX, continue the campaign
            # model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(~var_task_reach_max[t1])
            # model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(~var_task_reach_max[t1])
            #
            # # if from task has reached MAX, apply changeover and reset its cumulative indicator
            # model.add(var_task_cumul[t2] == 0).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(var_task_reach_max[t1])
            # model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(var_task_reach_max[t1])
            # The four constraints above collapse into two linear ones:
            # reach_max switches the changeover time and the rank reset on or off
            model.add(
                var_task_ends[t1] + var_task_reach_max[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )
            model.add(
                var_task_cumul[t2] == var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
            ).only_enforce_if(
                literals[t1, t2]
            )
            # The following makes the model invalid
            # model.add_max_equality(
            #     var_task_cumul[t2],
            #     [
            #         0,
            #         var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
            #     ]
            # ).only_enforce_if(
            #     literals[t1, t2]
            # )
    model.add_circuit(arcs)
    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)
    # -999 marks a run that did not reach an optimal solution
    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999
def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1, label='Campaign size: 2')
    plt.plot(x, y2, label='Campaign size: 5')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()
if __name__ == '__main__':
    N = 60
    sizes = range(2, N+1, 4)
    model_times_campaign_2 = []
    model_times_campaign_5 = []
    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_5.append(run_model(num_task, campaign_size=5, print_result=False))
    df = pd.DataFrame({
        'num_tasks': sizes,
        'c2': model_times_campaign_2,
        'c5': model_times_campaign_5
    })
    df.to_csv("example_26_result.csv")
    print_unit_test_result(sizes,
                           model_times_campaign_2,
                           model_times_campaign_5,
                           'Scalability of Campaigning with Cumulative Indicator')
Campaigning, locked sequence improved (flexible)
Source: scheduling/example_26_campaigning_locked_seq_improved_flexible.py
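Twin of the previous chapter. The two files exist side by side to document an important design choice explicitly: a campaign no longer has to run until the rank hits the cap. Let the solver decide.
Most of the model is shared with its sibling. The differences are that the link between reach_max and cumul == campaign_size - 1 is commented out, var_reach_campaign_end becomes a free boolean decision, and the rank update goes through the auxiliary max_values variable. The solver may therefore end a campaign early when that improves the objective, while hitting the cap still forces a campaign end because the rank domain stops at campaign_size - 1. Read both files to see the "before / after" of the refinement.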
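A tiny brute-force check can make the "let the solver decide" idea concrete without CP-SAT. The sketch below is an illustration only. It assumes one product, tasks run back to back in index order, processing_time = 1 and changeover_time = 2 as in the source, and all function names are made up for this example. It enumerates every vector of campaign-end flags, discards those that push a rank past campaign_size - 1, and compares the best makespan with the closed-form value used in the commented-out point check of this file (40 tasks, campaign size 10, expected 46).

```python
from itertools import product
from math import ceil


def makespan_for_flags(flags, campaign_size, processing_time=1, changeover_time=2):
    """Makespan of tasks run back to back in index order, or None when
    the flags let a rank exceed campaign_size - 1."""
    rank = 0
    clock = 0
    for i, ends_campaign in enumerate(flags):
        if rank > campaign_size - 1:
            return None
        clock += processing_time
        is_last = i == len(flags) - 1
        # A changeover is only paid between two tasks, never after the last one.
        if ends_campaign and not is_last:
            clock += changeover_time
        rank = max(0, rank + 1 - ends_campaign * campaign_size)
    return clock


def best_makespan(num_tasks, campaign_size):
    """Exhaustive search over all flag vectors (tiny instances only)."""
    candidates = (
        makespan_for_flags(flags, campaign_size)
        for flags in product((0, 1), repeat=num_tasks)
    )
    return min(m for m in candidates if m is not None)


def closed_form(num_tasks, campaign_size, processing_time=1, changeover_time=2):
    """Processing time plus one changeover between consecutive full campaigns."""
    campaigns = ceil(num_tasks / campaign_size)
    return num_tasks * processing_time + (campaigns - 1) * changeover_time


if __name__ == "__main__":
    print(best_makespan(5, 2), closed_form(5, 2))
    print(closed_form(40, 10))
```

With a single product and a makespan objective, ending a campaign early never helps, so the brute-force optimum matches the closed form. The extra freedom starts to matter once several products compete for the machine.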
Concepts
- Campaigning (flexible campaign ends)
Source
# Inspired by https://stackoverflow.com/questions/75554536
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd
model = cp_model.CpModel()
def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product
def run_model(num_tasks, campaign_size, print_result=True):
    # Build a fresh model per call: the timing loop calls run_model repeatedly, and a
    # shared module-level model would accumulate variables and constraints across runs
    model = cp_model.CpModel()
    # if campaign size is 2, then we need cumul indicator to be 0, 1
    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1
    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    model.add(var_task_cumul[0] == 0)
    var_reach_campaign_end = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
    # Lock the sequence of the tasks (assume the deadlines are in this sequence !)
    # A later task shall not start earlier than an earlier task
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] <= var_task_starts[task])
    # ! SHALL REMOVE THE FOLLOWING ! LEAVE THE DECISION (OF STARTING A NEW CAMPAIGN OR NOT) BACK TO THE MODEL !
    # for task in tasks:
    #     model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
    #     model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])
    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)
    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
    max_values = {(t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])
            # [ task1 ] -> [ C/O ] -> [ task 2]
            model.add(
                var_task_ends[t1] + var_reach_campaign_end[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )
            # This is for fixed campaigning size. DO full-campaign or NOT DO.
            # model.add(
            #     var_task_cumul[t2] == var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
            # ).only_enforce_if(literals[t1, t2])
            # The creator has confirmed add_max_equality is not compatible with only_enforce_if
            # model.add_max_equality(
            #     var_task_cumul[t2],
            #     [0,var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            # ).only_enforce_if(literals[t1, t2])
            # ! HERE IS THE CHANGE RECOMMENDED FOR FLEXIBLE CAMPAIGNING. BUT IN TWO STEPS !
            # NOTE var_reach_campaign_end ARE NOW OPEN BOOL DECISION VARIABLES.
            #
            # If reaching limit (var_task_cumul + 1 is equal to campaign_size), the expression is max(0, 0)
            # the model must do C/O because var_task_cumul[t2] has to be 0 (a new campaign starts).
            #
            # If not yet reaching limit (var_task_cumul + 1 < campaign_size), the expression is max(0, -x)
            # model can still choose to do C/O by having var_reach_campaign_end to be 1 (if that brings a better obj)
            model.add_max_equality(
                max_values[t1, t2],
                [0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            )
            model.add(var_task_cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])
    model.add_circuit(arcs)
    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      solver.value(var_reach_campaign_end[task])
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)
    # -999 marks a run that did not reach an optimal solution
    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999
def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1, label='Campaign size: 2')
    plt.plot(x, y2, label='Campaign size: 5')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()
if __name__ == '__main__':
    # print('Point check for run_model(40, 10)\nExpect make-span = 40*1 + (40/10 - 1)*2 = 40 + 6 = 46')
    # run_model(40, 10)
    N = 60
    sizes = range(2, N+1, 4)
    model_times_campaign_2 = []
    model_times_campaign_5 = []
    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_5.append(run_model(num_task, campaign_size=5, print_result=False))
    df = pd.DataFrame({
        'num_tasks': sizes,
        'c2': model_times_campaign_2,
        'c5': model_times_campaign_5
    })
    print(df)
    print_unit_test_result(sizes,
                           model_times_campaign_2,
                           model_times_campaign_5,
                           'Scalability of Campaigning with Cumulative Indicator')
Campaigning across products
Source: scheduling/example_27_campaigning_products.py
So far every campaigning example has used one product. Real plants run many. A campaign must now end both when it hits the size cap and whenever the product changes.
One extra boolean per task, var_product_change, and two constraints plug the product-change rule into the cumulative-rank model:
model.add(var_product_change[t1] == product_change_indicator[t1, t2]).only_enforce_if(literals[t1, t2])
model.add(var_reach_campaign_end[t1] >= var_product_change[t1])
The first line captures whether the outgoing arc crosses a product boundary; the second escalates any such crossing into a campaign end, which in turn forces a changeover via the familiar rank-reset machinery.
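The effect of the two constraints can be traced by hand on a fixed sequence. The following sketch is plain Python and only an illustration: the function changeover_flags and its inputs are assumptions, not code from the source. For a given order of products it reports where a campaign is forced to end, either because the next task belongs to a different product or because the campaign is full. The solver is free to end campaigns in more places than this minimum.

```python
def changeover_flags(products_in_order, campaign_size):
    """For each position, True when a changeover must follow it: the
    next task is a different product, or the campaign is already full."""
    flags = []
    rank = 0
    for i, current in enumerate(products_in_order):
        is_last = i == len(products_in_order) - 1
        # Mirrors product_change_indicator[t1, t2] along the chosen arc.
        product_change = (not is_last) and products_in_order[i + 1] != current
        campaign_full = rank == campaign_size - 1
        must_end = (not is_last) and (product_change or campaign_full)
        flags.append(must_end)
        # Rank resets after a campaign end, otherwise the campaign continues.
        rank = 0 if must_end else rank + 1
    return flags


if __name__ == "__main__":
    for position, flag in enumerate(changeover_flags("AAABB", campaign_size=2)):
        print(position, "AAABB"[position], flag)
```

In the sequence AAABB with campaign size 2, the second A ends a full campaign and the third A ends a campaign because B follows it.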
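An optional order lock speeds up larger instances. The source comments report 7.54s for the per-product lock (cumul[t-1] <= cumul[t]) against 0.10s for a global start-order lock on the instance with 3 products, 4 tasks per product and campaign size 4. The __main__ block checks the solver's makespan against a closed-form expected value - a handy sanity test.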
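The closed-form check is simple enough to reproduce on its own. The helper below is a sketch that restates the formula printed by the __main__ block; the function name and the processing_time parameter are assumptions added here (the source hard-codes a processing time of 1 and a changeover time of 2). It assumes each product is run in consecutive campaigns, with one changeover after every campaign except the very last.

```python
from math import ceil


def expected_makespan(num_products, tasks_per_product, campaign_size,
                      processing_time=1, changeover_time=2):
    """Expected optimal makespan when every product runs in consecutive
    campaigns and only the final campaign has no trailing changeover."""
    campaigns_per_product = ceil(tasks_per_product / campaign_size)
    per_product = (tasks_per_product * processing_time
                   + campaigns_per_product * changeover_time)
    return per_product * num_products - changeover_time


if __name__ == "__main__":
    # Same instance as the __main__ block: 3 products, 4 tasks each, campaign size 4.
    print(expected_makespan(3, 4, 4))
```

If the solver's makespan differs from this value on such a symmetric instance, either the model or the formula's assumptions deserve a second look.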
Concepts
- Campaigning (multi-product)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd
import string
from math import ceil
model = cp_model.CpModel()
def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate tasks of products (no more than 26 products) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product
def run_model(number_of_products, num_of_tasks_per_product, campaign_size, print_result=True):
    """
    Do changeovers if either of the following occurs:
    1. Changeover between different products: [A] -> changeover -> [B]
    2. Previous campaign reaching size limit: [A ... A] -> changeover -> any next campaign
    """
    # number_of_products = 2
    # num_of_tasks_per_product = 4
    # campaign_size = 3
    # print_result = True
    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*2
    processing_time = 1
    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print(f'\nInput data: {task_to_product}\n')
    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    # The first task of every product starts a campaign at rank 0
    for product_idx, product in enumerate(range(number_of_products)):
        model.add(var_task_cumul[product_idx*num_of_tasks_per_product] == 0)
    var_reach_campaign_end = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
    var_product_change = {task: model.new_bool_var(f"task_{task}_go_to_different_product") for task in tasks}
    # Heuristic: Lock the sequence of the tasks (assume the deadlines are in the task order
    # AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
    print("Apply the tasks sequence heuristics")
    # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
    for product_idx, product in enumerate(range(number_of_products)):
        for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
            _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
            if task_id_in_product_group == 0:
                print(f"\nLocking {_index}", end=" ")
            else:
                print(f" <= {_index}", end=" ")
                model.add(var_task_cumul[_index-1] <= var_task_cumul[_index])
    # Option 2: Locking the sequence of all tasks! This is quicker (0.10s for 3, 4, 4)
    # print("Locking 0", end=" ")
    # for task in tasks:
    #     if task != 0:
    #         print(f"<= {task}", end=" ")
    #         model.add(var_task_starts[task-1] <= var_task_starts[task])
    print("\n")
    var_task_intervals = {
        t: model.new_interval_var(var_task_starts[t], processing_time, var_task_ends[t], f"task_{t}_interval")
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())
    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)
    # the bool variables that indicate whether t1 -> t2
    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
    # the technical variables to allow flexible campaigning
    max_values = {(t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])
            # [ task1 ] -> [ C/O ] -> [ task 2]
            # a product change on the chosen arc forces the campaign to end
            model.add(var_product_change[t1] == product_change_indicator[t1, t2]).only_enforce_if(
                literals[t1, t2]
            )
            model.add(var_reach_campaign_end[t1] >= var_product_change[t1])
            model.add(
                var_task_ends[t1] + var_reach_campaign_end[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )
            # allow flexible campaigning
            model.add_max_equality(
                max_values[t1, t2],
                [0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            )
            model.add(var_task_cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])
    model.add_circuit(arcs)
    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                if task%num_of_tasks_per_product == 0:
                    print('--------- product divider ---------\n')
                print(f'Task {task} {task_to_product[task]}',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      solver.value(var_reach_campaign_end[task]),
                      solver.value(var_product_change[task]),
                      )
                if solver.value(var_reach_campaign_end[task]):
                    print('-- campaign ends --\n')
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)
    # -999 marks a run that did not reach an optimal solution
    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999
if __name__ == '__main__':
    _number_of_products = 3
    _num_of_tasks_per_product = 4
    _campaign_size = 4
    n = ceil(_num_of_tasks_per_product/_campaign_size)
    _make_span = (_num_of_tasks_per_product + 2*n)*_number_of_products-2
    run_time = run_model(_number_of_products, _num_of_tasks_per_product, _campaign_size)
    print(f"Runtime: {round(run_time, 2)}s")
    print(f'\nExpected make-span if all right:'
          f'\n = (_num_of_tasks_per_product + changeover_time*ceil(_num_of_tasks_per_product/_campaign_size))*'
          f'_number_of_products - changeover_time '
          f'\n = {_make_span}')
Campaigning products x machines
Source: scheduling/example_28_campaigning_products_machines.py
The last two layers of complexity combine here: multi-product campaigning runs on multiple machines in parallel. The rank variable and the reach_campaign_end and product_change booleans are all indexed by (machine, task). Each machine gets its own add_circuit with presence self-loops, and the campaigning rules are applied per machine inside each arc.
This is the most structurally involved CP-SAT model in the book and a good reference for assembling the earlier pieces - presence booleans, per-machine circuits, cumulative-rank campaigning, product-change reification - into one coherent model.
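The shape of one machine's circuit is easier to see when the arcs are listed without the solver. The sketch below is an illustration only: build_arcs is a made-up helper, and the string labels stand in for the CP-SAT literals that the real model attaches to each arc. Node -1 is the dummy start/end node, and the self-loop on a task is the arc that is selected when the task is not present on this machine.

```python
def build_arcs(machine, tasks):
    """List (tail, head, label) triples for one machine's circuit.
    Labels are placeholders for the literals used in the real model."""
    arcs = []
    for t1 in tasks:
        arcs.append((-1, t1, f"m{machine}: first -> {t1}"))
        arcs.append((t1, -1, f"m{machine}: {t1} -> last"))
        # Self-loop: chosen when task t1 is NOT present on this machine.
        arcs.append((t1, t1, f"m{machine}: not present[{t1}]"))
        for t2 in tasks:
            if t1 != t2:
                arcs.append((t1, t2, f"m{machine}: {t1} -> {t2}"))
    return arcs


if __name__ == "__main__":
    for arc in build_arcs(machine=0, tasks=range(3)):
        print(arc)
```

For n tasks this gives n * (n + 2) arcs per machine, so the arc count grows quadratically in the number of tasks and linearly in the number of machines.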
Concepts
- Campaigning (multi-product, multi-machine)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import string
model = cp_model.CpModel()
def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate the same number of tasks for multiple products (no more than 26 products please) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product
def run_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
    """
    Allocate tasks to multiple machines
    And do changeover if either of the following occurs:
    1. Switch between different products: [A campaign] -> changeover -> [B campaign]
    2. Previous campaign reaching the size limit: [A FULL campaign] -> changeover -> any next campaign
    """
    # number_of_products = 2
    # num_of_tasks_per_product = 4
    # campaign_size = 2
    # number_of_machines = 2
    # print_result = True
    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*3
    processing_time = 1
    machines = {x for x in range(number_of_machines)}
    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print('Input data:\nTasks:', tasks, task_to_product, '\n')
    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
    var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
    var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}
    var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}
    # No influence on the final result. No need to lock the starting rank values of the first tasks per product to be 0
    # for product_idx, product in enumerate(range(number_of_products)):
    #     print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
    #     for m in machines:
    #         model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)
    var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
    var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}
    # This is optional
    model.add_decision_strategy(
        var_m_t_product_change.values(),
        cp_model.CHOOSE_FIRST,
        cp_model.SELECT_MIN_VALUE
    )
    # Heuristic: Lock the sequence of the tasks (assume the deadlines are in the task order
    # AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
    print("\nApply the tasks sequence heuristics")
    # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
    for product_idx, product in enumerate(range(number_of_products)):
        for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
            _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
            if task_id_in_product_group == 0:
                print(f"\nLock {_index}", end=" ")
            else:
                print(f" <= {_index}", end=" ")
                model.add(var_task_ends[_index-1] <= var_task_starts[_index])
    print("\n")
    # These intervals are needed, otherwise the duration is not constrained
    var_machine_task_intervals = {
        (m, t): model.new_optional_interval_var(
            var_machine_task_starts[m, t],
            processing_time,
            var_machine_task_ends[m, t],
            var_machine_task_presences[m, t],
            f"task_{t}_interval_on_m_{m}")
        for t in tasks for m in machines
    }
    # each task is only present in one machine
    for task in tasks:
        task_candidate_machines = machines
        tmp = [
            var_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        model.add_exactly_one(tmp)
    # link task-level to machine-task level for start time & end time
    for task in tasks:
        task_candidate_machines = machines
        for m in task_candidate_machines:
            model.add(
                var_task_starts[task] == var_machine_task_starts[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])
            model.add(
                var_task_ends[task] == var_machine_task_ends[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])
    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
    model.minimize(make_span)
    # the bool variables that indicate whether t1 -> t2
    literals = {(m, t1, t2): model.new_bool_var(f"{t1} -> {t2}")
                for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
    # the technical variables to allow flexible campaigning
    max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}")
                  for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
    # schedule the tasks: one circuit per machine
    for m in machines:
        arcs = []
        for t1 in tasks:
            arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
            arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
            # self-loop is selected when the task is not present on this machine
            arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])
            for t2 in tasks:
                if t1 == t2:
                    continue
                # if t1 > t2:
                #     model.add(literals[m, t1, t2] == 0)
                arcs.append([t1, t2, literals[m, t1, t2]])
                # If A -> B then var_m_t_product_change>=1 (can be 0 if the last task in a machine)
                model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
                    literals[m, t1, t2]
                )
                # If var_m_t_product_change=1 then the campaign must end
                model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])
                # if the campaign ends then there must be changeover time
                # [ task1 ] -> [ C/O ] -> [ task 2]
                model.add(
                    var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
                ).only_enforce_if(
                    literals[m, t1, t2]
                )
                # model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
                # has to do this in two steps since add_max_equality is not compatible with only_enforce_if
                model.add_max_equality(
                    max_values[m, t1, t2],
                    [0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
                )
                model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])
        model.add_circuit(arcs)
    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    # show the result if getting the optimal one
    if print_result:
        if status == cp_model.OPTIMAL:
            big_list = []
            for m in machines:
                for task in tasks:
                    if solver.value(var_machine_task_presences[m, task]):
                        tmp = [
                            f"machine {m}",
                            f"task {task}",
                            task_to_product[task],
                            solver.value(var_task_starts[task]),
                            solver.value(var_task_ends[task]),
                            solver.value(var_machine_task_rank[m, task]),
                            solver.value(var_m_t_product_change[m, task]),
                            solver.value(var_m_t_reach_campaign_end[m, task])
                        ]
                        big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['machine', 'task', 'prd', 'start', 'end', 'rank', 'prd_chg', 'c/o']
            df = df.sort_values(['machine', 'start'])
            for m in machines:
                print(f"\n======= Machine {m} =======")
                df_tmp = df[df['machine']==f"machine {m}"]
                print(df_tmp)
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)
    # -999 marks a run that did not reach an optimal solution
    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999
if __name__ == '__main__':
    # number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines
    args = 4, 4, 3, 3
    runtime = run_model(*args)
    print(f"Solving time: {round(runtime, 2)}s")
Campaigning with pregrouping (empty)
Source: scheduling/example_30_campaigning_with_pregrouping.py
Empty file. Likely intended to pre-group tasks into candidate campaigns outside the solver, then let CP-SAT only sequence the pre-built groups. The closest working realisation is Max number of continuous tasks; fixing which tasks go into which campaign before solving gives you the pregrouping variant.
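Since the source file is empty, any code for it is guesswork. As one possible reading of "pregrouping", the sketch below chunks the tasks of each product, in task-id order, into groups of at most campaign_size before any solver is involved. The function name pregroup and the output shape are assumptions made for this illustration, not something taken from the repo. The input has the same shape as the task_to_product dictionaries used in the other examples.

```python
def pregroup(task_to_product, campaign_size):
    """Chunk the tasks of each product, in task-id order, into candidate
    campaigns of at most campaign_size tasks (hypothetical pre-processing)."""
    by_product = {}
    for task in sorted(task_to_product):
        by_product.setdefault(task_to_product[task], []).append(task)
    groups = []
    for product in sorted(by_product):
        ids = by_product[product]
        for i in range(0, len(ids), campaign_size):
            groups.append((product, ids[i:i + campaign_size]))
    return groups


if __name__ == "__main__":
    demo = {0: "A", 1: "A", 2: "A", 3: "B", 4: "B"}
    for product, members in pregroup(demo, campaign_size=2):
        print(product, members)
```

With groups fixed up front, a model would only need to sequence the groups and place changeovers between them, at the cost of ruling out schedules that split campaigns differently.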
Campaigning faster
Source: scheduling/example_31_campaigning_faster.py
Tuned companion to the multi-product, multi-machine chapter. Same problem, same model skeleton; what changes is which heuristics are on, how the constraints are ordered, and a few implementation shortcuts.
Treat this as the "after tuning" version: when the earlier model starts to struggle on larger instances, walk through the differences here to see which levers were pulled.
Concepts
- Campaigning (tuning)
- Solver techniques
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import string
model = cp_model.CpModel()
def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate the same number of tasks for multiple products (no more than 26 products please) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product
def run_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
"""
Allocate to tasks to multiple machines
And do changeover if either of the following occurs:
1. Switch between different products: [A campaign] -> changeover -> [B campaign]
2. Previous campaign reaching the size limit: [A FULL campaign] -> changeover -> next any campaign
"""
# number_of_products = 2
# num_of_tasks_per_product = 4
# campaign_size = 2
# number_of_machines = 2
# print_result = True
changeover_time = 2
max_time = num_of_tasks_per_product*number_of_products*3
processing_time = 1
machines = {x for x in range(number_of_machines)}
tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
print('Input data:\nTasks:', tasks, task_to_product)
product_change_indicator = {
(t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
}
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}
var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}
# No influence on the final result. No need to lock the starting rank of the first task of each product to 0
# for product_idx, product in enumerate(range(number_of_products)):
# print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
# for m in machines:
# model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)
var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}
# This is optional
model.add_decision_strategy(
var_m_t_product_change.values(),
cp_model.CHOOSE_FIRST,
cp_model.SELECT_MIN_VALUE
)
# Heuristic: Lock the sequence of the tasks (assume the deadlines are in the task order
# AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
print("Apply the tasks sequence heuristics")
# Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
for product_idx, product in enumerate(range(number_of_products)):
for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
_index = product_idx * num_of_tasks_per_product + task_id_in_product_group
if task_id_in_product_group == 0:
print(f"\nLock {_index}", end=" ")
else:
print(f" <= {_index}", end=" ")
model.add(var_task_ends[_index-1] <= var_task_starts[_index])
print("\n")
# These intervals are needed, otherwise the duration is not constrained
var_machine_task_intervals = {
(m, t): model.new_optional_interval_var(
var_machine_task_starts[m, t],
processing_time,
var_machine_task_ends[m, t],
var_machine_task_presences[m, t],
f"task_{t}_interval_on_m_{m}")
for t in tasks for m in machines
}
# each task is only present in one machine
for task in tasks:
task_candidate_machines = machines
tmp = [
var_machine_task_presences[m, task]
for m in task_candidate_machines
]
model.add_exactly_one(tmp)
# link task-level to machine-task level for start time & end time
for task in tasks:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
var_task_starts[task] == var_machine_task_starts[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
model.add(
var_task_ends[task] == var_machine_task_ends[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
# Set objective to minimize make-span
make_span = model.new_int_var(0, max_time, "make_span")
total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
model.minimize(make_span)
# the bool variables that indicate whether t1 -> t2
literals = {(m, t1, t2): model.new_bool_var(f"{t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# the technical variables to allow flexible campaigning
max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# schedule the tasks
for m in machines:
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])
for t2 in tasks:
if t1 == t2:
continue
# this speeds up the solve considerably
# if t1 > t2 and task_to_product[t1]==task_to_product[t2]:
# model.add(literals[m, t1, t2] == 0)
if t1 > t2:
model.add(literals[m, t1, t2] == 0)
arcs.append([t1, t2, literals[m, t1, t2]])
# If t1 -> t2 and the products differ then var_m_t_product_change = 1 (it can stay 0 for the last task on a machine)
model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
literals[m, t1, t2]
)
# If var_m_t_product_change=1 then the campaign must end
model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])
# if the campaign ends then there must be changeover time
# [ task1 ] -> [ C/O ] -> [ task 2]
model.add(
var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
).only_enforce_if(
literals[m, t1, t2]
)
# model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
# has to do this in two steps since add_max_equality is not compatible with only_enforce_if
model.add_max_equality(
max_values[m, t1, t2],
[0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
)
model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
# show the result if getting the optimal one
if print_result:
if status == cp_model.OPTIMAL:
big_list = []
for m in machines:
for task in tasks:
if solver.value(var_machine_task_presences[m, task]):
tmp = [
f"machine {m}",
f"task {task}",
task_to_product[task],
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_machine_task_rank[m, task]),
solver.value(var_m_t_product_change[m, task]),
solver.value(var_m_t_reach_campaign_end[m, task])
]
big_list.append(tmp)
df = pd.DataFrame(big_list)
df.columns = ['machine', 'task', 'prd', 'start', 'end', 'rank', 'prd_chg', 'c/o']
df = df.sort_values(['machine', 'start'])
for m in machines:
print(f"\n======= Machine {m} =======")
df_tmp = df[df['machine']==f"machine {m}"]
print(df_tmp)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
if status == cp_model.OPTIMAL:
return total_time
else:
return -999
if __name__ == '__main__':
# number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines
args = 4, 4, 3, 3
runtime = run_model(*args)
print(f"Solving time: {round(runtime, 2)}s")
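The rank update in the listing above (the add_max_equality on max_values followed by the equality on var_machine_task_rank) can be checked by hand. The sketch below is plain Python arithmetic, not solver code. The helper names next_rank and in_domain are hypothetical.

```python
def next_rank(rank_t1, reach_campaign_end, campaign_size):
    """Rank of the task that follows t1, as encoded in the model:
    max(0, rank[t1] + 1 - reach_campaign_end[t1] * campaign_size)."""
    return max(0, rank_t1 + 1 - reach_campaign_end * campaign_size)


def in_domain(rank, campaign_size):
    """Rank variables are declared with domain [0, campaign_size - 1]."""
    return 0 <= rank <= campaign_size - 1


campaign_size = 3

# Campaign continues: the rank simply increments
continued = next_rank(1, reach_campaign_end=0, campaign_size=campaign_size)

# Campaign is full (rank 2) and is NOT ended: the next rank leaves the domain,
# so the solver has to set reach_campaign_end = 1 instead
overflow = next_rank(2, reach_campaign_end=0, campaign_size=campaign_size)

# Campaign ended, either when full or early: the rank resets to 0
reset_full = next_rank(2, reach_campaign_end=1, campaign_size=campaign_size)
reset_early = next_rank(0, reach_campaign_end=1, campaign_size=campaign_size)

print(continued, overflow, reset_full, reset_early)
print(in_domain(overflow, campaign_size))
```

The size limit is therefore enforced by the variable domain rather than by a separate constraint, and the max with 0 is what makes an early campaign end possible.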
Solving by phases
Source: scheduling/example_32_solving_by_phases.py
Large campaigning models can be slow to find even their first feasible
solution. A pragmatic trick: build the full model once, solve it
repeatedly with a growing time limit (each phase's max_time, passed to
solver.parameters.max_time_in_seconds), and carry each phase's solution
forward as hints to the next.
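The control flow of the phased approach can be sketched without the solver. In the sketch below, toy_solve is a made-up stand-in for CP-SAT (it improves the objective by one unit per second of budget, starting from the hint, and its optimum is assumed to be 60). Only the driver loop reflects the technique.

```python
def toy_solve(time_limit, hint):
    """Stand-in for a CP-SAT solve: NOT a real solver.

    Starts from the hinted objective (or 100 without a hint) and improves
    by 1 per second of time limit, down to an assumed optimum of 60.
    """
    start = hint["obj"] if hint is not None else 100
    obj = max(60, start - int(time_limit))
    status = "OPTIMAL" if obj == 60 else "FEASIBLE"
    return status, {"obj": obj}


def solve_by_phases(solve, phases):
    """Run the phases in order, feeding each solution to the next as a hint."""
    hint = None
    history = []
    for phase in phases:
        status, solution = solve(phase["max_time"], hint)
        history.append(solution["obj"])
        hint = solution  # carry the latest solution forward
        if status == "OPTIMAL":
            break  # no need to run the remaining phases
    return history


phases = [{"phase_id": i, "max_time": (i + 1) * 10} for i in range(6)]
history = solve_by_phases(toy_solve, phases)
print(history)
```

With a real model, solve would set max_time_in_seconds, call the solver, and return its status together with the variable values.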
The chapter ships a small pair of helpers around model.proto():
def get_solutions(model, solver):
    return {v.name: solver.response_proto().solution[i]
            for i, v in enumerate(model.proto().variables)}
def add_hints(model, solution):
    for i, v in enumerate(model.proto().variables):
        if v.name in solution:
            model.proto().solution_hint.vars.append(i)
            model.proto().solution_hint.values.append(solution[v.name])
Calling model.clear_hints() before each run drops the previous hints, so
every phase is warm-started from the latest solution only. For hard
instances this is often the difference between "no feasible answer in 10
minutes" and a smooth ramp from a trivial time budget to the full one.
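Keying the solution by variable name assumes names are unique. In this chapter's source, literals and max_values are both created with the name f"{t1} -> {t2}", so a name-keyed dictionary keeps only one value for both. The sketch below uses plain lists in place of the proto fields to show the effect, and an index-keyed alternative. The sample names and values are made up.

```python
# Stand-ins for model.proto().variables[i].name and the solver's solution[i]
names = ["0 -> 1", "0 -> 1", "task_0_start"]  # bool literal, int max_value, start
values = [1, 2, 0]

# Name-keyed, as in get_solutions/add_hints: the later value wins on a clash
by_name = {}
for name, value in zip(names, values):
    by_name[name] = value
hints_by_name = [by_name[name] for name in names]

# Index-keyed: one entry per variable, no collisions possible
by_index = dict(enumerate(values))
hints_by_index = [by_index[i] for i in range(len(names))]

print(hints_by_name)   # the boolean literal is hinted with the int's value
print(hints_by_index)  # every variable gets its own value back
```

Index keys are only valid while the model's variable list is unchanged between phases, which holds here because the model is built once. If names are preferred, giving every variable a unique name (for example by including the machine and a prefix) avoids the clash.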
Concepts
- Solver techniques (hints, phases)
- Campaigning
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import numpy as np
import string
def generate_task_data(num_of_products, num_of_tasks_per_product):
""" Generate the same number of tasks for multiple products (no more than 26 products please) """
products = string.ascii_uppercase[0:num_of_products]
total_num_of_tasks = num_of_tasks_per_product*num_of_products
tasks = {x for x in range(total_num_of_tasks)}
task_to_product = {}
for product_idx, product in enumerate(products):
task_to_product.update({
product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
})
return tasks, task_to_product
def create_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
model = cp_model.CpModel()
changeover_time = 2
max_time = num_of_tasks_per_product*number_of_products*5
processing_time = 1
machines = {x for x in range(number_of_machines)}
tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
print('Input data:\nTasks:', tasks, task_to_product, '\n')
product_change_indicator = {
(t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
}
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}
var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}
# No influence on the final result. No need to lock the starting rank of the first task of each product to 0
# for product_idx, product in enumerate(range(number_of_products)):
# print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
# for m in machines:
# model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)
var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}
# This is optional
# model.add_decision_strategy(
# var_m_t_product_change.values(),
# cp_model.CHOOSE_FIRST,
# cp_model.SELECT_MIN_VALUE
# )
# Heuristic: Lock the sequence of the tasks (assume the deadlines are in the task order
# AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
# print("\nApply the tasks sequence heuristics")
# # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
# for product_idx, product in enumerate(range(number_of_products)):
# for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
# _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
# if task_id_in_product_group == 0:
# print(f"\nLock {_index}", end=" ")
# else:
# print(f" <= {_index}", end=" ")
# model.add(var_task_ends[_index-1] <= var_task_starts[_index])
# print("\n")
# These intervals are needed, otherwise the duration is not constrained
var_machine_task_intervals = {
(m, t): model.new_optional_interval_var(
var_machine_task_starts[m, t],
processing_time,
var_machine_task_ends[m, t],
var_machine_task_presences[m, t],
f"task_{t}_interval_on_m_{m}")
for t in tasks for m in machines
}
# each task is only present in one machine
for task in tasks:
task_candidate_machines = machines
tmp = [
var_machine_task_presences[m, task]
for m in task_candidate_machines
]
model.add_exactly_one(tmp)
# link task-level to machine-task level for start time & end time
for task in tasks:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
var_task_starts[task] == var_machine_task_starts[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
model.add(
var_task_ends[task] == var_machine_task_ends[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
# Set objective to minimize make-span
make_span = model.new_int_var(0, max_time, "make_span")
total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
model.minimize(make_span)
# the bool variables that indicate whether t1 -> t2
literals = {(m, t1, t2): model.new_bool_var(f"{t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# the technical variables to allow flexible campaigning
max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# schedule the tasks
for m in machines:
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[m, t1, t2]])
# If t1 -> t2 and the products differ then var_m_t_product_change = 1 (it can stay 0 for the last task on a machine)
model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
literals[m, t1, t2]
)
# If var_m_t_product_change=1 then the campaign must end
model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])
# if the campaign ends then there must be changeover time
# [ task1 ] -> [ C/O ] -> [ task 2]
model.add(
var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
).only_enforce_if(
literals[m, t1, t2]
)
# model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
# has to do this in two steps since add_max_equality is not compatible with only_enforce_if
model.add_max_equality(
max_values[m, t1, t2],
[0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
)
model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])
model.add_circuit(arcs)
return model
# This takes 16 seconds
model = create_model(5, 20, 3, 1)
# solver = cp_model.CpSolver()
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(status)
# print(solver.objective_value())
# #10
# print(total_time, status)
phases = [
{'phase_id': 0, 'max_time': 0.5},
{'phase_id': 1, 'max_time': 1},
{'phase_id': 2, 'max_time': 2},
{'phase_id': 3, 'max_time': 4},
{'phase_id': 4, 'max_time': 8}
]
phases = [
{'phase_id': 0, 'max_time': 10},
{'phase_id': 1, 'max_time': 10},
{'phase_id': 2, 'max_time': 10},
{'phase_id': 3, 'max_time': 10},
{'phase_id': 4, 'max_time': 10}
]
phases = [
{'phase_id': 0, 'max_time': 5},
{'phase_id': 1, 'max_time': 10},
{'phase_id': 2, 'max_time': 15},
{'phase_id': 3, 'max_time': 20},
{'phase_id': 4, 'max_time': 25}
]
n = 6
phases = [
{'phase_id': i, 'max_time': (i+1)*10} for i in range(n)
]
def get_solutions(model, solver):
    vars_sol = {}
    for i, var in enumerate(model.proto().variables):
        value = solver.response_proto().solution[i]
        vars_sol[var.name] = value
    return vars_sol
def add_hints(model, solution):
    for i, var in enumerate(model.proto().variables):
        if var.name in solution:
            model.proto().solution_hint.vars.append(i)
            model.proto().solution_hint.values.append(solution[var.name])
# get_solutions(model, solver)
obj_list = []
for phase in phases:
    phase_id, max_time = phase['phase_id'], phase['max_time']
    print('----------------------------')
    # Drop the hints of the previous phase before adding the latest solution
    model.clear_hints()
    if phase_id == 0:
        solver = cp_model.CpSolver()
    if phase_id > 0 and 'solution' in locals():
        print("Add hints")
        add_hints(model, solution)
    print('number of variables in solution hints:', len(model.proto().solution_hint.vars))
    #solver.parameters.keep_all_feasible_solutions_in_presolve = True
    solver.parameters.max_time_in_seconds = max_time
    start = time()
    status = solver.solve(model=model)
    print('number of values in solution:', len(solver.response_proto().solution))
    if status in (cp_model.MODEL_INVALID, cp_model.INFEASIBLE):
        print(f'error status : {status}')
        break
    if status == cp_model.UNKNOWN:
        print(f'Cannot find a feasible solution in the given time {max_time}. status:{status}')
        obj_list.append(np.nan)
        continue
    # objective_value is a property in the snake_case API, not a method
    obj_value = solver.objective_value
    obj_list.append(obj_value)
    solution = get_solutions(model, solver)
    total_time = time() - start
    print(f"phase_id: {phase_id}, max_time: {max_time}, status: {status}, obj: {obj_value}. total time: {round(total_time,1)}")
    if status == cp_model.OPTIMAL:
        print('======================================================')
        print('Optimal Solution Achieved ! No need to continue')
        break
print(obj_list)
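# x-axis: time limit of each phase that actually produced an entry (the loop may stop early)
times = [phase['max_time'] for phase in phases[:len(obj_list)]]
ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(times, obj_list, marker='o', label='objective')
plt.legend()
plt.title('time vs obj')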
plt.xlabel('running seconds')
plt.ylabel('obj')
plt.show()
# self.alg_data, self.model = solver.solve(
# alg_data=self.alg_data,
# previous_model=self.model,
# model_parameters=self.model_parameters,
# )
#
#
# vars_sol = {}
# for i, var in enumerate(model.proto().variables):
# value = solver.response_proto().solution[i]
# vars_sol[var.name] = value
#
#
#
# solver = cp_model.CpSolver()
# solver.parameters.max_time_in_seconds = 0.5
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# #model.ExportToFile("C:/Temp/xxxx.pb.txt")
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# vars_sol = {}
# for i, var in enumerate(model.proto().variables):
# print(var)
# print(var.name)
# print(var.domain)
# print('-------------------')
# vars_sol[var.name] = var
#
# model.proto().solution_hint
# for i, var in enumerate(model.proto().variables):
# if var.name in vars_sol:
# model.proto().solution_hint.vars.append(i)
# model.proto().solution_hint.values.append(vars_sol[var.name])
# #
#
# print(model.ModelStats())
# @dataclass
# class PhaseParameters:
# """ Gather different input optimisation parameters that can be set for each phase. """
#
# obj_phase: str = None
# max_time_in_seconds: int = None
# solution_count_limit: int = None
# restart: bool = False
#
#
# phase_1 = PhaseParameters('phase_1', 10)
# phase_2 = PhaseParameters('phase_2', 10)
#
#
# show the result if getting the optimal one