Introduction
This book collects notes and examples of scheduling models built with OR-Tools CP-SAT. The focus is on job-shop style problems: sequencing tasks on machines, handling changeovers, respecting breaks and shifts, modeling resources, and grouping tasks into campaigns.
The book is split into two parts.
- Concepts walks through the core modeling ideas once. Read these first if CP-SAT or constraint programming is new to you.
- Examples indexes the Python files in the scheduling/ folder and links each one back to the concepts it demonstrates. Examples are numbered and build on each other, so reading them in order is usually easiest.
All code lives in the scheduling/ directory of the repo. Open a file in your
editor while reading the corresponding chapter to see the full model.
A minimal CP-SAT template
Almost every example follows the same five-step shape.
from ortools.sat.python import cp_model
# 1. Data
# ... sets, durations, changeover table, etc.
# 2. Decision variables
model = cp_model.CpModel()
# ... start/end/interval/bool vars
# 3. Objective
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [ends[t] for t in tasks])
model.minimize(make_span)
# 4. Constraints
# ... precedence, resources, circuits, ...
# 5. Solve and post-process
solver = cp_model.CpSolver()
status = solver.solve(model)
The rest of the book explains what goes into step 4.
CP-SAT basics
CP-SAT is a constraint-programming solver with integer and boolean variables. Before looking at scheduling, it helps to be comfortable with a few primitives.
Variables
x = model.new_int_var(0, 100, 'x') # integer in [0, 100]
b = model.new_bool_var('b') # boolean (integer in {0, 1})
Linear constraints
model.add(x + y == 10)
model.add(x <= y)
model.add(sum(bs) == 1) # exactly-one on a list of bools
model.add_exactly_one(bs) # same, more idiomatic
Boolean combinations
model.add_bool_or([a, b]) # a or b
model.add_bool_and([a, b]) # a and b
model.add_bool_xor([a, b]) # exactly one of a, b
Reification with only_enforce_if
A constraint can be conditioned on a boolean literal. The constraint is only active when the literal is true.
model.add(x >= 5).only_enforce_if(b)
model.add(x < 5).only_enforce_if(~b)
Chaining two only_enforce_if calls gives an "and" of conditions:
model.add(y == 1).only_enforce_if(b1).only_enforce_if(b2) # (b1 and b2) => y == 1; one-way implication, not "iff"
For "or", you normally introduce intermediate booleans or use
add_bool_or.
add_min_equality, add_max_equality, add_multiplication_equality
These express z = min(xs), z = max(xs), z = x * y (or the product of a
list). add_max_equality is how makespan is usually encoded:
model.add_max_equality(make_span, [ends[t] for t in tasks])
Domains
For "x belongs to a non-contiguous set of values" use
add_linear_expression_in_domain:
domain = cp_model.Domain.from_intervals([[0, 4], [11, 100]])
model.add_linear_expression_in_domain(x, domain)
See example_00_unit_tests.py for a collection of small snippets exercising
each of these.
Solve and read back
solver = cp_model.CpSolver()
status = solver.solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(solver.value(x))
Interval variables
An interval variable bundles three integer variables - start, size, end -
with the implicit constraint start + size == end. CP-SAT uses intervals to
reason efficiently about scheduling: add_no_overlap and add_cumulative both
expect intervals.
Three flavors
Regular interval
iv = model.new_interval_var(start, size, end, name="t1")
Replaces a manual model.add(end - start == size).
Optional interval
An interval that is only scheduled when a presence boolean is true. Essential when a task may or may not be assigned to a given machine.
iv = model.new_optional_interval_var(start, size, end, is_present, name="t1_on_m1")
If is_present is false, the interval disappears from add_no_overlap /
add_cumulative reasoning.
Fixed-size interval
Convenient for breaks, shift boundaries, and anything with a known position.
br = model.new_fixed_size_interval_var(start=2, size=1, name="break")
Typical use
intervals = {
    (m, t): model.new_optional_interval_var(
        starts[m, t],
        processing_time[product_of(t)],
        ends[m, t],
        presence[m, t],
        f"t{t}_on_m{m}",
    )
    for t in tasks for m in machines
}
for m in machines:
    model.add_no_overlap([intervals[m, t] for t in tasks])
Examples that introduce intervals: example_05_seq_with_intervals.py (first
use), example_03_seq_scale_Mathieu.py (dramatic speed-up vs. manual duration
constraints).
Circuit and sequencing
To sequence tasks on a machine, you need both the order and the time
constraints that follow from it. CP-SAT's add_circuit is the standard tool.
What add_circuit does
add_circuit(arcs) takes a list of triples [i, j, literal]. It asserts that
the selected arcs (where literal == 1) form a single Hamiltonian circuit over
the referenced nodes. Self-arcs [i, i, literal] mean node i is skipped
when literal is true.
arcs = []
for t1 in tasks:
    arcs.append([0, t1, start_literal(t1)]) # dummy -> t1 (first)
    arcs.append([t1, 0, end_literal(t1)]) # t1 -> dummy (last)
    arcs.append([t1, t1, ~presence[t1]]) # skip t1 if absent
    for t2 in tasks:
        if t1 == t2:
            continue
        arcs.append([t1, t2, seq[t1, t2]])
model.add_circuit(arcs)
Node 0 (or -1) is typically a dummy "first/last" node.
Linking the circuit to time
add_circuit only picks the order. You also need: if t1 -> t2 is chosen,
then end[t1] + gap <= start[t2]. This is a reified constraint:
model.add(end[t1] + gap <= start[t2]).only_enforce_if(seq[t1, t2])
gap is usually changeover_time (see Changeover) or
0.
Multi-machine
With multiple machines, build one circuit per machine and gate absent tasks with self-loops:
for m in machines:
    arcs = []
    for t in tasks:
        # dummy -> t and t -> dummy arcs are added here as in the single-machine sketch
        arcs.append([t, t, ~presence[m, t]])
        for t2 in tasks:
            if t != t2:
                arcs.append([t, t2, seq[m, t, t2]])
    model.add_circuit(arcs)
add_exactly_one(presence[m, t] for m in machines) ensures each task ends up on
exactly one machine.
Examples: example_01_simple_sequence.py (single machine),
example_03_seq_multi_stations.py (multi-machine).
Changeover
A changeover is the time needed to switch a machine from producing one product to another. There are three common ways to model it.
1. In the objective
Charge a cost for each seq[t1, t2] whose products differ. The changeover does not
appear in the schedule itself; only its cost is minimised.
total_co = sum(seq[t1, t2] * changeover_cost[t1, t2] for ...)
model.minimize(make_span + total_co)
Simple, but time and cost are decoupled: the schedule may not leave physical room for the changeover.
Example: example_01_simple_sequence.py.
2. In the precedence constraint
Include the changeover in the gap between tasks:
gap = changeover_time if products_differ(t1, t2) else 0
model.add(end[t1] + gap <= start[t2]).only_enforce_if(seq[t1, t2])
Now time and cost agree: a changeover actually pushes the next task later.
Example: example_04_seq_with_changeover_in_constraint.py.
3. As a first-class event
Create an optional interval for every (t1, t2) that represents the
changeover itself. When t1 -> t2 is chosen, the interval is present, sits
between the two tasks, and has the right duration.
co_iv = model.new_optional_interval_var(co_start, co_duration, co_end, co_present, ...)
model.add(end[t1] <= co_start).only_enforce_if(seq[t1, t2])
model.add(co_end <= start[t2]).only_enforce_if(seq[t1, t2])
model.add(co_present == 1).only_enforce_if(seq[t1, t2])
This lets you add the changeover interval to add_cumulative (it consumes
operator time) or apply cleaning-resource constraints to it.
Example: example_08_changeover_as_event.py.
Starting product
A machine usually begins with some product already loaded. Model it with a dummy task 0 whose "product" is the starting product; the cost from dummy to the first real task is zero if they match, else the usual changeover.
Example: example_02_seq_lock_starting_product.py.
Breaks
A break is a time window during which a machine or operator is unavailable. Three techniques cover most cases.
1. Break as a fixed interval in add_cumulative
For each break, build a new_fixed_size_interval_var and add it alongside task
intervals with the full demand. Tasks are pushed around the break.
break_intervals = [
model.new_fixed_size_interval_var(start=s, size=e - s, name="break")
for (s, e) in breaks
]
all_intervals = task_intervals + break_intervals
demands = [1] * len(task_intervals) + [1] * len(break_intervals)
model.add_cumulative(all_intervals, demands, capacity=1)
Example: example_07_break_without_changeover.py.
2. Task duration stretched by overlapping breaks
When a task may run through a break and the break simply extends its total
time on the machine, use per-time-slot booleans that indicate whether the
task uses slot i, then add is_break[i] for each covered slot.
uses[t, i] = starts_before_i AND ends_after_i
duration[t] = base + sum(is_break[i] * uses[t, i] for i)
interval[t] = new_interval_var(start, duration, end, ...)
Example: example_14_task_delaying_break.py.
3. Break-aware start domains
If the break pattern is periodic, restrict task starts to the valid slots
with add_linear_expression_in_domain. Much faster than per-slot booleans.
domain_no_break = cp_model.Domain.from_values([...])
model.add_linear_expression_in_domain(start[t], domain_no_break)
Example: example_29_linear_domain_for_breaks.py,
example_33_conditional_duration_linear_domain.py.
Automatic jobs
Some "automatic" tasks don't consume the operator while running (think: a machine runs itself after a short manual setup). Model only the setup portion inside the cumulative, using a 1-unit interval at the task's start.
Example: example_12_an_automatic_job.py, example_13_automatic_jobs.py.
Shifts
A shift is a working window. Tasks must fit inside one shift (or, depending on policy, be split / disallowed from crossing shifts).
Synthetic shift breaks
Insert a tiny "fake break" interval at each shift boundary and forbid any
task from overlapping it with add_no_overlap. This prevents shift-crossing
without enumerating shift assignments.
for (s, e) in synthetic_shift_breaks:
    br = model.new_fixed_size_interval_var(start=s, size=e - s, name="shift_edge")
    for t in tasks:
        model.add_no_overlap([task_interval[t], br])
Example: example_16_shift_crossing_fake_time_unit.py.
Explicit shift assignment
Alternatively, give every task a one-hot presence[shift, task] and enforce
the shift window when present:
for t in tasks:
    model.add_exactly_one(presence[s, t] for s in shifts)
    for s in shifts:
        model.add(start[t] >= shift_start[s]).only_enforce_if(presence[s, t])
        model.add(end[t] <= shift_end[s]).only_enforce_if(presence[s, t])
More variables, but the assignment is explicit and easy to extend (e.g. to per-shift capacity).
Example: example_17_shift_crossing_mathieu.py.
Resources and cumulative
add_no_overlap says "at most one interval at a time". add_cumulative is the
generalisation: each interval consumes some amount of a shared resource, and
the total consumption must not exceed a capacity.
No-overlap
model.add_no_overlap(intervals)
Used per machine (one task at a time) and per stage (one job at a time in a flow-shop style).
Cumulative
model.add_cumulative(intervals, demands, capacity)
demands[i] is the amount of resource taken by intervals[i] while it runs.
Typical uses:
- Shared operator across machines. If two machines need the same operator, cumulative over all their task intervals with demand 1 and capacity 1 forbids parallel runs. Example: example_06_seq_with_intervals_resource.py.
- Breaks. Treat a break as an interval that fully occupies the resource. Example: example_07_break_without_changeover.py.
- Automatic jobs. Only the setup portion consumes the operator, modeled as a size-1 interval at each task's start.
Resource modes
Some tasks can run in different modes with different durations and headcounts. Encode the choice with a one-hot bool per mode and derive the actual processing time from it:
for t in tasks:
    model.add_exactly_one([mode[t, k] for k in modes])
    model.add(
        proc_time[t] == sum(processing_time[product[t], k] * mode[t, k] for k in modes)
    )
Example: example_10_people_mode.py.
Headcount tracking
If the per-task resource depends on whether the task overlaps a break (or
some other condition), plain add_cumulative may be insufficient. Build an
explicit per-timestep resource variable and link it to task-start presence
booleans. Three methods are compared in
example_34_headcount_tracking.py.
Multi-stage jobs
A job can consist of several stages that must run in order. Each stage is a task with its own start/end; the job start is the earliest task start and the job end is the latest task end.
Job - stage - task structure
tasks = {(job, stage) for job in jobs for stage in stages}
for job in jobs:
    model.add_min_equality(job_start[job], [start[job, s] for s in stages])
    model.add_max_equality(job_end[job], [end[job, s] for s in stages])
    # stage precedence
    for s in sorted(stages)[:-1]:
        model.add(end[job, s] <= start[job, s + 1])
Example: example_21_stages_one_job.py.
Stage-level no-overlap
If each stage has a single shared machine, forbid two jobs from sitting on the same stage simultaneously:
for s in stages:
    model.add_no_overlap([intervals[job, s] for job in jobs])
Examples: example_22_stages_two_jobs.py,
example_23_multistage_two_jobs_co.py.
Campaigning
A campaign is a run of same-product tasks on a machine between two changeovers. Typical rules:
- tasks within a campaign are the same product and pay no changeover cost,
- a campaign has a maximum size (e.g. at most N tasks),
- switching products or hitting the cap triggers a changeover.
Approach 1: campaigns as entities
Create a set of potential campaigns, each with start/end/duration/presence,
and variables linking tasks to campaigns. Sequence campaigns (not tasks)
using add_circuit. The campaign-level changeover cost sits in the gap
between campaigns.
Pros: close to the business view. Cons: more variables, scales worse.
Example: example_09_max_number_of_continuous_tasks.py.
Approach 2: cumulative rank per task
Keep tasks as the atomic unit and attach a rank variable
cumul[t] in [0, campaign_size - 1]. On each t1 -> t2 arc:
- if the campaign continues, cumul[t2] = cumul[t1] + 1,
- if a changeover happens, cumul[t2] = 0 and end[t1] + changeover <= start[t2].
A reach_end[t] boolean fires when cumul[t] == campaign_size - 1, forcing a
reset and changeover. add_max_equality(max_value, [0, cumul[t1] + 1 - reach_end[t1] * campaign_size]) is a useful trick to compute the next rank
under an only_enforce_if.
Pros: fewer variables, scales better. Cons: trickier to explain.
Examples: example_24_campaigning_with_cumul.py (base),
example_27_campaigning_products.py (multi-product),
example_28_campaigning_products_machines.py (multi-machine).
Locking the task order
When tasks have deadlines that align with their index, locking
start[t-1] <= start[t] (or the stricter end[t-1] <= start[t]) is a cheap
heuristic that often gives a 10x+ solve-time improvement. See
example_25_campaigning_with_locked_seq.py and the two
example_26_campaigning_locked_seq_improved*.py variants.
Flexible campaign ends
If the model should be free to end a campaign early (not just at the cap),
drop the "force reach_end when cumul hits max" implication and let the
solver choose. This usually gives better objective values at a small
solve-time cost. See the two example_26_*_improved*.py files for the comparison.
Solver techniques
Beyond modeling, CP-SAT exposes a few knobs that help on hard instances.
Decision strategies
Tell the solver which variables to branch on first, and which value to try first. Often needed when a symmetric model returns a "correct but ugly" schedule.
model.add_decision_strategy(
starts.values(),
cp_model.CHOOSE_FIRST,
cp_model.SELECT_MIN_VALUE,
)
Example: example_07_break_without_changeover.py applies two strategies
(one on starts, one on sequence literals) to get a canonical output.
Parallel workers
solver.parameters.num_search_workers = 8
Uses 8 worker threads (recent OR-Tools releases also expose this setting as num_workers). Examples example_03_seq_scale*.py benchmark the
resulting speedup.
Warm-starting with hints
You can seed the search with values for any subset of variables:
model.add_hint(var, value)
Or clear and re-set them between solves:
model.clear_hints()
add_hints(model, previous_solution)
This is the basis of phase solving: build the full model once, then run it
repeatedly with an increasing max_time, feeding each phase's solution in
as hints to the next. Example: example_32_solving_by_phases.py.
Reading back values
status = solver.solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(solver.value(var))
    print(solver.objective_value)  # a property in the snake_case API, not a method
MODEL_INVALID almost always means a constraint references a variable that
was never bound to the right model instance, or an only_enforce_if was
attached to something that is not a literal.
Examples overview
Each example in this section corresponds to one Python file under
scheduling/. Chapters are kept short: a brief description, the concepts it
demonstrates (linked back to the Concepts
section), and the source file inlined at the bottom.
Examples are grouped by topic in the sidebar:
- Basics - CP-SAT primitives and small modeling tricks.
- Sequencing - ordering tasks on one or more machines.
- Changeover and intervals - different ways to model switches between products and the move from manual durations to interval variables.
- Breaks - unavailable time windows, including breaks that extend a task's duration and automatic jobs that only need an operator for setup.
- Shifts - preventing tasks from crossing shift boundaries.
- Multi-stage jobs - jobs with ordered stages and per-stage capacity.
- Resources - flexible resource/headcount modes and time-varying demand tracking.
- Campaigning - grouping same-product tasks between changeovers, with multiple modelling approaches.
- Solver techniques - warm-starting CP-SAT across phases with hints.
A few files (example_11, example_18, example_19, example_30) are
empty placeholders kept for numbering; their chapters only note what they
would have covered.
Unit tests
Source: scheduling/example_00_unit_tests.py
What it does
A scratchpad of CP-SAT primitives. Not a scheduling model. It runs through small self-contained models that each exercise one feature:
- add_bool_or, add_bool_and, add_bool_xor over two booleans.
- Plain linear constraints with minimize.
- Reifying "x is between 5 and 10" with chained only_enforce_if, with add_multiplication_equality, and with add_linear_expression_in_domain.
- Reading back results with solver.value.
Concepts
- CP-SAT basics
Notes
Useful as a cheat sheet when you want to remember how to express, for example, "b = (5 <= x <= 10)". Several of the snippets are commented out alternatives, kept for comparison.
Source
from ortools.sat.python import cp_model
model = cp_model.CpModel()
def get(x):
    return solver.value(x)
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_or(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_and(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_xor(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 2)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 1)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 0)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))
#
model = cp_model.CpModel()
x = model.new_bool_var('x')
y = model.new_bool_var('y')
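# Note: the two active constraints contradict each other
# (x = 0 forces x == 1 and x = 1 forces x == 0), so the solver reports INFEASIBLE.
model.add(x==1).only_enforce_if(~x)
#model.add(x==0).only_enforce_if(~x)
# model.add(x==1).only_enforce_if(x)
model.add(x==0).only_enforce_if(x)
#model.add(y==1).only_enforce_if(x)
model.minimize(x)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(solver.status_name(status))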
#
model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
# Does not work: only_enforce_if() takes Boolean literals, not comparisons such as 5 <= x.
# model.add(x_is_between_5_and_10 == 1).only_enforce_if(5 <= x).only_enforce_if(x <= 10)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', get(x))
print('x_is_between_5_and_10', get(x_is_between_5_and_10))
model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
x_is_no_less_than_5 = model.new_bool_var('x_is_no_less_than_5')
x_is_no_more_than_10 = model.new_bool_var('x_is_no_more_than_10')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
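# Does not work: Python reads `a == x >= 5` as a chained comparison, so nothing is reified.
# model.add(x_is_no_less_than_5 == x >= 5)
# model.add(x_is_no_less_than_5 == 1).only_enforce_if(x>=5)
# model.add(x_is_no_more_than_10 == 1).only_enforce_if(x <= 10)
# Does not work either: only_enforce_if() takes Boolean literals, not comparisons.
# model.add(x_is_between_5_and_10 == 1).only_enforce_if(5 <= x).only_enforce_if(x <= 10)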
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', get(x))
print('x_is_between_5_and_10', get(x_is_between_5_and_10))
##########################################
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x_is_greater_than_5 = model.new_bool_var('x_is_greater_than_5')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
model.add(x >= 5).only_enforce_if(x_is_greater_than_5)
model.add(x < 5).only_enforce_if(~x_is_greater_than_5)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', solver.value(x))
print('x_is_greater_than_5', solver.value(x_is_greater_than_5))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
#model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 10).only_enforce_if(~x_is_between_5_and_10)
#model.add(x >10).only_enforce_if(~x_is_greater_than_5)
# The next two lines contradict each other (x == 3 vs. x >= 5), so the status is INFEASIBLE
model.add(x == 3)
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):  # 1 is MODEL_INVALID, not FEASIBLE
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
#model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 10).only_enforce_if(~x_is_between_5_and_10)
#model.add(x >10).only_enforce_if(~x_is_greater_than_5)
#model.add(x == 3)
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):  # 1 is MODEL_INVALID, not FEASIBLE
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 5).only_enforce_if(~x_is_between_5_and_10)
model.add(x >10).only_enforce_if(~x_is_between_5_and_10)
model.add(x == 3)
# model.add(x_is_between_5_and_10 == 0)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):  # 1 is MODEL_INVALID, not FEASIBLE
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x_greater_than_5 = model.new_bool_var('5<x')
x_less_than_10 = model.new_bool_var('x<10')
model.add(x > 5).only_enforce_if(x_greater_than_5)
model.add(x <= 5).only_enforce_if(~x_greater_than_5)
model.add(x < 10).only_enforce_if(x_less_than_10)
model.add(x >= 10).only_enforce_if(~x_less_than_10)
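# A product of two variables is not linear, so model.add() cannot express it;
# add_multiplication_equality takes the target and the list of factors.
model.add_multiplication_equality(x_is_between_5_and_10, [x_greater_than_5, x_less_than_10])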
model.add(x == 3)
# model.add(x_is_between_5_and_10 == 0)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):  # 1 is MODEL_INVALID, not FEASIBLE
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x = model.new_int_var(0, 100, 'x')
model.add_linear_constraint(x, 5, 10).only_enforce_if(x_is_between_5_and_10)
model.add_linear_expression_in_domain(
x,
cp_model.Domain.from_intervals([[0, 4], [11, 100]])
).only_enforce_if(~x_is_between_5_and_10)
model.add(x == 3)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):  # 1 is MODEL_INVALID, not FEASIBLE
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x_greater_than_5 = model.new_bool_var('5<x')
x_less_than_10 = model.new_bool_var('x<10')
model.add(x > 5).only_enforce_if(x_greater_than_5)
model.add(x <= 5).only_enforce_if(~x_greater_than_5)
model.add(x < 10).only_enforce_if(x_less_than_10)
model.add(x >= 10).only_enforce_if(~x_less_than_10)
model.add_multiplication_equality(x_is_between_5_and_10, [x_greater_than_5, x_less_than_10])
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):  # 1 is MODEL_INVALID, not FEASIBLE
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))
Events overlapping
Source: scheduling/example_15_events_overlapping.py
What it does
Tutorial on detecting whether two intervals overlap, and by how much.
- var_start_earlier_than_start[t1, t2] = "t1 starts before t2".
- var_end_later_than_start[t1, t2] = "t1 ends after t2 starts".
- var_overlap[t1, t2] is the product of the two, i.e. the AND.
- var_overlap_duration[t1, t2] equals end[t1] - start[t2] when there is overlap, else zero.
Both tasks here have fixed start/end for illustration; the interest is in how the overlap indicator is built.
Concepts
- CP-SAT basics (reification,
add_multiplication_equality)
Source
from ortools.sat.python import cp_model
model = cp_model.CpModel()
max_time = 10
tasks = {1, 2}
# 1. Data
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
# overlap
# model.add(var_task_starts[1] == 1)
# model.add(var_task_ends[1] == 5)
# model.add(var_task_starts[2] == 3)
# model.add(var_task_ends[2] == 7)
# No overlap
model.add(var_task_starts[1] == 1)
model.add(var_task_ends[1] == 3)
model.add(var_task_starts[2] == 5)
model.add(var_task_ends[2] == 7)
var_overlap = {
(t1, t2): model.new_bool_var('')
for t1 in tasks
for t2 in tasks
if t1 != t2
}
var_overlap_duration = {
(t1, t2): model.new_int_var(0, max_time, '')
for t1 in tasks
for t2 in tasks
if t1 != t2
}
var_start_earlier_than_start = {
(t1, t2): model.new_bool_var('')
for t1 in tasks
for t2 in tasks
if t1 != t2
}
var_end_later_than_start = {
(t1, t2): model.new_bool_var('')
for t1 in tasks
for t2 in tasks
if t1 != t2
}
for t1 in tasks:
    for t2 in tasks:
        if t1 == t2:
            continue
        model.add(var_task_starts[t1] <= var_task_starts[t2]).only_enforce_if(var_start_earlier_than_start[t1, t2])
        model.add(var_task_starts[t1] > var_task_starts[t2]).only_enforce_if(~var_start_earlier_than_start[t1, t2])
        model.add(var_task_ends[t1] > var_task_starts[t2]).only_enforce_if(var_end_later_than_start[t1, t2])
        model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(~var_end_later_than_start[t1, t2])
        model.add_multiplication_equality(
            var_overlap[t1, t2],
            [var_start_earlier_than_start[t1, t2], var_end_later_than_start[t1, t2]]
        )
        model.add(var_overlap_duration[t1, t2] == var_task_ends[t1] - var_task_starts[t2]).only_enforce_if(var_overlap[t1, t2])
        model.add(var_overlap_duration[t1, t2] == 0).only_enforce_if(~var_overlap[t1, t2])
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    count = 0
    for t1 in tasks:
        for t2 in tasks:
            if t1 == t2:
                continue
            if solver.value(var_overlap[t1,t2]):
                count = count + 1
                print(f'Task{t1} starts earlier and overlap Task{t2} for a duration of '
                      f'{solver.value(var_overlap_duration[t1, t2])} units')
    if count == 0:
        print("No overlapped tasks at all")
    print('=========================== TASKS SUMMARY ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )
    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)
Simple sequence
Source: scheduling/example_01_simple_sequence.py
What it does
The smallest end-to-end scheduling model in the book. Three tasks across two products on a single machine.
- A dummy task 0 acts as the start/end node.
- seq[t1, t2] booleans select the order via add_circuit.
- If t1 -> t2 is chosen, end[t1] <= start[t2] is enforced.
- The changeover cost lives in the objective: Minimize(sum(seq * cost)).
- Duration is encoded manually with end - start == duration.
Concepts
- Circuit and sequencing
- Changeover (approach 1: cost in objective)
Source
# circuit arcs
# tuples
# add arc : arcs.append([job1_index, job2_index, binary]) |
# add link: Use only_enforce_if for binary <---> job1.end_time <= job2.start_time | -> three things connected
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
M = 99999
# 1. Data
'''
task product
1 A
2 B
3 A
'''
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
m = {
(t1, t2)
for t1 in tasks
for t2 in tasks
if t1 != t2
}
m_cost = {
(t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or task_to_product[t1] == 'dummy' or task_to_product[t2] == 'dummy'
else changeover_time[task_to_product[t2]]
for (t1, t2) in m
}
# 2. Decision variables
'''
(1, 2): 0/1
(1, 3)
(2, 1)
(2, 3)
(3, 1)
(3, 2)
'''
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_sequence = {
(t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
for (t1, t2) in m
}
# 3. Objectives
# The objective is a plain linear expression; no extra integer variable is needed.
total_changeover_time = sum(
    variables_sequence[(t1, t2)] * m_cost[(t1, t2)] for (t1, t2) in m
)
model.minimize(total_changeover_time)
#model.maximize(total_changeover_time)
# 4. Constraints
# Add duration
for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )
# add_circuits
arcs = list()
'''
arcs.append([0, 1, model.new_bool_var("dummy0" + "_to_1")])
arcs.append([0, 2, model.new_bool_var("dummy0" + "_to_2")])
arcs.append([0, 3, model.new_bool_var("dummy0" + "_to_3")])
arcs.append([1, 0, model.new_bool_var("1_to_" + "dummy0")])
arcs.append([2, 0, model.new_bool_var("2_to_" + "dummy0")])
arcs.append([3, 0, model.new_bool_var("3_to_" + "dummy0")])
'''
for (from_task, to_task) in m:
    arcs.append(
        [
            from_task,
            to_task,
            variables_sequence[(from_task, to_task)]
        ]
    )
    # Precedence is only enforced between real tasks, never against the dummy.
    if from_task != 0 and to_task != 0:
        model.add(
            variables_task_ends[from_task] <= variables_task_starts[to_task]
        ).only_enforce_if(variables_sequence[(from_task, to_task)])
model.add_circuit(arcs)
#model.add(variables_task_starts[0] == 8)
# Solve
# https://github.com/d-krupke/cpsat-primer
#model.add_decision_strategy(variables_task_starts, cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    for (t1, t2) in m:
        print(f'{t1} --> {t2}: {solver.value(variables_sequence[(t1, t2)])}')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
Sequence with locked starting product
Source: scheduling/example_02_seq_lock_starting_product.py
What it does
Same single-machine model as 01, but the dummy task is given a fixed "starting product". The changeover table is adjusted so that the arc from dummy to a task costs zero only when that task's product matches the starting product.
m_cost = {
    (t1, t2): 0 if task_to_product[t1] == task_to_product[t2]
                   or (task_to_product[t1] == 'dummy'
                       and task_to_product[t2] == starting_product)
    else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}
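To see what this rule produces, the table can be built in plain Python and the arcs touching the dummy inspected. This is an illustrative sketch using the data of this example; dummy_arcs and return_arcs are names introduced here, not part of the source file.

```python
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
starting_product = 'A'

m = {(t1, t2) for t1 in tasks for t2 in tasks if t1 != t2}
m_cost = {
    (t1, t2): 0
    if task_to_product[t1] == task_to_product[t2]
    or (task_to_product[t1] == 'dummy' and task_to_product[t2] == starting_product)
    else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}

# Arcs leaving the dummy: free into product A, charged into product B.
dummy_arcs = {t2: m_cost[(0, t2)] for t2 in tasks if t2 != 0}
# Arcs returning to the dummy fall through to changeover_time['dummy'], which is 0.
return_arcs = {t1: m_cost[(t1, 0)] for t1 in tasks if t1 != 0}
print(dummy_arcs)
print(return_arcs)
```

Closing the circuit stays free because the else branch looks up the product of the destination, and the dummy's changeover time is 0.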
Concepts
- Changeover (starting product)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
M = 99999
# 1. Data
'''
task product
1 A
2 B
3 A
'''
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
starting_product = 'A'
m = {
    (t1, t2)
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}
# A -> A, B -> B: 0
# dummy A -> A: 0
# dummy A -> B: 1
# A -> B, B -> A: 1
m_cost = {
    (t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
        task_to_product[t1] == 'dummy' and task_to_product[t2] == starting_product
    )
    else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_sequence = {
    (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
    for (t1, t2) in m
}
# 3. Objectives
# The total changeover time is a linear expression over the sequence literals,
# so no separate integer variable is needed.
total_changeover_time = sum(
    [variables_sequence[(t1, t2)] * m_cost[(t1, t2)] for (t1, t2) in m]
)
model.minimize(total_changeover_time)
# 4. Constraints
# Add duration
for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )
# add_circuits
arcs = list()
for (from_task, to_task) in m:
    arcs.append(
        [
            from_task,
            to_task,
            variables_sequence[(from_task, to_task)]
        ]
    )
    # Precedence is only enforced between real tasks, never against the dummy.
    if from_task != 0 and to_task != 0:
        model.add(
            variables_task_ends[from_task] <= variables_task_starts[to_task]
        ).only_enforce_if(variables_sequence[(from_task, to_task)])
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    for (t1, t2) in m:
        print(f'{t1} --> {t2}: {solver.value(variables_sequence[(t1, t2)])}')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
Multi-station sequence
Source: scheduling/example_03_seq_multi_stations.py
What it does
Extends the single-machine model of 02 to multiple machines.
- presence[m, t] booleans, with add_exactly_one per task, select which machine runs each task.
- Task-level start[t], end[t] are linked to start[m, t], end[m, t] under only_enforce_if(presence[m, t]).
- One add_circuit per machine, with a self-loop [t, t, ~presence[m, t]] to skip tasks assigned elsewhere.
- The objective is make_span + total_changeover_time, with add_max_equality computing the makespan.
Concepts
- Circuit and sequencing (multi-machine)
- Changeover (cost in objective)
Source
"""
About changeover:
- The changeover cost appears only in the objective function.
- The task start and end times do not reflect changeover events.
The helper s() prints a dict indexed by tuples in sorted key order.
Task 0 is only used in arcs.
The duration constraint is end - start == duration.
In Mathieu's approach there is no such constraint; new_optional_interval_var enforces the duration instead.
We need task-level start and end for convenience when working with more complicated constraints.
"""
from ortools.sat.python import cp_model
# Initiate
M = 99999
model = cp_model.CpModel()
def s(x):
    sorted_keys = sorted(x)
    for key in sorted_keys:
        print(f"{key}, {x[key]}")
'''
task product
1 A
2 B
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}
X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
        task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}
# 3. Objectives
# The total changeover time is a linear expression over the sequence literals,
# so no separate integer variable is needed.
total_changeover_time = sum(
    [variables_machine_task_sequence[(m, t1, t2)] * m_cost[(m, t1, t2)] for (m, t1, t2) in X]
)
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span + total_changeover_time)
# 4. Constraints
for task in tasks:
    # For this task
    # get all allowed machines
    task_candidate_machines = machines
    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)
    # task level link to machine-task level
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
# Duration constraint, written manually here.
# This can be replaced by an interval variable (Mathieu's approach).
for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )
# add_circuits
for machine in machines:
    arcs = list()
    tmp = [x for x in X if x[0] == machine]
    for (m, from_task, to_task) in tmp:
        arcs.append(
            [
                from_task,
                to_task,
                variables_machine_task_sequence[(m, from_task, to_task)]
            ]
        )
        if from_task != 0 and to_task != 0:
            model.add(
                variables_task_ends[from_task] <= variables_task_starts[to_task]
            ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
    # Self-loop: the task is skipped on this machine when it is assigned elsewhere.
    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(machine, task)]
        ])
    model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    print(solver.value(make_span))
    for (m, t1, t2) in sorted(X):
        value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
        if value == 1:
            print(f'Machine {m}: {t1} --> {t2} ')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
Multi-station scale benchmark
Source: scheduling/example_03_seq_scale.py
What it does
Benchmark harness of the multi-machine model from 03a. The model is
wrapped in a model(num_tasks) function and solved for num_tasks in
[2, 3, ..., 12], recording wall-clock solve time and plotting with
matplotlib.
- Uses solver.parameters.num_search_workers = 8.
- Still uses the manual end - start == duration constraint (no intervals yet).
- Objective is make_span only; changeover line is commented out.
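The timing loop of such a harness can be sketched independently of the solver. In the sketch below, time_runs and stand_in_solve are hypothetical names introduced for illustration; the stand-in workload only takes the place of building and solving the model, and perf_counter is used instead of time() because it is a monotonic clock meant for measuring intervals.

```python
from time import perf_counter


def time_runs(run, sizes):
    """Call run(size) for each size and return the elapsed seconds per call."""
    seconds = []
    for size in sizes:
        started = perf_counter()
        run(size)
        seconds.append(perf_counter() - started)
    return seconds


def stand_in_solve(num_tasks):
    """Placeholder workload; the benchmark would build and solve the model here."""
    return sum(i * i for i in range(num_tasks * 1000))


sizes = list(range(2, 13))          # [2, 3, ..., 12], as in the benchmark
seconds = time_runs(stand_in_solve, sizes)
for size, elapsed in zip(sizes, seconds):
    print(f"{size:>2} tasks: {elapsed:.6f} s")
```

Keeping the measurement in one helper makes it easy to time two model variants on the same list of sizes.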
Concepts
- Solver techniques (parallel workers)
- Sets the baseline that 03c accelerates.
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
def generate_data(num_tasks):
    tasks = {i+1 for i in range(num_tasks)}
    tasks_0 = tasks.union({0})
    task_to_product = {0: 'dummy'}
    # First half of the tasks make product A, the rest product B.
    task_to_product.update({x+1: 'A' for x in range(int(num_tasks/2))})
    task_to_product.update({x+1: 'B' for x in range(int(num_tasks/2), int(num_tasks))})
    return tasks, tasks_0, task_to_product
def model(num_tasks):
    model = cp_model.CpModel()
    # 1. Data
    tasks, tasks_0, task_to_product = generate_data(num_tasks)
    max_time = num_tasks
    processing_time = {'dummy': 0, 'A': 1, 'B': 1}
    changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
    machines = {0, 1}
    machines_starting_products = {0: 'A', 1: 'A'}
    X = {
        (m, t1, t2)
        for t1 in tasks_0
        for t2 in tasks_0
        for m in machines
        if t1 != t2
    }
    m_cost = {
        (m, t1, t2): 0
        if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
        )
        else changeover_time[task_to_product[t2]]
        for (m, t1, t2) in X
    }
    # 2. Decision variables
    variables_task_ends = {
        task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
    }
    variables_task_starts = {
        task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
    }
    variables_machine_task_starts = {
        (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_ends = {
        (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_presences = {
        (m, t): model.new_bool_var(f"presence_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_sequence = {
        (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
        for (m, t1, t2) in X
    }
    # 3. Objectives
    # Changeover expression, kept for reference; it is not part of the objective below.
    total_changeover_time = sum(
        [variables_machine_task_sequence[(m, t1, t2)] * m_cost[(m, t1, t2)] for (m, t1, t2) in X]
    )
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(
        make_span,
        [variables_task_ends[task] for task in tasks]
    )
    model.minimize(make_span)  # + total_changeover_time)
    # 4. Constraints
    for task in tasks:
        task_candidate_machines = machines
        # find the subset in presence matrix related to this task
        tmp = [
            variables_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        # this task is only present in one machine
        model.add_exactly_one(tmp)
        # task level link to machine-task level
        for m in task_candidate_machines:
            model.add(
                variables_task_starts[task] == variables_machine_task_starts[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])
            model.add(
                variables_task_ends[task] == variables_machine_task_ends[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])
    for task in tasks:
        model.add(
            variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
        )
    # add_circuits
    for machine in machines:
        arcs = list()
        tmp = [x for x in X if x[0] == machine]
        for (m, from_task, to_task) in tmp:
            arcs.append(
                [
                    from_task,
                    to_task,
                    variables_machine_task_sequence[(m, from_task, to_task)]
                ]
            )
            if from_task != 0 and to_task != 0:
                model.add(
                    variables_task_ends[from_task] <= variables_task_starts[to_task]
                ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
        for task in tasks:
            arcs.append([
                task, task, ~variables_machine_task_presences[(machine, task)]
            ])
        model.add_circuit(arcs)
    # Solve
    solver = cp_model.CpSolver()
    solver.parameters.num_search_workers = 8
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    return total_time
if __name__ == '__main__':
    num_tasks = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    seconds = []
    for i in num_tasks:
        print(i)
        # model(i) returns the wall-clock solve time in seconds
        processing_time = model(i)
        seconds.append(processing_time)
    plt.plot(num_tasks, seconds)
    plt.show()
Multi-station scale (intervals)
Source: scheduling/example_03_seq_scale_Mathieu.py
What it does
Same benchmark as 03b, but switches to new_optional_interval_var plus
add_no_overlap per machine. A helper add_circuit_constraints(...)
factors out the per-machine circuit.
The interval-based model scales much better: the main loop here runs
num_tasks up to ~80 on the same hardware that 03b handled only up to 12.
Concepts
- Interval variables (optional intervals)
- Circuit and sequencing (factored helper)
- Solver techniques (parallel workers)
Source
# USE INTERVAL !!!
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
def generate_data(num_tasks):
    tasks = {i+1 for i in range(num_tasks)}
    tasks_0 = tasks.union({0})
    task_to_product = {0: 'dummy'}
    # First half of the tasks make product A, the rest product B.
    task_to_product.update({x+1: 'A' for x in range(int(num_tasks/2))})
    task_to_product.update({x+1: 'B' for x in range(int(num_tasks/2), int(num_tasks))})
    return tasks, tasks_0, task_to_product
def model(num_tasks):
    model = cp_model.CpModel()
    # 1. Data
    tasks, tasks_0, task_to_product = generate_data(num_tasks)
    max_time = num_tasks
    processing_time = {'dummy': 0, 'A': 1, 'B': 1}
    changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
    machines = {0, 1}
    machines_starting_products = {0: 'A', 1: 'A'}
    X = {
        (m, t1, t2)
        for t1 in tasks_0
        for t2 in tasks_0
        for m in machines
        if t1 != t2
    }
    m_cost = {
        (m, t1, t2): 0
        if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
        )
        else changeover_time[task_to_product[t2]]
        for (m, t1, t2) in X
    }
    # 2. Decision variables
    variables_task_ends = {
        task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
    }
    variables_task_starts = {
        task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
    }
    variables_machine_task_starts = {
        (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_ends = {
        (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_presences = {
        (m, t): model.new_bool_var(f"presence_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_intervals = {
        (m, t): model.new_optional_interval_var(
            start=variables_machine_task_starts[m, t],
            size=processing_time[task_to_product[t]],
            end=variables_machine_task_ends[m, t],
            is_present=variables_machine_task_presences[m, t],
            name=f"interval_{t}_{m}",
        )
        for t in tasks
        for m in machines
    }
    variables_machine_task_sequence = {
        (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
        for (m, t1, t2) in X
    }
    # 3. Objectives
    # Note: add_circuit_constraints creates its own arc literals, so the sequence
    # literals above are not linked to any circuit and this term is 0 at the optimum.
    total_changeover_time = sum(
        [variables_machine_task_sequence[(m, t1, t2)] * m_cost[(m, t1, t2)] for (m, t1, t2) in X]
    )
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(
        make_span,
        [variables_task_ends[task] for task in tasks]
    )
    model.minimize(make_span + total_changeover_time)
    # 4. Constraints
    for task in tasks:
        task_candidate_machines = machines
        # find the subset in presence matrix related to this task
        tmp = [
            variables_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        # this task is only present in one machine
        model.add_exactly_one(tmp)
        # task level link to machine-task level
        for m in task_candidate_machines:
            model.add(
                variables_task_starts[task] == variables_machine_task_starts[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])
            model.add(
                variables_task_ends[task] == variables_machine_task_ends[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])
    for machine in machines:
        tmp = {(m, t) for (m, t) in variables_intervals if m == machine}
        intervals = [variables_intervals[x] for x in tmp]
        model.add_no_overlap(intervals)
    # Create circuit constraints
    add_circuit_constraints(
        model,
        machines,
        tasks,
        variables_task_starts,
        variables_task_ends,
        variables_machine_task_presences,
    )
    # Solve
    solver = cp_model.CpSolver()
    solver.parameters.num_search_workers = 8
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    return total_time
def add_circuit_constraints(
    model,
    machines,
    tasks,
    variables_task_starts,
    variables_task_ends,
    variables_machine_task_presences,
):
    for machine in machines:
        # List of all feasible arcs within a machine.
        # Each arc carries a boolean literal that says whether it is in the circuit.
        arcs = []
        machine_tasks = tasks
        for node_1, task_1 in enumerate(machine_tasks):
            mt_1 = str(task_1) + "_" + str(machine)
            # Initial arc from the dummy node (0) to a task.
            arcs.append(
                [0, node_1 + 1, model.new_bool_var("first" + "_" + mt_1)]
            )  # if mt_1 follows dummy node 0
            # Final arc from a task to the dummy node (0).
            arcs.append(
                [node_1 + 1, 0, model.new_bool_var("last" + "_" + mt_1)]
            )  # if dummy node 0 follows mt_1
            # For an optional task on this machine (i.e. it runs on another machine):
            # self-looping arc on the node that corresponds to this task.
            arcs.append(
                [
                    node_1 + 1,
                    node_1 + 1,
                    ~variables_machine_task_presences[(machine, task_1)],
                ]
            )
            for node_2, task_2 in enumerate(machine_tasks):
                if node_1 == node_2:
                    continue
                mt_2 = str(task_2) + "_" + str(machine)
                # Add sequential boolean constraint: mt_2 follows mt_1
                mt2_after_mt1 = model.new_bool_var(f"{mt_2} follows {mt_1}")
                arcs.append([node_1 + 1, node_2 + 1, mt2_after_mt1])
                # We add the reified precedence to link the literal with the
                # times of the two tasks.
                min_distance = 0
                model.add(
                    variables_task_starts[task_2] >= variables_task_ends[task_1] + min_distance
                ).only_enforce_if(mt2_after_mt1)
        model.add_circuit(arcs)
if __name__ == '__main__':
    num_tasks = [x+2 for x in range(80)]
    seconds = []
    for i in num_tasks:
        print(i)
        # model(i) returns the wall-clock solve time in seconds
        processing_time = model(i)
        seconds.append(processing_time)
    plt.plot(num_tasks, seconds)
    plt.show()
Changeover in constraint
Source: scheduling/example_04_seq_with_changeover_in_constraint.py
What it does
Moves the changeover out of the objective and into the precedence
constraint. If seq[m, t1, t2] is true, then
model.add(end[t1] + distance <= start[t2]).only_enforce_if(seq[m, t1, t2])
where distance is the changeover time between the two products. The
objective becomes just the makespan.
Now the schedule and the objective agree: a changeover physically pushes the next task later instead of only adding an abstract cost to the objective.
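The effect on the timeline can be worked out by hand for a fixed sequence. The sketch below is plain Python and uses a hypothetical helper, timeline, that places tasks back to back and inserts the changeover gap whenever the product changes. It reproduces the two cases noted in the data section of the listing: a machine that stays on A, and one that starts on A and then runs B tasks.

```python
changeover_time = {'A': 1, 'B': 1}
processing_time = {'A': 1, 'B': 1}


def timeline(starting_product, products):
    """Return (start, end) pairs when tasks run back to back in the given order.

    A changeover gap is inserted before a task whenever its product differs
    from the previous one.
    """
    clock, previous, slots = 0, starting_product, []
    for product in products:
        if product != previous:
            # This is the distance in end[t1] + distance <= start[t2].
            clock += changeover_time[product]
        slots.append((clock, clock + processing_time[product]))
        clock += processing_time[product]
        previous = product
    return slots


same = timeline('A', ['A', 'A'])      # machine starts on A, runs two A tasks
mixed = timeline('A', ['B', 'B'])     # machine starts on A, runs two B tasks
print(same, mixed)
```

The mixed machine finishes at time 3 instead of 2, which is exactly the delay the makespan objective now sees.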
Concepts
- Changeover (approach 2: in constraint)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 B
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}
'''
M1 A -> A -> A 1+1
M2 A -> B -> B 1+1+1 --> 3
'''
X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}
# Now this is used in constraints, not in the objective function anymore
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
        task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}
variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}
variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}
# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# Duration - This can be replaced by interval variable ?
for task in tasks_0:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )
# One task to one machine. Link across level.
for task in tasks:
    # For this task
    # get all allowed machines
    task_candidate_machines = machines
    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
    arcs = list()
    for from_task in tasks_0:
        for to_task in tasks_0:
            # arcs
            if from_task != to_task:
                arcs.append([
                    from_task,
                    to_task,
                    variables_machine_task_sequence[(m, from_task, to_task)]
                ])
                distance = m_cost[m, from_task, to_task]
                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:
                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])
    model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    print('Make-span:', solver.value(make_span))
    for m in machines:
        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)
Sequence with intervals
Source: scheduling/example_05_seq_with_intervals.py
What it does
Same setup as 04, but replaces the manual duration constraint
end - start == duration with new_optional_interval_var:
variables_machine_task_intervals[m, t] = model.new_optional_interval_var(
    start[m, t], processing_time[product_of(t)], end[m, t],
    presence[m, t], name=f"interval_{m}_{t}",
)
The changeover is still encoded as end[t1] + distance <= start[t2]
under only_enforce_if(seq[m, t1, t2]).
Concepts
- Interval variables (first introduction)
- Changeover
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 B
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}
X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}
# Now this is used in constraints, not in the objective function anymore
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
        task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}
variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}
variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}
# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}
# intervals
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# Duration is now enforced by the interval variables, so the manual constraint stays disabled.
# for task in tasks_0:
#     model.add(
#         variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
#     )
# One task to one machine. Link across level.
for task in tasks:
    # For this task
    # get all allowed machines
    task_candidate_machines = machines
    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
    arcs = list()
    for from_task in tasks_0:
        for to_task in tasks_0:
            # arcs
            if from_task != to_task:
                arcs.append([
                    from_task,
                    to_task,
                    variables_machine_task_sequence[(m, from_task, to_task)]
                ])
                distance = m_cost[m, from_task, to_task]
                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:
                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])
    model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    print('Make-span:', solver.value(make_span))
    for m in machines:
        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
Sequence with shared resource
Source: scheduling/example_06_seq_with_intervals_resource.py
What it does
Extends 05 with a globally shared resource (a single operator). After
building all machine-task intervals, the model adds a single
add_cumulative over them:
intervals = list(variables_machine_task_intervals.values())
model.add_cumulative(intervals, [1] * len(intervals), 1)
With capacity 1, the two machines can no longer run tasks in parallel:
the operator has to pick one at a time.
Concepts
- Resources and cumulative (shared operator)
- Interval variables
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 B
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'B'}
X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}
# Now this is used in constraints, not in the objective function anymore
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
        task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}
variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}
variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}
# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}
# intervals
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# Duration: an explicit constraint is not needed, the interval variables already enforce it
# for task in tasks_0:
# model.add(
# variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
# )
# One task to one machine. Link across level.
for task in tasks:
# For this task
# get all allowed machines
task_candidate_machines = machines
# find the subset in presence matrix related to this task
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
arcs = list()
for from_task in tasks_0:
for to_task in tasks_0:
# arcs
if from_task != to_task:
arcs.append([
from_task,
to_task,
variables_machine_task_sequence[(m, from_task, to_task)]
])
distance = m_cost[m, from_task, to_task]
# cannot require the time index of task 0 to represent the first and the last position
if to_task != 0:
model.add(
variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(m, task)]
])
model.add_circuit(arcs)
# Resource constraint: there is only one operator, so no two tasks may run in parallel
intervals = list(variables_machine_task_intervals.values())
model.add_cumulative(intervals, [1]*len(intervals), 1)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('Make-span:', solver.value(make_span))
for m in machines:
print(f'------------\nMachine {m}')
print(f'Starting dummy product: {machines_starting_products[m]}')
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Changeover as event
Source: scheduling/example_08_changeover_as_event.py
What it does
Promotes the changeover to a first-class scheduled event. For every
ordered pair (t1, t2) there is an optional interval with its own start,
end, and presence:
co_iv[m, t1, t2] = model.new_optional_interval_var(
co_start[m, t1, t2],
changeover_time[product_of(t2)],
co_end[m, t1, t2],
co_present[m, t1, t2],
...,
)
When seq[m, t1, t2] is chosen, the model forces the changeover interval to be
present, to sit between the two tasks, and to have a length equal to the changeover time:
model.add(end[t1] <= co_start[t1, t2]).only_enforce_if(seq[m, t1, t2])
model.add(co_end[t1, t2] <= start[t2]).only_enforce_if(seq[m, t1, t2])
model.add(co_end[t1, t2] - co_start[t1, t2] == distance).only_enforce_if(seq[m, t1, t2])
model.add(co_present[m, t1, t2] == 1).only_enforce_if(seq[m, t1, t2])
model.add(co_present[m, t1, t2] == 0).only_enforce_if(~seq[m, t1, t2])
Because the changeover is now an interval, it can take part in cumulative constraints (cleaning resource, operator availability, etc.).
Concepts
- Changeover (approach 3: as event)
- Interval variables
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 B
'''
# 1. Data
tasks = {1, 2}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 2, 'B': 2}
machines = {0}
machines_starting_products = {0: 'A'}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# Changeover duration per arc; used below as the length of the changeover event
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# intervals
# ! This can replace the end - start = duration constraint
variables_machine_task_intervals = {
(m, task): model.new_optional_interval_var(
variables_machine_task_starts[m, task],
processing_time[task_to_product[task]],
variables_machine_task_ends[m, task],
variables_machine_task_presences[m, task],
name=f"t_interval_{m}_{task}"
)
for task in tasks_0
for m in machines
}
# Changeover-related variables
variables_co_starts = {
(t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_start") for t1 in tasks_0 for t2 in tasks_0 if t1 != t2
}
variables_co_ends = {
(t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_end") for t1 in tasks_0 for t2 in tasks_0 if t1 != t2
}
variables_machine_co_starts = {
(m, t1, t2): model.new_int_var(0, max_time, f"m{m}_co_t{t1}_to_t{t2}_start")
for t1 in tasks_0 for t2 in tasks_0 for m in machines if t1 != t2
}
variables_machine_co_ends = {
(m, t1, t2): model.new_int_var(0, max_time, f"m{m}_co_t{t1}_to_t{t2}_end")
for t1 in tasks_0 for t2 in tasks_0 for m in machines if t1 != t2
}
variables_machine_co_presences = {
(m, t1, t2): model.new_bool_var(f"co_presence_m{m}_t{t1}_t{t2}")
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
variables_machine_co_intervals = {
(m, t1, t2): model.new_optional_interval_var(
variables_machine_co_starts[m, t1, t2],
changeover_time[task_to_product[t2]],
variables_machine_co_ends[m, t1, t2],
variables_machine_co_presences[m, t1, t2],
name=f"co_interval_m{m}_t{t1}_t{t2}"
)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# One task to one machine.
for task in tasks:
task_candidate_machines = machines
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# co level link to machine-co level
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
for m in machines:
model.add(
variables_co_starts[t1, t2] == variables_machine_co_starts[m, t1, t2]
).only_enforce_if(variables_machine_co_presences[m, t1, t2])
model.add(
variables_co_ends[t1, t2] == variables_machine_co_ends[m, t1, t2]
).only_enforce_if(variables_machine_co_presences[m, t1, t2])
# for dummies: Force task 0 (dummy) starts at 0 and is present on all machines
model.add(variables_task_starts[0] == 0)
for m in machines:
model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
arcs = list()
for t1 in tasks_0:
for t2 in tasks_0:
# arcs
if t1 != t2:
arcs.append([
t1,
t2,
variables_machine_task_sequence[(m, t1, t2)]
])
distance = m_cost[m, t1, t2]
# cannot require the time index of task 0 to represent the first and the last position
if t2 != 0:
# to schedule tasks and c/o
model.add(
variables_task_ends[t1] <= variables_co_starts[t1, t2]
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
model.add(
variables_co_ends[t1, t2] <= variables_task_starts[t2]
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
model.add(
variables_co_ends[t1, t2] - variables_co_starts[t1, t2] == distance
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
# ensure intervals are consistent so we can apply resource constraints later
model.add(
variables_machine_co_presences[m, t1, t2] == 1
).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
model.add(
variables_machine_co_presences[m, t1, t2] == 0
).only_enforce_if(~variables_machine_task_sequence[(m, t1, t2)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(m, task)]
])
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('Make-span:', solver.value(make_span))
for m in machines:
print(f'------------\nMachine {m}')
print(f'Starting dummy product: {machines_starting_products[m]}')
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
print('variables_machine_task_sequence[t1, t2]', solver.value(variables_machine_task_sequence[m, t1, t2]))
print('variables_co_starts[t1, t2]', solver.value(variables_co_starts[t1, t2]))
print('variables_co_ends[t1, t2]', solver.value(variables_co_ends[t1, t2]))
print('variables_machine_co_presences[m, t1, t2]', solver.value(variables_machine_co_presences[m, t1, t2]))
print('variables_machine_co_starts[m, t1, t2]', solver.value(variables_machine_co_starts[m, t1, t2]))
print('variables_machine_co_ends[m, t1, t2]', solver.value(variables_machine_co_ends[m, t1, t2]))
#print('variables_machine_co_intervals[m, t1, t2]', variables_machine_co_intervals[m, t1, t2])
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Break without changeover
Source: scheduling/example_07_break_without_changeover.py
What it does
Single machine, four same-product tasks, and a fixed break at (2, 3).
- The break is a new_fixed_size_interval_var added to an add_cumulative(intervals, capacity=1) alongside task intervals, so tasks are pushed around it.
- Since all tasks share one product, no changeover logic is needed.
- Two add_decision_strategy calls (on starts and on sequence literals) are used to force the solver to produce the canonical 0 1 2 3 4 0 order instead of a symmetric alternative.
Concepts
- Breaks (approach 1: fixed interval in cumulative)
- Solver techniques (decision strategies)
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 A
3 A
4 A
'''
# 1. Data
tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'A', 3: 'A', 4: 'A'}
processing_time = {'dummy': 0, 'A': 1}
changeover_time = {'dummy': 0, 'A': 1}
machines = {0}
machines_starting_products = {0: 'A'}
breaks = {(2, 3)}
X = {
(m, t1, t2)
for t1 in tasks_0
for t2 in tasks_0
for m in machines
if t1 != t2
}
# m_cost is now used in the constraints, no longer in the objective function
m_cost = {
(m, t1, t2): 0
if task_to_product[t1] == task_to_product[t2] or (
task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
)
else changeover_time[task_to_product[t2]]
for (m, t1, t2) in X
}
# 2. Decision variables
max_time = 8
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}
variables_machine_task_starts = {
(m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_ends = {
(m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_presences = {
(m, t): model.new_bool_var(f"presence_{m}_{t}")
for t in tasks_0
for m in machines
}
variables_machine_task_sequence = {
(m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
for (m, t1, t2) in X
}
# intervals
variables_machine_task_intervals = {
(m, task): model.new_optional_interval_var(
variables_machine_task_starts[m, task],
processing_time[task_to_product[task]],
variables_machine_task_ends[m, task],
variables_machine_task_presences[m, task],
name=f"interval_{m}_{task}"
)
for task in tasks_0
for m in machines
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# One task to one machine.
for task in tasks:
# For this task
# get all allowed machines
task_candidate_machines = machines
# find the subset in presence matrix related to this task
tmp = [
variables_machine_task_presences[m, task]
for m in task_candidate_machines
]
# this task is only present in one machine
model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks_0:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
variables_task_starts[task] == variables_machine_task_starts[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
model.add(
variables_task_ends[task] == variables_machine_task_ends[m, task]
).only_enforce_if(variables_machine_task_presences[m, task])
# for dummies: Force task 0 (dummy) starts at 0 and is present on all machines
model.add(variables_task_starts[0] == 0)
for m in machines:
model.add(variables_machine_task_presences[m, 0] == 1)
# Sequence
for m in machines:
arcs = list()
for from_task in tasks_0:
for to_task in tasks_0:
# arcs
if from_task != to_task:
arcs.append([
from_task,
to_task,
variables_machine_task_sequence[(m, from_task, to_task)]
])
distance = m_cost[m, from_task, to_task]
# cannot require the time index of task 0 to represent the first and the last position
if to_task != 0:
model.add(
variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
for task in tasks:
arcs.append([
task, task, ~variables_machine_task_presences[(m, task)]
])
model.add_circuit(arcs)
# Add break time
variables_breaks = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end - start, name='a_break') for (start, end) in breaks
}
# Add resource control with break
intervals = list(variables_machine_task_intervals.values()) + list(variables_breaks.values())
model.add_cumulative(intervals, [1]*len(intervals), 1)
# Solve
# https://github.com/d-krupke/cpsat-primer
# default
# 0 1 4 2 3 0
#model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
# 0 4 3 2 1 0
#model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# 0 1 4 2 3 0
# strange
#model.add_decision_strategy(variables_machine_task_sequence.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# 0 1 4 2 3 0
# Both of the following strategies are needed to obtain the canonical sequence
model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
model.add_decision_strategy(variables_machine_task_sequence.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# 0 1 2 3 4 0
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('Make-span:', solver.value(make_span))
for m in machines:
print(f'------------\nMachine {m}')
print(f'Starting dummy product: {machines_starting_products[m]}')
for t1 in tasks_0:
for t2 in tasks_0:
if t1 != t2:
value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
if value == 1 and t2 != 0:
print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[m, t1, t2]}')
if value == 1 and t2 == 0:
print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
One automatic job
Source: scheduling/example_12_an_automatic_job.py
What it does
Models an "automatic" task: one that consumes the operator only for its first time unit (setup), after which the machine runs on its own.
- var_task_intervals[t] is the full task interval (setup + auto run).
- var_task_intervals_autojobs[t] is a size-1 interval at (start, start + 1), representing only the setup portion.
- Breaks are added as fixed intervals.
The cumulative constraint uses the setup intervals and the breaks:
model.add_cumulative(
intervals=setup_intervals + break_intervals,
demands=[1] * (len(tasks) + len(breaks)),
capacity=1,
)
So breaks push task starts around but cannot preempt an already-running automatic task.
Concepts
- Breaks (automatic jobs)
- Resources and cumulative
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product type
1 A TYPE_3
'''
# 1. Data
tasks = {1}
products = {'A', 'B'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_3'}
processing_time = {'A': 3, 'B': 1}
max_time = 10
breaks = {(0, 1), (2, 10)}
# 2. Decision Variables
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task],
processing_time[task_to_product[task]],
var_task_ends[task],
name=f"interval_t{task}"
)
for task in tasks
}
var_task_intervals_autojobs = {
task: model.new_interval_var(
var_task_starts[task],
1,
var_task_starts[task] + 1,
name=f"interval_auto_t{task}"
)
for task in tasks
if task_to_type[task] == 'TYPE_3'
}
# Add break time
variables_breaks = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in breaks
}
intervals = list(var_task_intervals_autojobs.values()) + list(variables_breaks.values())
# task, resource reduction for breaks
demands = [1]*len(var_task_intervals_autojobs) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Automatic jobs
Source: scheduling/example_13_automatic_jobs.py
What it does
Same automatic-task idea as 12, now with two tasks that need to be sequenced.
- Every task has a full interval plus a size-1 "auto start" interval.
- A circuit with seq[t1, t2] booleans orders the tasks; the selected arc enforces end[t1] <= start[t2].
- The cumulative uses the auto-start intervals and the break intervals so that breaks only block task starts.
Concepts
- Breaks (automatic jobs)
- Circuit and sequencing
- Resources and cumulative
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product type
1 A TYPE_3
2 B TYPE_3
'''
# 1. Data
tasks = {1, 2}
products = {'A', 'B'}
task_to_product = {1: 'A', 2: 'B'}
task_to_type = {1: 'TYPE_3', 2: 'TYPE_3'}
processing_time = {'A': 4, 'B': 3}
max_time = 10
breaks = {(0, 1), (2, 3), (4, 6), (7, 10)}
# 2. Decision Variables
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task],
processing_time[task_to_product[task]],
var_task_ends[task],
name=f"interval_t{task}"
)
for task in tasks
}
var_task_intervals_auto = {
task: model.new_interval_var(
var_task_starts[task],
1,
var_task_starts[task] + 1,
name=f"interval_auto_t{task}"
)
for task in tasks
if task_to_type[task] == 'TYPE_3'
}
var_task_seq = {
(t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
for t1 in tasks
for t2 in tasks
if t1 != t2
}
arcs = []
for t1 in tasks:
tmp_1 = model.new_bool_var(f'first_to_{t1}')
arcs.append([0, t1, tmp_1])
tmp_2 = model.new_bool_var(f'{t1}_to_last')
arcs.append([t1, 0, tmp_2])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, var_task_seq[t1, t2]])
model.add(
var_task_ends[t1] <= var_task_starts[t2]
).only_enforce_if(var_task_seq[t1, t2])
model.add_circuit(arcs)
# Add break time
variables_breaks = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in breaks
}
intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())
# task, resource reduction for breaks
demands = [1]*len(tasks) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
)
print('Make-span:', solver.value(make_span))
print('======================= ALLOCATION & SEQUENCE =======================')
if True:
for t1 in tasks:
for t2 in tasks:
if t1 != t2:
value = solver.value(var_task_seq[(t1, t2)])
print(f'{t1} --> {t2} {value}')
# if value == 1 and t2 != 0:
# print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]}')# cost: {m_cost[m, t1, t2]}')
# if value == 1 and t2 == 0:
# print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Task delaying a break
Source: scheduling/example_14_task_delaying_break.py
What it does
When a task runs across a break, its total machine time grows by the length of the break. This is encoded per time slot:
- var_task_duration_timeslots[t, i] = 1 iff task t occupies slot i. Built from two reified booleans, "starts at or before i" AND "ends after i", combined with add_multiplication_equality.
- var_task_new_duration[t] = base + sum(is_break[i] * uses[t, i] for i) computes the stretched duration.
- The task interval uses this stretched duration directly.
var_task_intervals[t] = model.new_interval_var(
start[t], new_duration[t], end[t], ...,
)
Break intervals are added to add_cumulative alongside the task start
intervals (since breaks do not preempt an auto-type task once started).
Concepts
- Breaks (approach 2: duration stretched)
- CP-SAT basics (reification, add_multiplication_equality)
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product type
1 A TYPE_4
'''
# 1. Data
tasks = {1}
products = {'A'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_4'}
processing_time = {'A': 3}
max_time = 10
breaks = {(2, 4)}
is_break = {i: 1 if 2<=i<4 else 0 for i in range(max_time)}
# 2. Decision Variables
var_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_task_new_duration = {
task: model.new_int_var(0, max_time, f"task_{task}_new_duration") for task in tasks
}
var_task_duration_timeslots = {
(task, i): model.new_bool_var(f'task {task} uses interval {i}')
for task in tasks
for i in range(max_time)
}
# Reified booleans for the AND pair:
# slot i is used iff the task starts at or before i AND ends after i
var_task_start_earlier_than_start = {
(task, i): model.new_bool_var(f'task {task} starts at or before slot {i}')
for task in tasks
for i in range(max_time)
}
var_task_end_later_than_end = {
(task, i): model.new_bool_var(f'task {task} ends after slot {i}')
for task in tasks
for i in range(max_time)
}
# # OR pair
# var_task_start_later_than_end = {
# (task, i): model.new_bool_var(f'')
# for task in tasks
# for i in range(max_time)
# }
# var_task_end_earlier_than_start = {
# (task, i): model.new_bool_var(f'')
# for task in tasks
# for i in range(max_time)
# }
for task in tasks:
for i in range(max_time):
model.add(var_task_starts[task] <= i).only_enforce_if(var_task_start_earlier_than_start[task, i])
model.add(var_task_starts[task] > i).only_enforce_if(~var_task_start_earlier_than_start[task, i])
model.add(var_task_ends[task] >= i + 1).only_enforce_if(var_task_end_later_than_end[task, i])
model.add(var_task_ends[task] < i + 1).only_enforce_if(~var_task_end_later_than_end[task, i])
# model.add(var_task_starts[task] >= i+1).only_enforce_if(var_task_start_later_than_end[task, i])
# model.add(var_task_starts[task] < i+1).only_enforce_if(~var_task_start_later_than_end[task, i])
#
# model.add(var_task_ends[task] <= i).only_enforce_if(var_task_end_earlier_than_start[task, i])
# model.add(var_task_ends[task] > i).only_enforce_if(~var_task_end_earlier_than_start[task, i])
# AND of the two booleans, written as a product of 0/1 values
model.add_multiplication_equality(
var_task_duration_timeslots[task, i],
[var_task_start_earlier_than_start[task, i], var_task_end_later_than_end[task, i]]
)
# model.add_min_equality(
# var_task_duration_timeslots[task, i],
# [var_task_start_later_than_end[task, i], var_task_end_earlier_than_start[task, i]]
# )
# for task in tasks:
# for i in range(max_time):
# start_index = i
# end_index = i + 1
#
# # TRUE
# model.add_linear_constraint(var_task_starts[task], 0, start_index).only_enforce_if(var_task_timeslots[task, i])
# model.add_linear_constraint(var_task_ends[task], end_index, max_time).only_enforce_if(var_task_timeslots[task, i])
#
# # FALSE
# if i == 0:
# # first time slot
# model.add_linear_expression_in_domain(
# var_task_starts[task],
# cp_model.Domain.from_intervals([[1, max_time]])
# ).only_enforce_if(~var_task_timeslots[task, i])
#
# elif i == max_time - 1:
# # last time slot
# model.add_linear_expression_in_domain(
# var_task_ends[task],
# cp_model.Domain.from_intervals([[0, max_time-1]])
# ).only_enforce_if(~var_task_timeslots[task, i])
# else:
# model.add_linear_expression_in_domain(
# x,
# cp_model.Domain.from_intervals([[1, start_index], [end_index+1, max_time]])
# ).only_enforce_if(~var_task_timeslots[task, i])
#
# task_1_start < task_2_start < task_1_end
# Variables:
# 1: Overlap
# 2: (task_2_start < task_1_end)
# 3: (task_1_start < task_2_start)
# Constraints:
# 1: (task_1_start < task_2_start) -> task_1_start
# 2:
for task in tasks:
model.add(
var_task_new_duration[task] == processing_time[task_to_product[task]] +
sum(is_break[i]*var_task_duration_timeslots[task, i] for i in range(max_time))
)
var_task_intervals = {
task: model.new_interval_var(
var_task_starts[task],
var_task_new_duration[task],
#processing_time[task_to_product[task]],
var_task_ends[task],
name=f"interval_t{task}"
)
for task in tasks
}
var_task_intervals_auto = {
task: model.new_interval_var(
var_task_starts[task],
1,
var_task_starts[task] + 1,
name=f"interval_auto_t{task}"
)
for task in tasks
if task_to_type[task] == 'TYPE_4'
}
# Add break time
variables_breaks = {
(start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
for (start, end) in breaks
}
intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())
# task, resource reduction for breaks
demands = [1]*len(tasks) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'task {task}')
for i in range(max_time):
print(f'{i} {solver.value(var_task_duration_timeslots[task, i])}')
print('=========================== TASKS SUMMARY ===========================')
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
)
print(solver.value(var_task_new_duration[task]))
print('Make-span:', solver.value(make_span))
# print('======================= ALLOCATION & SEQUENCE =======================')
# if True:
# for t1 in tasks:
# for t2 in tasks:
# if t1 != t2:
# value = solver.value(var_task_seq[(t1, t2)])
# print(f'{t1} --> {t2} {value}')
# # if value == 1 and t2 != 0:
# # print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]}')# cost: {m_cost[m, t1, t2]}')
# # if value == 1 and t2 == 0:
# # print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Linear domain for breaks
Source: scheduling/example_29_linear_domain_for_breaks.py
What it does
Compares two ways of enforcing that tasks never overlap a periodic break:
- Method 1 (run_model_1): model every break as a fixed interval and put them in add_cumulative (or add_no_overlap) along with tasks.
- Method 2: keep tasks as intervals but restrict their start values to a domain that excludes break-overlapping starts, using add_linear_expression_in_domain.
The second method skips creating many break intervals and lets CP-SAT propagate directly on the start domain, which can be faster for highly periodic schedules. The file includes a benchmark loop.
Concepts
- Breaks (approach 3: start-domain restriction)
- Solver techniques
Source
from ortools.sat.python import cp_model
from time import time
from tqdm import tqdm
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
# method 1: No overlapping
def run_model_1(number_of_tasks):
    #number_of_tasks = 200
    tasks = {x for x in range(number_of_tasks)}
    # 2 tasks --> 8 time units
    processing_time = 2
    max_time = number_of_tasks * 4 + 100
    breaks = {(4 * i, 4 * i + 4) for i in range(int(number_of_tasks)) if i % 2 != 0}
    valid_start_times = [[0 + i * 8, 1 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
    valid_start_times = [item for sublist in valid_start_times for item in sublist]
    model = cp_model.CpModel()
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            processing_time,
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] < var_task_starts[task])
    var_break_intervals = {
        break_id: model.new_fixed_size_interval_var(
            break_start, break_end-break_start, f"break_{break_id}"
        ) for break_id, (break_start, break_end) in enumerate(breaks)
    }
    all_intervals = [x for x in var_task_intervals.values()] + [x for x in var_break_intervals.values()]
    model.add_no_overlap(all_intervals)
    # obj
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )
    model.minimize(make_span)
    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    # if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    #     for task in tasks:
    #         print(f'Task {task} ',
    #               solver.value(var_task_starts[task]), solver.value(var_task_ends[task])
    #               )
    #
    #     print('Make-span:', solver.value(make_span))
    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999
# method 2: Use linear domain
def run_model_2(number_of_tasks):
    #number_of_tasks = 4
    tasks = {x for x in range(number_of_tasks)}
    # 2 tasks --> 8 time units
    processing_time = 2
    max_time = number_of_tasks * 4 + 100
    breaks = {(4 * i, 4 * i + 4) for i in range(int(number_of_tasks)) if i % 2 != 0}
    valid_start_times = [[0 + i * 8, 1 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
    valid_periods = [[0 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
    valid_start_times = [item for sublist in valid_start_times for item in sublist]
    model = cp_model.CpModel()
    #model.new_int_var_from_domain(cp_model.Domain.from_intervals([[1, 2], [4, 6]]), 'x')
    # Start values are limited to the break-free windows in valid_periods.
    var_task_starts = {task: model.new_int_var_from_domain(
        cp_model.Domain.from_intervals(valid_periods),
        f"task_{task}_start")
        for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            processing_time,
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] < var_task_starts[task])
    all_intervals = [x for x in var_task_intervals.values()]
    model.add_no_overlap(all_intervals)
    # obj
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )
    model.minimize(make_span)
    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    # if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    #     for task in tasks:
    #         print(f'Task {task} ',
    #               solver.value(var_task_starts[task]), solver.value(var_task_ends[task])
    #               )
    #
    #     print('Make-span:', solver.value(make_span))
    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999
if __name__ == '__main__':
    N = 200
    sizes_1 = list(range(2, N, 10))
    sizes_2 = list(range(2, N, 10))
    model_1_times = []
    model_2_times = []
    print("\nNoOverlap\n")
    for size in tqdm(sizes_1):
        model_1_times.append(run_model_1(size))
    print("\nLinear Domain\n")
    for size in tqdm(sizes_2):
        model_2_times.append(run_model_2(size))
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(sizes_1, model_1_times, marker='o', label='NoOverLapping')
    plt.plot(sizes_2, model_2_times, '-.', marker='o', label='Linear Domain')
    plt.legend()
    plt.title('Performance benchmarking')
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()
Conditional duration via linear domain
Source: scheduling/example_33_conditional_duration_linear_domain.py
What it does
A task's duration depends on whether its start falls in a break slot.
- Two domains are built: domain_break (start values that overlap a break) and domain_no_break (safe start values).
- A reified bool var_task_overlap_break[t] toggles which domain the start must belong to, via add_linear_expression_in_domain.
- Under only_enforce_if, the task duration is either processing_time or processing_time + 1.
- The interval is built from start/duration/end, so everything downstream (no-overlap, cumulative) sees the correct effective duration.
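The two start domains can be reproduced without the solver. This is a small sketch in plain Python, assuming unit-length break slots and that a start counts as "overlapping" when any slot of the nominal run is a break; split_starts and effective_duration are hypothetical helpers, not functions from the example file.

```python
def split_starts(break_slots, processing_time, horizon):
    """Partition start values by whether the nominal run touches a break slot."""
    break_slots = set(break_slots)
    starts_no_break, starts_break = [], []
    for start in range(horizon):
        nominal_run = range(start, start + processing_time)
        if any(t in break_slots for t in nominal_run):
            starts_break.append(start)
        else:
            starts_no_break.append(start)
    return starts_no_break, starts_break


def effective_duration(start, starts_break, processing_time):
    # Same rule as the reified constraints: one extra unit when a break is hit.
    return processing_time + 1 if start in starts_break else processing_time


# break_offset = 1 with three tasks: unit breaks in slots 1, 4 and 7.
no_break, with_break = split_starts(break_slots=[1, 4, 7], processing_time=2, horizon=9)
print(no_break)    # [2, 5, 8]
print(with_break)  # [0, 1, 3, 4, 6, 7]
```

For break_offset = 1 these lists agree with starts_no_break and starts_break as computed in the listing.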
Concepts
- Breaks (conditional duration)
- CP-SAT basics (add_linear_expression_in_domain)
- Interval variables
Source
from ortools.sat.python import cp_model
from time import time
import pandas as pd
if __name__ == '__main__':
    """
    Offset = 0:
    | x | | | x | | | x | | |
    Offset = 1:
    | | x | | | x | | | x | |
    Offset = 2:
    | | | x | | | x | | | x |
    where x represent a unit duration break period
    """
    break_offset = 1
    num_of_tasks = 3
    max_time = num_of_tasks*3
    processing_time = 2
    if break_offset == 0:
        help_text = "| x | | | x | | | x | | |"
    elif break_offset == 1:
        help_text = "| | x | | | x | | | x | |"
    elif break_offset == 2:
        help_text = "| | | x | | | x | | | x |"
    else:
        print("offset wrong")
        exit()
    breaks = [(i*num_of_tasks + break_offset, i*num_of_tasks + break_offset + 1) for i in range(num_of_tasks)]
    tasks = {x for x in range(num_of_tasks)}
    starts_no_break = [x*3+break_offset+1 for x in range(-1, num_of_tasks) if x*3+break_offset+1>= 0]
    starts_break = list(set(range(max_time)).difference(starts_no_break))
    domain_no_break = cp_model.Domain.from_intervals([[x] for x in starts_no_break])
    domain_break = cp_model.Domain.from_intervals([[x] for x in starts_break])
    model = cp_model.CpModel()
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    # Duration is 2 (no break hit) or 3 (break hit).
    var_task_durations = {task: model.new_int_var(2, 3, f"task_{task}_duration") for task in tasks}
    var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}
    # print("Heuristic 1: Apply the tasks sequence heuristics")
    for task in tasks:
        if task == 0:
            continue
        model.add(var_task_ends[task-1] <= var_task_starts[task])
    for task in tasks:
        model.add_linear_expression_in_domain(var_task_starts[task], domain_break).only_enforce_if(
            var_task_overlap_break[task]
        )
        model.add_linear_expression_in_domain(var_task_starts[task], domain_no_break).only_enforce_if(
            ~var_task_overlap_break[task]
        )
        model.add(var_task_durations[task] == processing_time+1).only_enforce_if(
            var_task_overlap_break[task]
        )
        model.add(var_task_durations[task] == processing_time).only_enforce_if(
            ~var_task_overlap_break[task]
        )
    # intervals
    var_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            var_task_durations[task],
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }
    model.add_no_overlap(var_intervals.values())
    # Objectives
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )
    model.minimize(make_span + sum(var_task_durations.values()))
    # model.minimize(sum(var_task_durations))
    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    print_result = True
    # show the result if getting the optimal one
    if print_result:
        print("-----------------------------------------")
        print(help_text)
        print("breaks periods:", breaks)
        print("break starts:", starts_break)
        print("no break starts:", starts_no_break)
        if status == cp_model.OPTIMAL:
            big_list = []
            for task in tasks:
                tmp = [
                    f"task {task}",
                    solver.value(var_task_starts[task]),
                    solver.value(var_task_ends[task]),
                    solver.value(var_task_overlap_break[task]),
                    solver.value(var_task_durations[task]),
                ]
                big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
            df = df.sort_values(['start'])
            print(df)
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)
Shift crossing (fake time unit)
Source: scheduling/example_16_shift_crossing_fake_time_unit.py
What it does
Prevents tasks from straddling shift boundaries by inserting a tiny fake break at each boundary. Every task is required to not overlap those fake breaks.
var_shift_break_intervals = {}
for s, e in synthetic_shift_breaks:
    var_shift_break_intervals[s, e] = model.new_fixed_size_interval_var(
        start=s, size=e - s, name="shift_edge",
    )
    for t in tasks:
        model.add_no_overlap([var_task_intervals[t], var_shift_break_intervals[s, e]])
Real breaks are still modeled with add_cumulative as usual.
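The effect of the fake breaks on the tiny instance in the listing can be checked by enumeration. The following is a plain-Python sketch, not the CP-SAT model; overlaps and feasible_starts are hypothetical helpers, and all intervals are assumed half-open.

```python
def overlaps(start, end, blocked):
    """True if [start, end) intersects any half-open interval in blocked."""
    return any(start < b_end and b_start < end for b_start, b_end in blocked)


def feasible_starts(duration, horizon, blocked):
    """All integer starts whose run fits in the horizon and avoids every blocked interval."""
    return [s for s in range(horizon - duration + 1)
            if not overlaps(s, s + duration, blocked)]


real_breaks = [(0, 2)]
fake_shift_breaks = [(4, 5), (9, 10)]
starts = feasible_starts(duration=3, horizon=10, blocked=real_breaks + fake_shift_breaks)
print(starts)           # [5, 6]
print(min(starts) + 3)  # 8, the smallest possible end time
```

A task of length 3 does not fit between the real break and the first fake break, so the earliest placement is after the boundary at time 5.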
Concepts
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
# 1. Data
synthetic_shift_breaks = {(4, 5), (9, 10)}
breaks = {(0, 2)}
tasks = {1}
processing_time = {1: 3}
max_time = 10
var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task], processing_time[task], var_task_ends[task], name=f"interval_t{task}"
    ) for task in tasks
}
# Add break time
var_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}
intervals = list(var_task_intervals.values()) + list(var_break_intervals.values())
demands = [1]*len(tasks) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# THE CONSTRAINT for synthetic shift break time unit
var_shift_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in synthetic_shift_breaks
}
for start, end in synthetic_shift_breaks:
    print(start, end)
    for task in tasks:
        model.add_no_overlap([var_task_intervals[task], var_shift_break_intervals[start, end]])
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print('=========================== TASKS SUMMARY ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )
    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)
Shift crossing (Mathieu)
Source: scheduling/example_17_shift_crossing_mathieu.py
What it does
Alternative to 16. Instead of synthetic breaks, every task is assigned to exactly one shift, and its start/end are constrained to the shift window:
for t in tasks:
    model.add_exactly_one(presence[s, t] for s in shifts)
    for s in shifts:
        model.add(start[t] >= shift_start[s]).only_enforce_if(presence[s, t])
        model.add(end[t] <= shift_end[s]).only_enforce_if(presence[s, t])
The rest of the model (real breaks, cumulative, makespan) is the same as before.
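What the shift-assignment constraints allow can be seen by enumerating the listing's single task by hand. This is a plain-Python sketch under the same data (two shifts, one real break), not the solver model; earliest_end_with_shifts is a hypothetical helper.

```python
def earliest_end_with_shifts(duration, shifts, breaks):
    """Return (end, shift, start) for the earliest feasible placement, or None.

    shifts : dict shift_id -> (shift_start, shift_end)
    breaks : list of half-open (start, end) intervals
    """
    best = None
    for shift, (shift_start, shift_end) in sorted(shifts.items()):
        # The task must start and end inside the chosen shift window.
        for start in range(shift_start, shift_end - duration + 1):
            end = start + duration
            hits_break = any(start < b_end and b_start < end for b_start, b_end in breaks)
            if not hits_break and (best is None or end < best[0]):
                best = (end, shift, start)
    return best


shifts = {1: (0, 4), 2: (4, 8)}
print(earliest_end_with_shifts(duration=3, shifts=shifts, breaks=[(0, 2)]))
# (7, 2, 4)
```

Shift 1 has no room for a length-3 task once the break (0, 2) is removed, so the task lands at the start of shift 2.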
Concepts
- Shifts (explicit shift assignment)
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
tasks = {1}
processing_times = {1: 3}
max_time = 10
breaks = {(0, 2)}
shifts = {1, 2}
shift_starts = {1:0, 2:4}
shift_ends = {1:4, 2:8}
var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
var_shift_task_presence = {
    (shift, task): model.new_bool_var(f'task_{task}_is_in_shift_{shift}')
    for shift in shifts for task in tasks
}
for task in tasks:
    model.add(sum(var_shift_task_presence[shift, task] for shift in shifts) == 1 )
    for shift in shifts:
        model.add(var_task_starts[task] >= shift_starts[shift]).only_enforce_if(
            var_shift_task_presence[shift, task]
        )
        model.add(var_task_ends[task] <= shift_ends[shift]).only_enforce_if(
            var_shift_task_presence[shift, task]
        )
var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task], processing_times[task], var_task_ends[task], name=f"interval_t{task}"
    ) for task in tasks
}
# Add break time
var_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}
intervals = list(var_task_intervals.values()) + list(var_break_intervals.values())
demands = [1]*len(tasks) + [1]*len(breaks)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print('=========================== TASKS SUMMARY ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )
    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)
Stages, one job
Source: scheduling/example_21_stages_one_job.py
What it does
A single job with three stages. Each stage is a task indexed by (job, stage).
- var_job_starts/var_job_ends are the min/max of task starts/ends via add_min_equality/add_max_equality.
- Stage precedence: end[job, s] <= start[job, s + 1].
- Each stage has an interval. No-overlap is meant to be enforced per stage (only one job at a time per stage); with a single job there is nothing to overlap, and the listing below creates the intervals without an add_no_overlap call.
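With one job, the precedence chain alone determines the schedule. The sketch below mirrors that chain in plain Python, assuming every stage takes the same processing time and the job starts at time 0; chain_stages is a hypothetical helper, not part of the example file.

```python
def chain_stages(stages, processing_time, job_start=0):
    """Place stages back to back: each stage starts when the previous one ends."""
    schedule = {}
    clock = job_start
    for stage in sorted(stages):
        schedule[stage] = (clock, clock + processing_time)
        clock += processing_time
    return schedule


schedule = chain_stages(stages={1, 2, 3}, processing_time=3)
job_start = min(start for start, _ in schedule.values())  # role of add_min_equality
job_end = max(end for _, end in schedule.values())        # role of add_max_equality
print(schedule)            # {1: (0, 3), 2: (3, 6), 3: (6, 9)}
print(job_start, job_end)  # 0 9
```

The job end of 9 fits inside the listing's max_time of 10.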
Concepts
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
jobs = {1}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}
processing_time = 3
max_time = 10
# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}
var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}
# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}
var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}
for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])
    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])
var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print('=========================== TASKS SUMMARY ===========================')
    for job in jobs:
        print(f"job_{job} start: {solver.value(var_job_starts[job])} end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )
    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)
Stages, two jobs
Source: scheduling/example_22_stages_two_jobs.py
What it does
Extends 21 with a second job. The per-stage add_no_overlap now actually
does work: with two jobs, stage s can only run one of them at a time.
Structure is otherwise identical to 21 (min/max for job start/end, stage precedence, make-span minimisation).
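The interaction between stage precedence and per-stage no-overlap can be illustrated with a simple forward pass. This is a plain-Python sketch, assuming jobs are processed in a fixed order and share one processing time; pipeline_schedule is a hypothetical helper and is not how CP-SAT searches.

```python
def pipeline_schedule(jobs, stages, processing_time):
    """Greedy forward pass: a task waits for its previous stage and for the stage to be free."""
    stage_free_at = {stage: 0 for stage in stages}
    schedule = {}
    for job in sorted(jobs):
        previous_stage_end = 0
        for stage in sorted(stages):
            start = max(previous_stage_end, stage_free_at[stage])
            end = start + processing_time
            schedule[job, stage] = (start, end)
            stage_free_at[stage] = end      # per-stage no-overlap
            previous_stage_end = end        # stage precedence within the job
    return schedule


schedule = pipeline_schedule(jobs={1, 2}, stages={1, 2, 3}, processing_time=3)
print(schedule[2, 1])                            # (3, 6)
print(max(end for _, end in schedule.values()))  # 12
```

Job 2 trails job 1 by one processing time on every stage, which gives a makespan of 12 for this data.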
Concepts
- Multi-stage jobs (stage-level no-overlap)
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
jobs = {1, 2}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}
processing_time = 3
max_time = 20
# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}
var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}
# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}
var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}
for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])
    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])
var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}
for stage in stages:
    model.add_no_overlap(var_task_intervals[job, stage] for job in jobs)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print('=========================== TASKS SUMMARY ===========================')
    for job in jobs:
        print(f"job_{job} start: {solver.value(var_job_starts[job])} end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )
    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)
Multistage, two jobs
Source: scheduling/example_23_multistage_two_jobs_co.py
What it does
Scales the multi-stage model of 22 up to 6 jobs over 3 stages. The model is identical in shape: stage precedence, per-stage no-overlap, add_max_equality for makespan, model.minimize(make_span).
Despite the filename mentioning two jobs and co (changeover), this version uses six jobs and does not actually add a changeover constraint; it is a scalability exercise.
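A back-of-envelope bound is a useful sanity check on the solver output here. It is not stated in the example file and assumes, as in the listing, the same processing time p on every stage. The last stage cannot start before the first job has cleared the earlier |S| - 1 stages, and it must then process all |J| jobs one at a time:

```latex
\[
C_{\max} \;\ge\; (|S| - 1)\,p + |J|\,p \;=\; (|S| + |J| - 1)\,p \;=\; (3 + 6 - 1)\cdot 3 \;=\; 24 .
\]
```

A pipelined schedule in which job j enters stage 1 at time (j - 1)p attains this value, and 24 is well inside max_time = 100.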
Concepts
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
jobs = {1, 2, 3, 4, 5, 6}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}
processing_time = 3
max_time = 100
# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}
var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}
# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}
var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}
for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])
    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])
var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}
for stage in stages:
    model.add_no_overlap(var_task_intervals[job, stage] for job in jobs)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    print('=========================== TASKS SUMMARY ===========================')
    for job in jobs:
        print(f"job_{job} start: {solver.value(var_job_starts[job])} end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )
    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)
People mode
Source: scheduling/example_10_people_mode.py
What it does
Each task can run in one of several "people modes", with different processing times and headcount requirements. Only the mode choice changes the model; sequencing is similar to 03.
- variables_task_resource_mode[t, k]: one-hot bool for task t selecting mode k. Exactly one mode per task.
- variables_task_processing_time[t] is derived from the mode:
model.add(
    processing_time_var[t] == sum(
        processing_time[product[t], k] * mode[t, k] for k in modes
    )
)
- Headcount is limited by a single add_cumulative (capacity 7 in the listing) over optional intervals indexed by (machine, task, mode), each with the demand of its mode.
- Standard machine-assignment variables (presence[m, t], start[m, t], end[m, t]), per-machine add_circuit, and task-level links complete the model.
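The one-hot link between mode and duration, and the trade-off it creates against headcount, can be sketched in plain Python. This is a brute-force illustration for two tasks that may run side by side on separate machines, using the listing's data for product 'A'; one_hot, derived_duration and best_mode_pair are hypothetical helpers, and the "run in sequence when headcount does not fit" rule is a simplification of what add_cumulative enforces.

```python
from itertools import product

processing_time = {1: 3, 2: 2}       # mode -> duration for product 'A'
resource_requirement = {1: 2, 2: 3}  # mode -> headcount for product 'A'


def one_hot(mode, modes):
    """Boolean selection vector with a single 1 at the chosen mode."""
    return {k: int(k == mode) for k in modes}


def derived_duration(selection):
    # Mirrors the linear link: duration == sum(processing_time[k] * mode[k]).
    return sum(processing_time[k] * selection[k] for k in selection)


def best_mode_pair(capacity):
    """Try every mode pair; run in parallel if headcount fits, else one after the other."""
    best = None
    for m1, m2 in product(processing_time, repeat=2):
        d1 = derived_duration(one_hot(m1, processing_time))
        d2 = derived_duration(one_hot(m2, processing_time))
        fits = resource_requirement[m1] + resource_requirement[m2] <= capacity
        makespan = max(d1, d2) if fits else d1 + d2
        if best is None or makespan < best[0]:
            best = (makespan, m1, m2)
    return best


print(derived_duration(one_hot(2, processing_time)))  # 2
print(best_mode_pair(capacity=7))  # (2, 2, 2)
print(best_mode_pair(capacity=5))  # (3, 1, 1)
```

With capacity 7 both tasks can use the faster, more people-hungry mode at once; with capacity 5 the slower mode wins because two fast tasks no longer fit together.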
Concepts
- Resources and cumulative (resource modes)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 A
'''
### 1. Data
tasks = {1, 2}
products = {'A'}
task_to_product = {1: 'A', 2: 'A'}
machines = {1,2}
resource_modes = {1, 2}
# product -> people mode -> time duration
processing_time = {
    ('A', 1): 3,
    ('A', 2): 2
}
resource_requirement = {
    ('A', 1): 2,
    ('A', 2): 3
}
max_time = 20
# (task, people_mode)
# 2. Decision variables
variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks
    for m in machines
}
# One task to one machine.
for task in tasks:
    task_candidate_machines = machines
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)
# task level link to machine-task level
for task in tasks:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])
# for sequence control
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for t1 in tasks
    for t2 in tasks
    for m in machines
    if t1 != t2
}
# Sequence
for m in machines:
    arcs = list()
    for node1, t1 in enumerate(tasks):
        tmp_1 = model.new_bool_var(f'm_{m}_first_to_{t1}')
        arcs.append([0, t1, tmp_1])
        tmp_2 = model.new_bool_var(f'm_{m}_{t1}_to_last')
        arcs.append([t1, 0, tmp_2])
        arcs.append([t1, t1, ~variables_machine_task_presences[m, t1]])
        for node_2, t2 in enumerate(tasks):
            # arcs
            if t1 == t2:
                continue
            arcs.append([
                t1,
                t2,
                variables_machine_task_sequence[(m, t1, t2)]
            ])
            distance = 0
            # cannot require the time index of task 0 to represent the first and the last position
            model.add(
                variables_task_ends[t1] + distance <= variables_task_starts[t2]
            ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])
    model.add_circuit(arcs)
#
variables_task_resource_mode = {
    (task, resource_mode): model.new_bool_var(f"task {task} using resource mode {resource_mode}")
    for task in tasks for resource_mode in resource_modes
}
# variables_task_resource_modes = {
#     task: model.new_int_var(0, len(resource_modes), f"the resource mode of {task}")
#     for task in tasks
# }
# one task has to pick one resource mode
for task in tasks:
    tmp = sum(variables_task_resource_mode[task, resource_mode] for resource_mode in resource_modes)
    model.add(tmp == 1)
# link resource id with decision matrix
# for task in tasks:
#     for resource_mode in resource_modes:
#         model.add(
#             variables_task_resource_modes[task] == resource_mode
#         ).only_enforce_if(
#             variables_task_resource_mode_matrix[task, resource_mode]
#         )
variables_task_processing_time = {
    task: model.new_int_var(0, 99, f"processing time for {task}")
    for task in tasks
}
for task in tasks:
    model.add(
        variables_task_processing_time[task] == sum(
            processing_time[task_to_product[task], resource_mode]*variables_task_resource_mode[task, resource_mode]
            for resource_mode in resource_modes
        )
    )
# model.new_optional_interval_var(1,1,2,1,'')
# model.new_optional_interval_var(
#     model.new_int_var(0,1,''),
#     model.new_int_var(0, 1, ''),
#     model.new_int_var(0,1,''),
#     model.new_int_var(0, 1, ''),
#     '')
# variables_machine_task_intervals = {
#     (m, task): model.new_optional_interval_var(
#         variables_machine_task_starts[m, task],
#         #2,
#         variables_task_processing_time[task],
#         #processing_time[task_to_product[task], variables_task_resource_modes[task]],
#         variables_machine_task_ends[m, task],
#         variables_machine_task_presences[m, task],
#         name=f"t_interval_{m}_{task}"
#     )
#     for task in tasks
#     for m in machines
# }
variables_machine_task_resource_mode_presence = {
    (m, task, resource_mode): model.new_bool_var(f'm{m}_task_{task}_resource_mode_{resource_mode}_presence')
    for m in machines
    for task in tasks
    for resource_mode in resource_modes
}
for task in tasks:
    for resource_mode in resource_modes:
        model.add(
            sum(variables_machine_task_resource_mode_presence[m, task, resource_mode] for m in machines) == 1
        ).only_enforce_if(
            variables_task_resource_mode[task, resource_mode]
        )
for task in tasks:
    for m in machines:
        model.add(
            sum(variables_machine_task_resource_mode_presence[m, task, resource_mode] for resource_mode in resource_modes) == 1
        ).only_enforce_if(
            variables_machine_task_presences[m, task]
        )
for task in tasks:
    model.add(
        sum(variables_machine_task_resource_mode_presence[m, task, resource_mode]
            for resource_mode in resource_modes
            for m in machines
            ) == 1
    )
variables_machine_task_mode_intervals = {
    (m, task, resource_mode): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        #2,
        variables_task_processing_time[task],
        #processing_time[task_to_product[task], variables_task_resource_modes[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_resource_mode_presence[m, task, resource_mode],
        #variables_machine_task_presences[m, task]*variables_task_resource_mode[task, resource_mode],
        name=f"int_m{m}_t{task}_mode{resource_mode}"
    )
    for task in tasks
    for m in machines
    for resource_mode in resource_modes
}
intervals = [x for x in variables_machine_task_mode_intervals.values()]
demands = [
    resource_requirement[task_to_product[task], resource_mode]
    for machine, task, resource_mode in variables_machine_task_mode_intervals.keys()
]
model.add_cumulative(intervals=intervals, demands=demands, capacity=7)
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# for m in machines:
#     for task in tasks:
#         for resource_mode in resource_modes:
#             print(f'{m} {task} {resource_mode}')
#             print(solver.value(variables_machine_task_mode_intervals[m, task, resource_mode]))
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for m in machines:
        for task in tasks:
            for resource_mode in resource_modes:
                print(f'M {m} T {task} R {resource_mode}' ,end = ' ')
                print(f' {solver.value(variables_machine_task_resource_mode_presence[m, task, resource_mode])}')
    print('=========================== TASKS SUMMARY ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task]), end=' '
              )
        for resource_mode in resource_modes:
            value = solver.value(variables_task_resource_mode[task, resource_mode])
            if value == 1:
                print(f'Using resource mode {resource_mode}')
    print('Make-span:', solver.value(make_span))
    print('======================= ALLOCATION & SEQUENCE =======================')
    for m in machines:
        print(f'------------\nMachine {m}')
        for t1 in tasks:
            value = solver.value(variables_machine_task_presences[(m, t1)])
            if value == 1:
                print(f'task_{t1}_on machine_{m}')
            for t2 in tasks:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]}')# cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2} Closing')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
Resource mode (empty)
Source: scheduling/example_11_resource_mode.py
The file is empty. Placeholder kept for numbering. See People mode for a similar, fully-worked example.
Headcount tracking
Source: scheduling/example_34_headcount_tracking.py
What it does
Compares three ways of tracking resource (headcount) usage over time when task durations are state-dependent (can be 2 or 3 depending on whether the task overlaps a break).
- Method 0 - native cumulative. One add_cumulative with the actual task intervals.
- Method 1 - cumulative with start time. Per-time-slot booleans var_task_starts_presence[t, i] indicate task start. Per-duration "did a task starting within the last d slots cover this slot?" booleans are combined with add_max_equality. Per-task, per-time resource variables are then switched by var_task_overlap_break[t].
- Method 2 - cumulative with overlap. Encodes the same "is this slot inside the task?" using per-slot overlap booleans (start <= t AND end > t).
The file also defines a Variables dataclass and an
extract_solution(...) helper that converts variable dicts into their
solved values, handling IntervalVar specially.
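The per-slot overlap test behind Method 2 is easy to see outside the solver. Below is a plain-Python sketch that builds a headcount profile from fixed start/end values, assuming half-open task intervals; headcount_profile and the sample tasks are hypothetical and not taken from the example file.

```python
def headcount_profile(tasks, horizon):
    """tasks maps name -> (start, end, demand) with half-open [start, end)."""
    usage = [0] * horizon
    for start, end, demand in tasks.values():
        for t in range(horizon):
            inside = start <= t and end > t  # the per-slot overlap test
            if inside:
                usage[t] += demand
    return usage


tasks = {"a": (0, 3, 2), "b": (2, 4, 1)}
profile = headcount_profile(tasks, horizon=5)
print(profile)            # [2, 2, 3, 1, 0]
print(max(profile) <= 3)  # True: a capacity of 3 would be respected
```

In the CP-SAT version the same test becomes one boolean per (task, slot), and the per-slot sum of demand times boolean is bounded by the capacity.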
Concepts
- Resources and cumulative (time-varying demand)
- Breaks (state-dependent duration)
- Interval variables
Source
from ortools.sat.python import cp_model
from time import time
import pandas as pd
from dataclasses import dataclass
from typing import Dict, Set
import copy
import logging
logger = logging.getLogger(__name__)
@dataclass
class Variables:
"""
All variables of the optimization problem, used for decisions and result values
Type `Any` shorthand for a solver variable object, or a result float/int
"""
var_task_starts: Dict[str, any] = None
var_task_ends: Dict[str, any] = None
var_task_durations: Dict[str, any] = None
var_task_overlap_break: Dict[str, any] = None
var_intervals: Dict[str, any] = None
total_duration: Dict[str, any] = None
make_span: Dict[int, any] = None
def extract_solution(solver: cp_model.CpSolver, variables):
"""Extracts the solution from the variables
Three kinds of variables have been considered
Integer, Boolean and Interval Variable
In case of other types of variables,
this function would require modification
"""
# Initializing solution class
solution = copy.deepcopy(variables)
for varname, vardict in variables.__dict__.items():
setattr(solution, varname, None)
# Assigning variable values
for varname, vardict in variables.__dict__.items():
if vardict is not None:
setattr(
solution,
varname,
{
k: solver.value(v) if type(v) not in [cp_model.IntervalVar] else v
for k, v in vardict.items()
},
)
else:
logger.warning(f"Variable '{varname}' is defined but not used in model")
return solution
if __name__ == '__main__':
"""
Offset = 0:
| x | | | x | | | x | | |
Offset = 1:
| | x | | | x | | | x | |
Offset = 2:
| | | x | | | x | | | x |
where x represents a unit-duration break period
"""
variables = Variables()
break_offset = 0
num_of_tasks = 1
max_time = num_of_tasks * 3
processing_time = 2
if break_offset == 0:
help_text = "| x | | | x | | | x | | |"
elif break_offset == 1:
help_text = "| | x | | | x | | | x | |"
elif break_offset == 2:
help_text = "| | | x | | | x | | | x |"
else:
print("offset wrong")
exit()
# Breaks repeat every 3 slots, matching the diagram above and starts_no_break below
breaks = [(i * 3 + break_offset, i * 3 + break_offset + 1) for i in range(num_of_tasks)]
tasks = {x for x in range(num_of_tasks)}
starts_no_break = [x * 3 + break_offset + 1 for x in range(-1, num_of_tasks) if x * 3 + break_offset + 1 >= 0]
starts_break = list(set(range(max_time)).difference(starts_no_break))
domain_no_break = cp_model.Domain.from_values([x for x in starts_no_break])
domain_break = cp_model.Domain.from_values([x for x in starts_break])
model = cp_model.CpModel()
variables.var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
variables.var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
variables.var_task_durations = {task: model.new_int_var(2, 3, f"task_{task}_duration") for task in tasks}
variables.var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}
for task in tasks:
if task == 0:
continue
model.add(variables.var_task_ends[task - 1] <= variables.var_task_starts[task])
for task in tasks:
model.add_linear_expression_in_domain(variables.var_task_starts[task], domain_break).only_enforce_if(
variables.var_task_overlap_break[task]
)
model.add_linear_expression_in_domain(variables.var_task_starts[task], domain_no_break).only_enforce_if(
~variables.var_task_overlap_break[task]
)
# model.add(variables.var_task_durations[task] == processing_time + variables.var_task_overlap_break[task] * 1)
model.add(variables.var_task_durations[task] == processing_time + 1).only_enforce_if(
variables.var_task_overlap_break[task]
)
model.add(variables.var_task_durations[task] == processing_time).only_enforce_if(
~variables.var_task_overlap_break[task]
)
# intervals
variables.var_intervals = {
task: model.new_interval_var(
variables.var_task_starts[task],
variables.var_task_durations[task],
variables.var_task_ends[task],
name=f"interval_{task}"
)
for task in tasks
}
tracking_method = 1
if tracking_method == 0:
# native cumulative
variables.resource_needs = {'all': model.new_int_var(0, 10, 'resource_needs')}
resource_needs = {task: 1 for task in tasks}
intervals, headcounts = [], []
# 2.1 Add tasks requirements
# 2.1.1 Add demand-based intervals
for task in tasks:
intervals.append(variables.var_intervals[task])
headcounts.append(resource_needs[task])
# 2.2 Create cumulative constraints
model.add_cumulative(intervals, headcounts, variables.resource_needs['all'])
elif tracking_method == 1:
# cumulative_with_start_time
max_r = 10
lb = [variables.var_task_starts[i].proto().domain[0] for i in tasks]
ub = [variables.var_task_starts[i].proto().domain[1] for i in tasks]
times_min = min(lb)
times_max = max(ub)
time_range = range(times_min, times_max + 1)
variables.resource_needs = {t: model.new_int_var(0, max_r, f'resource_{t}')
for t in time_range
}
variables.task_resource_needs = {(task, t): model.new_int_var(0, max_r, f'resource_{task}_{t}')
for task in tasks
for t in time_range
}
variables.duration_task_resource_needs = {(task, t, duration): model.new_int_var(0, max_r, f'resource_{task}_{t}_{duration}')
for task in tasks
for t in time_range
for duration in [2, 3]
}
variables.var_task_starts_presence = {
(task, t): model.new_bool_var(f'start_{task}_{t}')
for task in tasks
for t in range(times_min, times_max + 1)
}
for task in tasks:
model.add_exactly_one([variables.var_task_starts_presence[task, t] for t in time_range])
# Define min & max duration
min_duration = variables.var_task_durations[task].proto().domain[0]
max_duration = variables.var_task_durations[task].proto().domain[1]
for t in time_range:
# Capture start time
model.add(variables.var_task_starts[task] == t).only_enforce_if(
variables.var_task_starts_presence[task, t])
# Capture based on duration
for duration in [min_duration, max_duration]:
model.add_max_equality(variables.duration_task_resource_needs[task, t, duration],
[variables.var_task_starts_presence[task, t_start]
for t_start in range(t - duration + 1, t + 1)
if (task, t_start) in variables.var_task_starts_presence
]
)
# Capture the actual task duration and the needs
model.add(variables.task_resource_needs[task, t]
== variables.duration_task_resource_needs[task, t, 2]
).only_enforce_if(~variables.var_task_overlap_break[task])
model.add(variables.task_resource_needs[task, t]
== variables.duration_task_resource_needs[task, t, 3]
).only_enforce_if(variables.var_task_overlap_break[task])
# Capture total resource needs
for t in range(times_min, times_max + 1):
model.add(variables.resource_needs[t] == sum([variables.task_resource_needs[task, t] * 1
for task in tasks
]
))
elif tracking_method == 2:
"""
Cumulative with overlap
"""
max_r = 10
lb = [variables.var_task_starts[i].proto().domain[0] for i in tasks]
ub = [variables.var_task_starts[i].proto().domain[1] for i in tasks]
times_min = min(lb)
times_max = max(ub)
resource_needs = {task: 1 for task in tasks}
variables.resource_needs = {t: model.new_int_var(0, max_r, f'resource_{t}')
for t in range(times_min, times_max + 1)
}
variables.task_resource_needs = {(task, t): model.new_int_var(0, max_r, f'resource_{task}_{t}')
for task in tasks
for t in range(times_min, times_max + 1)
}
variables.duration_task_resource_needs = {(task, t, duration): model.new_int_var(0, max_r, f'resource_{task}_{t}_{duration}')
for task in tasks
for t in range(times_min, times_max + 1)
for duration in [2, 3]
}
variables.var_task_starts_presence = {
(task, t): model.new_bool_var(f'start_{task}_{t}')
for task in tasks
for t in range(times_min, times_max + 1)
}
for task in tasks:
for t in range(times_min, times_max + 1):
# s[i] <= t
b1 = model.new_bool_var("")
model.add(variables.var_task_starts[task] <= t).only_enforce_if(b1)
model.add(variables.var_task_starts[task] > t).only_enforce_if(~b1)
# t < e[i]
b2 = model.new_bool_var("")
model.add(t < variables.var_task_ends[task]).only_enforce_if(b2)
model.add(t >= variables.var_task_ends[task]).only_enforce_if(~b2)
# b1 and b2 (b1 * b2)
b3 = model.new_bool_var("")
model.add_bool_and([b1, b2]).only_enforce_if(b3)
model.add_bool_or([~b1, ~b2]).only_enforce_if(~b3)
# b1 * b2 * r[i]
model.add_multiplication_equality(variables.task_resource_needs[task, t], [b3, resource_needs[task]])
# Capture total resource needs
for t in range(times_min, times_max + 1):
model.add(variables.resource_needs[t] == sum([variables.task_resource_needs[task, t] * 1
for task in tasks
]
))
# Objectives
variables.make_span = {0: model.new_int_var(0, max_time, "make_span")}
model.add_max_equality(
variables.make_span[0],
[variables.var_task_ends[task] for task in tasks]
)
variables.total_duration = {0: model.new_int_var(0, max_time, "total_duration")}
model.add(variables.total_duration[0] == sum(variables.var_task_durations.values()))
model.minimize(variables.make_span[0] + variables.total_duration[0])
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
print("total time:", total_time)
print_result = True
solution: Variables = extract_solution(solver, variables)
# show the result if getting the optimal one
if print_result:
print("-----------------------------------------")
print(help_text)
print("breaks periods:", breaks)
print("break starts:", starts_break)
print("no break starts:", starts_no_break)
if status == cp_model.OPTIMAL:
big_list = []
for task in tasks:
tmp = [
f"task {task}",
solution.var_task_starts[task],
solution.var_task_ends[task],
solution.var_task_overlap_break[task],
solution.var_task_durations[task],
]
big_list.append(tmp)
df = pd.DataFrame(big_list)
df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
df = df.sort_values(['start'])
print(df)
if tracking_method == 0:
print('Using method: cumulative_with_built_in_feature')
print('resource_needs', solution.resource_needs)
elif tracking_method == 1:
print('Using method: cumulative_with_start_time')
print('resource_needs', solution.resource_needs)
print('task_resource_needs', solution.task_resource_needs)
print('duration_task_resource_needs', solution.duration_task_resource_needs)
print('var_task_starts_presence', solution.var_task_starts_presence)
elif tracking_method == 2:
print('Using method: cumulative_with_overlap')
print('resource_needs', solution.resource_needs)
print('task_resource_needs', solution.task_resource_needs)
print('duration_task_resource_needs', solution.duration_task_resource_needs)
print('var_task_starts_presence', solution.var_task_starts_presence)
print('Make-span:', solution.make_span[0])
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Max number of continuous tasks
Source: scheduling/example_09_max_number_of_continuous_tasks.py
What it does
Introduces campaigning by modeling campaigns explicitly as entities:
- For every product, pre-compute as many candidate campaigns as there are tasks of that product.
- Campaign variables: start, end, duration, presence.
- var_task_campaign_presences[t, c]: task t is assigned to campaign c. Exactly one per task.
- Campaign duration is the sum of task durations assigned to it. Campaign start/end bound task starts/ends of its members.
- A campaign is present iff at least one task is assigned (add_max_equality).
- var_campaign_durations[c] <= max_conti_task_num caps campaign size.
- Campaigns are sequenced with a campaign-level add_circuit, with a changeover distance enforced between consecutive campaigns.
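The campaign definitions above can be checked on a candidate assignment without a solver. The sketch below is plain Python; summarize_campaigns and the sample assignment are hypothetical and only mirror the rules listed: duration is the sum of member durations, a campaign is present if it has at least one member, and no campaign exceeds the cap.

```python
from collections import defaultdict


def summarize_campaigns(task_to_campaign, processing_time, max_duration):
    """Derive duration and presence per campaign and check the size cap."""
    durations = defaultdict(int)
    for task, campaign in task_to_campaign.items():
        durations[campaign] += processing_time[task]
    present = {c for c, d in durations.items() if d > 0}
    within_cap = all(d <= max_duration for d in durations.values())
    return dict(durations), present, within_cap


# Hypothetical assignment for tasks 1-3 (product A) and task 4 (product B).
assignment = {1: "A_0", 2: "A_0", 3: "A_1", 4: "B_0"}
unit_time = {task: 1 for task in assignment}
durations, present, ok = summarize_campaigns(assignment, unit_time, max_duration=2)
print(durations, ok)  # {'A_0': 2, 'A_1': 1, 'B_0': 1} True
```

Candidate campaigns that receive no task (A_2 here) simply do not appear, which corresponds to presence being false in the model.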
The expected pattern for four tasks A A A B is
A A -> CO -> A -> CO -> B.
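The pattern can be reproduced by hand for a fixed task order. The sketch below is an illustration, not the solver model: expected_pattern is a hypothetical helper that assumes tasks are processed in the given order, that each run of one product is cut into campaigns of at most max_run tasks, and that a changeover separates every pair of consecutive campaigns.

```python
from itertools import groupby


def expected_pattern(products, max_run, processing_time=1, changeover_time=2):
    """Split each run of equal products into campaigns of at most `max_run` tasks."""
    campaigns = []
    for _, run in groupby(products):
        run = list(run)
        for i in range(0, len(run), max_run):
            campaigns.append(run[i:i + max_run])
    pattern = " -> CO -> ".join(" ".join(c) for c in campaigns)
    changeovers = max(len(campaigns) - 1, 0)
    make_span = len(products) * processing_time + changeovers * changeover_time
    return pattern, make_span


print(expected_pattern(["A", "A", "A", "B"], max_run=2))
# ('A A -> CO -> A -> CO -> B', 8)
```

With unit processing times and a changeover of 2, this pattern has a make-span of 8, which is a useful reference value when inspecting the solver output.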
Concepts
- Campaigning (approach 1: campaigns as entities)
- Circuit and sequencing
- Interval variables
Source
from ortools.sat.python import cp_model
# Initiate
model = cp_model.CpModel()
'''
task product
1 A
2 A
3 A
4 B
'''
# 1. Data
tasks = {1, 2, 3, 4}
products = {'A', 'B'}
task_to_product = {1: 'A', 2: 'A', 3: 'A', 4: 'B'}
processing_time = {'A': 1, 'B': 1}
changeover_time = 2
max_conti_task_num = 2
max_time = 100
m_cost = {
(t1, t2): 0
if task_to_product[t1] == task_to_product[t2] else changeover_time
for t1 in tasks for t2 in tasks
if t1 != t2
}
# Campaign related data pre-processing
max_product_campaigns = {
product: len([task for task in tasks if task_to_product[task]==product]) for product in products
}
product_campaigns = {
(product, f"{product}_{campaign}")
for product in max_product_campaigns
for campaign in list(range(0, max_product_campaigns[product]))
#if max_product_campaigns[product] > 0
}
campaign_to_product = {
campaign: product for product, campaign in product_campaigns
}
campaigns = {campaign for product, campaign in product_campaigns}
product_to_campaigns = {
product: [c for c in campaigns if campaign_to_product[c] == product] for product in products
}
task_to_campaigns = {
task: [
campaign for campaign in campaigns if campaign_to_product[campaign] == task_to_product[task]
]
for task in tasks
}
campaign_size = {campaign: max_conti_task_num for campaign in campaigns}
campaign_duration = {campaign: max_conti_task_num for campaign in campaigns}
campaign_to_tasks = {
campaign:
[
task for task in tasks if campaign in task_to_campaigns[task]
]
for campaign in campaigns
}
# A changeover is needed between any two consecutive campaigns, including
# two campaigns of the same product (otherwise the size cap has no effect)
m_cost_campaign = {
(c1, c2): changeover_time
for c1 in campaigns for c2 in campaigns
if c1 != c2
}
# Need changeover if we switch from a product to a different product
# We can continue to do tasks of the same product type but there is
# a max number of continuous production of tasks with the same product type
# For A A A, we expect A -> A -> Changeover -> A
# For A A A A A, we expect A -> A -> Changeover -> A -> A -> Changeover -> A
# For A B, we expect A -> Changeover -> B
# For A A B, we expect A -> A -> Changeover -> B
# 2. Decision variables
variables_task_ends = {
task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
# variables_task_sequence = {
# (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
# for t1 in tasks
# for t2 in tasks
# if t1 != t2
# }
#
# variables_co_starts = {
# (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_start")
# for t1 in tasks
# for t2 in tasks
# if t1 != t2
# }
#
# variables_co_ends = {
# (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_end")
# for t1 in tasks
# for t2 in tasks
# if t1 != t2
# }
# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)
# 4. Constraints
# Task Duration
# for task in tasks:
# model.add(
# variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
# )
var_task_intervals = {
t: model.new_interval_var(
variables_task_starts[t],
1,
variables_task_ends[t],
f"task_{t}_interval"
)
for t in tasks
}
# Sequence
# arcs = list()
# for t1 in tasks:
# for t2 in tasks:
# # arcs
# if t1 != t2:
# arcs.append([
# t1,
# t2,
# variables_task_sequence[(t1, t2)]
# ])
# distance = m_cost[t1, t2]
# # cannot require the time index of task 0 to represent the first and the last position
# if t2 != 0:
# # to schedule tasks and c/o
# model.add(
# variables_task_ends[t1] <= variables_co_starts[t1, t2]
# ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
# model.add(
# variables_co_ends[t1, t2] <= variables_task_starts[t2]
# ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
# model.add(
# variables_co_ends[t1, t2] - variables_co_starts[t1, t2] == distance
# ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
# model.add_circuit(arcs)
# Campaigning related
var_campaign_starts = {
c: model.new_int_var(0, max_time, f"start_{c}") for c in campaigns
}
var_campaign_ends = {
c: model.new_int_var(0, max_time, f"c_end_{c}") for c in campaigns
}
var_campaign_durations = {
c: model.new_int_var(0, max_time, f"c_duration_{c}") for c in campaigns
}
var_campaign_presences = {
c: model.new_bool_var(f"c_presence_{c}") for c in campaigns
}
# Task Duration
# for c in campaigns:
# model.add(
# var_campaign_ends[c] - var_campaign_starts[c] == var_campaign_durations[c]
# )
# var_campaign_intervals = {
# c: model.new_optional_interval_var(
# var_campaign_starts[c], # campaign start
# var_campaign_durations[c], # campaign duration
# var_campaign_ends[c], # campaign end
# var_campaign_presences[c], # campaign presence
# f"c_interval_{c}",
# )
# for c in campaigns
# }
# Whether a task is allocated to a campaign
var_task_campaign_presences = {
(t, c): model.new_bool_var(f"task_{t}_presence_in_campaign_{c}") for t in tasks for c in task_to_campaigns[t]
}
# add_campaign_definition_constraints
# 1. Campaign definition: Start & duration based on tasks that belongs to the campaign
for c in campaigns:
# 1. Duration definition
model.add(
var_campaign_durations[c] == sum(
processing_time[task_to_product[t]] * var_task_campaign_presences[t, c]
for t in campaign_to_tasks[c]
)
)
# 2. Start-end definition
# TODO: MinEquality ?
for t in campaign_to_tasks[c]:
# var_campaign_starts
# var_campaign_ends
model.add(var_campaign_starts[c] <= variables_task_starts[t]).only_enforce_if(
var_task_campaign_presences[t, c]
)
model.add(variables_task_ends[t] <= var_campaign_ends[c]).only_enforce_if(
var_task_campaign_presences[t, c]
)
# 3. Link c & tc presence: If 1 task is scheduled on a campaign -> presence[t, c] = 1 ==> presence[c] == 1
# as long as there is one task in a campaign, this campaign must be present
model.add_max_equality(
var_campaign_presences[c], [var_task_campaign_presences[t, c] for t in campaign_to_tasks[c]]
)
# 2. Definition of the bool var: if a task belongs to a campaign
for task in tasks:
# Each task belongs to exactly one campaign
# task_to_campaigns[task]
model.add(
sum(var_task_campaign_presences[task, campaign]
for campaign in campaigns
if campaign in task_to_campaigns[task]
) == 1
)
# 3. No campaign overlap
# Campaigns won't overlap anyway. But we need this for tasks within campaigns
model.add_no_overlap([x for x in var_task_intervals.values()])
for c in campaigns:
model.add(
var_campaign_durations[c] <= max_conti_task_num
)
# Add campaign circuit
arcs = []
var_campaign_sequence = {}
for node_1, campaign_1 in enumerate(campaigns):
tmp1 = model.new_bool_var(f'first_to_{campaign_1}')
arcs.append([
0, node_1 + 1, tmp1
])
tmp2 = model.new_bool_var(f'{campaign_1}_to_last')
arcs.append([
node_1 + 1, 0, tmp2
])
arcs.append([node_1 + 1, node_1 + 1, ~var_campaign_presences[campaign_1]])
# for outputting
var_campaign_sequence.update({(0, campaign_1): tmp1})
var_campaign_sequence.update({(campaign_1, 0): tmp2})
for node_2, campaign_2 in enumerate(campaigns):
if node_1 == node_2:
continue
indicator_node_1_to_node_2 = model.new_bool_var(f'{campaign_1}_to_{campaign_2}')
var_campaign_sequence.update({(campaign_1, campaign_2): indicator_node_1_to_node_2})
distance = m_cost_campaign[campaign_1, campaign_2]
arcs.append([node_1 + 1, node_2 + 1, indicator_node_1_to_node_2])
model.add(
var_campaign_ends[campaign_1] + distance <= var_campaign_starts[campaign_2]
).only_enforce_if(
indicator_node_1_to_node_2
).only_enforce_if(
var_campaign_presences[campaign_1]
).only_enforce_if(
var_campaign_presences[campaign_2]
)
model.add_circuit(arcs)
# Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# Post-process
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
for task in tasks:
print(f'Task {task} ',
solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
# for t1 in tasks:
# for t2 in tasks:
# if t1 != t2:
# value = solver.value(var[t1, t2])
# if value == 1 and t2 != 0:
# print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]} cost: {m_cost[t1, t2]}')
# print('variables_task_sequence[t1, t2]',
# solver.value(variables_task_sequence[t1, t2]))
# print('variables_co_starts[t1, t2]', solver.value(variables_co_starts[t1, t2]))
# print('variables_co_ends[t1, t2]', solver.value(variables_co_ends[t1, t2]))
#
# if value == 1 and t2 == 0:
# print(f'{t1} --> {t2} Closing')
for c1 in list(campaigns) + [0]:
for c2 in list(campaigns) + [0]:
if c1 == c2:
continue
value = solver.value(var_campaign_sequence[c1, c2])
if value == 1 and c2 != 0 and c1 != 0:
c1_tasks = []
c2_tasks = []
for task in campaign_to_tasks[c1]:
if solver.value(var_task_campaign_presences[task, c1]) == 1:
c1_tasks.append(task)
for task in campaign_to_tasks[c2]:
if solver.value(var_task_campaign_presences[task, c2]) == 1:
c2_tasks.append(task)
print(f'{c1} --> {c2} {campaign_to_product[c1]} {c1_tasks} >> {campaign_to_product[c2]} {c2_tasks} cost: {m_cost_campaign[c1, c2]}')
if value == 1 and c1 == 0:
print(f'{c1} --> {c2} Starting')
if value == 1 and c2 == 0:
print(f'{c1} --> {c2} Closing')
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
Campaigning time constraint (empty)
Source: scheduling/example_18_campaigning_time_constraint.py
The file is empty. Placeholder kept for numbering. The intended topic was likely "campaigns with a maximum total duration"; see Campaigning with cumul for a related, working model.
Cleaning holding time (empty)
Source: scheduling/example_19_cleaning_holding_time.py
The file is empty. Placeholder kept for numbering. The intended topic was likely "cleaning events with a minimum or maximum holding time between tasks"; see Changeover as event for the closest working example.
Campaigning with cumul
Source: scheduling/example_24_campaigning_with_cumul.py
What it does
Introduces the cumulative rank approach to campaigning. One product,
many tasks, a configurable campaign_size.
- var_task_cumul[t] is a rank in [0, campaign_size - 1].
- var_task_reach_max[t] fires when the rank hits the cap.
- On each t1 -> t2 arc:
  - Continue campaign (reach_max[t1] false): end[t1] <= start[t2] and cumul[t2] == cumul[t1] + 1.
  - End campaign (reach_max[t1] true): insert changeover, reset cumul[t2] == 0.
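The rank rule can be traced by hand for a fixed task order. The sketch below is plain Python, not the CP-SAT model: simulate_ranks is a hypothetical helper that assumes the first task has rank 0 and that every task starts as early as the rule allows.

```python
def simulate_ranks(num_tasks, campaign_size, processing_time=1, changeover_time=2):
    """Walk a fixed task order and apply the continue/end-campaign rule per arc."""
    rows, start, cumul = [], 0, 0
    for task in range(num_tasks):
        reach_max = cumul == campaign_size - 1
        end = start + processing_time
        rows.append((task, start, end, cumul, reach_max))
        if reach_max:
            start, cumul = end + changeover_time, 0  # changeover, rank resets
        else:
            start, cumul = end, cumul + 1            # campaign continues
    return rows


for row in simulate_ranks(5, campaign_size=2):
    print(row)
# (0, 0, 1, 0, False)
# (1, 1, 2, 1, True)
# (2, 4, 5, 0, False)
# (3, 5, 6, 1, True)
# (4, 8, 9, 0, False)
```

For 5 tasks and a campaign size of 2 the trace ends at time 9, which fits inside the horizon max_time = num_tasks * 2 used by the example.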
The circuit uses a node -1 as the first/last dummy.
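Once the model is solved, the task order can be read back from the arcs whose literals are true. The sketch below assumes those arcs have already been collected as (tail, head) pairs; decode_circuit is a hypothetical helper, and the sample arcs are made up. It follows the successors from the dummy node -1 and rejects arc sets that do not form one circuit through every node.

```python
def decode_circuit(selected_arcs, dummy=-1):
    """Follow selected (tail, head) arcs from the dummy node back to itself."""
    successor = dict(selected_arcs)
    if len(successor) != len(selected_arcs):
        raise ValueError("a node has more than one outgoing arc")
    order, node = [], successor[dummy]
    while node != dummy:
        order.append(node)
        node = successor[node]
        if len(order) > len(successor):
            raise ValueError("arcs do not form a single circuit")
    if len(order) != len(successor) - 1:
        raise ValueError("some nodes are not on the circuit through the dummy")
    return order


# Hypothetical set of arcs whose literals were true in a solution.
print(decode_circuit([(-1, 2), (2, 0), (0, 1), (1, -1)]))  # [2, 0, 1]
```

The arc leaving the dummy marks the first task and the arc entering it marks the last, so no changeover or rank constraint is attached to those two arcs.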
The __main__ block benchmarks solve times over campaign sizes 2 and 3,
plotting scalability.
Concepts
- Campaigning (approach 2: cumulative rank)
- Circuit and sequencing
Source
# Inspired by https://stackoverflow.com/questions/75554536
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
model = cp_model.CpModel()
def generate_one_product_data(num_tasks):
""" Generate N tasks of product A """
tasks = {i for i in range(num_tasks)}
task_to_product = ({i: 'A' for i in range(int(num_tasks))})
return tasks, task_to_product
def run_model(num_tasks, campaign_size, print_result = True):
model = cp_model.CpModel()  # fresh model per call so benchmark runs do not accumulate constraints
# if campaign size is 2, then we need cumul indicator to be 0, 1
changeover_time = 2
max_time = num_tasks*2
processing_time = 1
tasks, task_to_product = generate_one_product_data(num_tasks)
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
for task in tasks:
model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])
var_task_intervals = {
t: model.new_interval_var(
var_task_starts[t],
processing_time,
var_task_ends[t],
f"task_{t}_interval"
)
for t in tasks
}
model.add_no_overlap(var_task_intervals.values())
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.minimize(make_span)
literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[t1, t2]])
# if from task has not reached MAX, continue the campaign
model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
literals[t1, t2]
).only_enforce_if(~var_task_reach_max[t1])
model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
literals[t1, t2]
).only_enforce_if(~var_task_reach_max[t1])
# if from task has reached MAX, apply changeover and reset its cumulative indicator
model.add(var_task_cumul[t2] == 0).only_enforce_if(
literals[t1, t2]
).only_enforce_if(var_task_reach_max[t1])
model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
literals[t1, t2]
).only_enforce_if(var_task_reach_max[t1])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
if print_result:
if status == cp_model.OPTIMAL:
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_cumul[task]),
)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
return total_time
def print_unit_test_result(x, y1, y2, title=''):
ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(x, y1, marker='o', label = 'Campaign size: 2')
plt.plot(x, y2, marker='o', label = 'Campaign size: 3')
plt.legend()
plt.title(title)
plt.xlabel('The number of tasks')
plt.ylabel('Seconds')
plt.show()
if __name__ == '__main__':
N = 8
sizes = range(2, N+1)
model_times_campaign_2 = []
model_times_campaign_3 = []
for num_task in sizes:
model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
model_times_campaign_3.append(run_model(num_task, campaign_size=3, print_result=False))
print_unit_test_result(sizes, model_times_campaign_2, model_times_campaign_3,
'Scalability of Campaigning with Cumulative Indicator')
Campaigning, locked sequence
Source: scheduling/example_25_campaigning_with_locked_seq.py
What it does
Same cumulative-rank campaigning as 24, but adds a heuristic that locks the task order:
for task in tasks:
    if task != 0:
        model.add(var_task_ends[task - 1] <= var_task_starts[task])
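When tasks happen to be indexed in the desired priority/deadline order, this can provide a large speed-up without changing the optimum. The __main__ block here benchmarks up to 25 tasks, where example 24 stops at 8. The file also fixes cumul[0] == 0, so task 0 always opens a campaign.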
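The lock is harmless in this example because all tasks are identical, so every ordering has the same make-span and the solver gains nothing by exploring them. The sketch below illustrates that with a hypothetical make_span helper that assumes a changeover after every full campaign; it is not taken from the example file.

```python
from itertools import permutations


def make_span(order, campaign_size, processing_time=1, changeover_time=2):
    """Make-span of a sequence of same-product tasks with a campaign size cap."""
    changeovers = (len(order) - 1) // campaign_size
    return len(order) * processing_time + changeovers * changeover_time


tasks = range(4)
spans = {order: make_span(order, campaign_size=2) for order in permutations(tasks)}
print(len(spans), set(spans.values()))  # 24 {6}
```

All 24 orderings of 4 tasks give the same value, so fixing one of them removes symmetry only. If tasks differed in product or due date, a fixed order could exclude the true optimum.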
Concepts
- Campaigning (task-order heuristic)
Source
# Inspired by https://stackoverflow.com/questions/75554536
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
model = cp_model.CpModel()
def generate_one_product_data(num_tasks):
""" Generate N tasks of product A """
tasks = {i for i in range(num_tasks)}
task_to_product = ({i: 'A' for i in range(int(num_tasks))})
return tasks, task_to_product
def run_model(num_tasks, campaign_size, print_result = True):
model = cp_model.CpModel()  # fresh model per call so benchmark runs do not accumulate constraints
# if campaign size is 2, then we need cumul indicator to be 0, 1
changeover_time = 2
max_time = num_tasks*2
processing_time = 1
tasks, task_to_product = generate_one_product_data(num_tasks)
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
model.add(var_task_cumul[0]==0)
var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
# Lock the sequence of the tasks (assume the deadlines are in this sequence !)
for task in tasks:
if task != 0:
model.add(var_task_ends[task-1] <= var_task_starts[task])
for task in tasks:
model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])
var_task_intervals = {
t: model.new_interval_var(
var_task_starts[t],
processing_time,
var_task_ends[t],
f"task_{t}_interval"
)
for t in tasks
}
model.add_no_overlap(var_task_intervals.values())
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.minimize(make_span)
literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[t1, t2]])
# if from task has not reached MAX, continue the campaign
model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
literals[t1, t2]
).only_enforce_if(~var_task_reach_max[t1])
model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
literals[t1, t2]
).only_enforce_if(~var_task_reach_max[t1])
# if from task has reached MAX, apply changeover and reset its cumulative indicator
model.add(var_task_cumul[t2] == 0).only_enforce_if(
literals[t1, t2]
).only_enforce_if(var_task_reach_max[t1])
model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
literals[t1, t2]
).only_enforce_if(var_task_reach_max[t1])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
if print_result:
if status == cp_model.OPTIMAL:
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_cumul[task]),
)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
return total_time
def print_unit_test_result(x, y1, y2, title=''):
ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(x, y1, marker='o', label = 'Campaign size: 2')
plt.plot(x, y2, marker='o', label = 'Campaign size: 3')
plt.legend()
plt.title(title)
plt.xlabel('The number of tasks')
plt.ylabel('Seconds')
plt.show()
if __name__ == '__main__':
N = 25
sizes = range(2, N+1)
model_times_campaign_2 = []
model_times_campaign_3 = []
for num_task in sizes:
print(num_task)
model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
model_times_campaign_3.append(run_model(num_task, campaign_size=3, print_result=False))
print_unit_test_result(sizes, model_times_campaign_2, model_times_campaign_3,
'Scalability of Campaigning with Cumulative Indicator')
Campaigning, locked sequence improved
Source: scheduling/example_26_campaigning_locked_seq_improved.py
What it does
Refines 25 by removing the "force reach_max when cumul hits the cap"
implication. The solver is now free to end a campaign before reaching
the size limit if that gives a better objective.
The order lock becomes start[t-1] <= start[t] (no longer end <= start),
which is looser and compatible with flexible campaign ends.
Also introduces the add_max_equality(max_values, [0, cumul[t1] + 1 - reach_max[t1] * campaign_size]) trick for computing the next rank
variable under an only_enforce_if.
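The arithmetic behind that expression can be tabulated in plain Python. The sketch below reads reach_max[t1] as "the campaign ends after t1", which is an interpretation of the flexible-end idea; next_rank and min_gap are hypothetical helper names.

```python
def next_rank(cumul, reach_max, campaign_size):
    """Rank of the successor task: max(0, cumul + 1 - reach_max * campaign_size)."""
    return max(0, cumul + 1 - int(reach_max) * campaign_size)


def min_gap(reach_max, changeover_time=2):
    """Idle time required before the successor task may start."""
    return int(reach_max) * changeover_time


campaign_size = 3
for cumul in range(campaign_size):
    for reach_max in (False, True):
        print(cumul, reach_max, next_rank(cumul, reach_max, campaign_size), min_gap(reach_max))
```

Ending a campaign early (for example cumul = 0 with reach_max true) makes the inner expression negative, and the max clamps it to rank 0. Continuing at the cap (cumul = 2 with reach_max false) gives rank 3, which lies outside the domain [0, campaign_size - 1], so the model excludes that combination. When reach_max is true only at the cap, the plain linear form without the max gives the same ranks.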
Concepts
- Campaigning (flexible campaign ends)
Source
# Inspired by https://stackoverflow.com/questions/75554536
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd
model = cp_model.CpModel()
def generate_one_product_data(num_tasks):
""" Generate N tasks of product A """
tasks = {i for i in range(num_tasks)}
task_to_product = ({i: 'A' for i in range(int(num_tasks))})
return tasks, task_to_product
def run_model(num_tasks, campaign_size, print_result=True):
model = cp_model.CpModel()  # fresh model per call so benchmark runs do not accumulate constraints
# if campaign size is 2, then we need cumul indicator to be 0, 1
changeover_time = 2
max_time = num_tasks*2
processing_time = 1
tasks, task_to_product = generate_one_product_data(num_tasks)
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
model.add(var_task_cumul[0]==0)
var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
# Lock the sequence of the tasks (assume the deadlines are in this sequence !)
# A later task must not start before an earlier task
for task in tasks:
if task != 0:
model.add(var_task_starts[task-1] <= var_task_starts[task])
for task in tasks:
model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])
var_task_intervals = {
t: model.new_interval_var(
var_task_starts[t],
processing_time,
var_task_ends[t],
f"task_{t}_interval"
)
for t in tasks
}
model.add_no_overlap(var_task_intervals.values())
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.minimize(make_span)
literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[t1, t2]])
# # if from task has not reached MAX, continue the campaign
# model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
# literals[t1, t2]
# ).only_enforce_if(~var_task_reach_max[t1])
# model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
# literals[t1, t2]
# ).only_enforce_if(~var_task_reach_max[t1])
#
# # if from task has reached MAX, apply changeover and reset its cumulative indicator
# model.add(var_task_cumul[t2] == 0).only_enforce_if(
# literals[t1, t2]
# ).only_enforce_if(var_task_reach_max[t1])
# model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
# literals[t1, t2]
# ).only_enforce_if(var_task_reach_max[t1])
model.add(
var_task_ends[t1] + var_task_reach_max[t1]*changeover_time <= var_task_starts[t2]
).only_enforce_if(
literals[t1, t2]
)
model.add(
var_task_cumul[t2] == var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
).only_enforce_if(
literals[t1, t2]
)
# The following makes the model invalid
# model.add_max_equality(
# var_task_cumul[t2],
# [
# 0,
# var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
# ]
# ).only_enforce_if(
# literals[t1, t2]
# )
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
if print_result:
if status == cp_model.OPTIMAL:
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_cumul[task]),
)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
if status == cp_model.OPTIMAL:
return total_time
else:
return -999
def print_unit_test_result(x, y1, y2, title=''):
ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(x, y1, label='Campaign size: 2')
plt.plot(x, y2, label='Campaign size: 5')
plt.legend()
plt.title(title)
plt.xlabel('The number of tasks')
plt.ylabel('Seconds')
plt.show()
if __name__ == '__main__':
N = 60
sizes = range(2, N+1, 4)
model_times_campaign_2 = []
model_times_campaign_5 = []
for num_task in sizes:
print(num_task)
model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
model_times_campaign_5.append(run_model(num_task, campaign_size=5, print_result=False))
df = pd.DataFrame({
'num_tasks': sizes,
'c2': model_times_campaign_2,
'c5': model_times_campaign_5
})
df.to_csv("example_26_result.csv")
print_unit_test_result(sizes,
model_times_campaign_2,
model_times_campaign_5,
'Scalability of Campaigning with Cumulative Indicator')
Campaigning, locked sequence improved (flexible)
Source: scheduling/example_26_campaigning_locked_seq_improved_flexible.py
What it does
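Twin of the previous 26 file with flexible campaign-end semantics. The constraints that tie reach_max to cumul == campaign_size - 1 are commented out, under a comment that warns against forcibly setting reach_max when the cap is reached: the intended behavior is to let the solver decide when a campaign ends.
The rank update is split into two steps because, according to the in-code comments, add_max_equality is not compatible with only_enforce_if. First max_values[t1, t2] is set unconditionally to max(0, cumul[t1] + 1 - reach_campaign_end[t1] * campaign_size); then cumul[t2] == max_values[t1, t2] is enforced only when literals[t1, t2] is true.
The two 26 files exist side by side to document the tweak explicitly.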
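The effect of the max expression can be checked by brute force. The sketch below, in plain Python without a solver, enumerates every (cumul[t1], reach_campaign_end[t1]) pair and keeps those whose successor rank still fits the rank domain [0, campaign_size - 1]. The helper names next_rank and allowed_choices are illustrative assumptions, not part of the example file.

```python
def next_rank(cumul_t1, reach_end_t1, campaign_size):
    """Value of max(0, cumul[t1] + 1 - reach_campaign_end[t1] * campaign_size)."""
    return max(0, cumul_t1 + 1 - int(reach_end_t1) * campaign_size)


def allowed_choices(campaign_size):
    """List (cumul_t1, reach_end_t1, cumul_t2) triples that fit the rank domain."""
    triples = []
    for cumul_t1 in range(campaign_size):
        for reach_end_t1 in (0, 1):
            cumul_t2 = next_rank(cumul_t1, reach_end_t1, campaign_size)
            if cumul_t2 <= campaign_size - 1:  # otherwise the arc t1 -> t2 is infeasible
                triples.append((cumul_t1, reach_end_t1, cumul_t2))
    return triples


for triple in allowed_choices(3):
    print(triple)
# (0, 0, 1)  continue the campaign
# (0, 1, 0)  end the campaign early
# (1, 0, 2)  continue the campaign
# (1, 1, 0)  end the campaign early
# (2, 1, 0)  cap reached: ending is the only option
```

The missing triple (2, 0, ...) is the point of the construction: at the cap, not ending the campaign would push the successor rank out of its domain, so the changeover is forced without any explicit channeling constraint.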
Concepts
- Campaigning (flexible campaign ends)
Source
# Inspired by https://stackoverflow.com/questions/75554536
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd
model = cp_model.CpModel()
def generate_one_product_data(num_tasks):
""" Generate N tasks of product A """
tasks = {i for i in range(num_tasks)}
task_to_product = ({i: 'A' for i in range(int(num_tasks))})
return tasks, task_to_product
def run_model(num_tasks, campaign_size, print_result=True):
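# Build a fresh model on every call; the module-level model would otherwise keep
# accumulating variables and constraints across the benchmark loop in __main__.
model = cp_model.CpModel()
# if campaign size is 2, then we need cumul indicator to be 0, 1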
changeover_time = 2
max_time = num_tasks*2
processing_time = 1
tasks, task_to_product = generate_one_product_data(num_tasks)
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
model.add(var_task_cumul[0]==0)
var_reach_campaign_end = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
# Lock the sequence of the tasks (assume the deadlines follow this sequence!)
# A later task shall not start earlier than an earlier task
for task in tasks:
if task != 0:
model.add(var_task_starts[task-1] <= var_task_starts[task])
# ! SHALL REMOVE THE FOLLOWING ! LEAVE THE DECISION (OF STARTING A NEW CAMPAIGN OR NOT) BACK TO THE MODEL !
# for task in tasks:
# model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
# model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])
var_task_intervals = {
t: model.new_interval_var(
var_task_starts[t],
processing_time,
var_task_ends[t],
f"task_{t}_interval"
)
for t in tasks
}
model.add_no_overlap(var_task_intervals.values())
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.minimize(make_span)
literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
max_values = {(t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[t1, t2]])
# [ task1 ] -> [ C/O ] -> [ task 2]
model.add(
var_task_ends[t1] + var_reach_campaign_end[t1]*changeover_time <= var_task_starts[t2]
).only_enforce_if(
literals[t1, t2]
)
# This is for fixed campaigning size. DO full-campaign or NOT DO.
# model.add(
# var_task_cumul[t2] == var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
# ).only_enforce_if(literals[t1, t2])
# The creator has confirmed add_max_equality is not compatible with only_enforce_if
# model.add_max_equality(
# var_task_cumul[t2],
# [0,var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
# ).only_enforce_if(literals[t1, t2])
# ! HERE IS THE CHANGE RECOMMENDED FOR FLEXIBLE CAMPAIGNING. BUT IN TWO STEPS !
# NOTE var_reach_campaign_end ARE NOW OPEN BOOL DECISION VARIABLES.
#
# If reaching limit (var_task_cumul +1 is equal to campaign_size), the expression is max (0,0)
# the model must do C/O because var_task_cumul[t2] has to be 0 (a new campaign starts).
#
# If not yet reaching limit (var_task_cumul +1 < campaign_size), the expression is max(0, -x)
# model can still choose to do C/O by having var_reach_campaign_end to be 1 (if that brings a better obj)
model.add_max_equality(
max_values[t1, t2],
[0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
)
model.add(var_task_cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
if print_result:
if status == cp_model.OPTIMAL:
for task in tasks:
print(f'Task {task} ',
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_cumul[task]),
solver.value(var_reach_campaign_end[task])
)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
if status == cp_model.OPTIMAL:
return total_time
else:
return -999
def print_unit_test_result(x, y1, y2, title=''):
ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(x, y1, label='Campaign size: 2')
plt.plot(x, y2, label='Campaign size: 5')
plt.legend()
plt.title(title)
plt.xlabel('The number of tasks')
plt.ylabel('Seconds')
plt.show()
if __name__ == '__main__':
# print('Point check for run_model(40, 10)\nExpect make-span = 40*1 + (40/10 - 1)*2 = 40 + 6 = 46')
# run_model(40, 10)
N = 60
sizes = range(2, N+1, 4)
model_times_campaign_2 = []
model_times_campaign_5 = []
for num_task in sizes:
print(num_task)
model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
model_times_campaign_5.append(run_model(num_task, campaign_size=5, print_result=False))
df = pd.DataFrame({
'num_tasks': sizes,
'c2': model_times_campaign_2,
'c5': model_times_campaign_5
})
print(df)
print_unit_test_result(sizes,
model_times_campaign_2,
model_times_campaign_5,
'Scalability of Campaigning with Cumulative Indicator')
Campaigning across products
Source: scheduling/example_27_campaigning_products.py
What it does
Extends the cumulative-rank campaigning to multiple products.
- product_change_indicator[t1, t2] is 1 iff t1 and t2 belong to different products.
- var_product_change[t1] captures whether the arc out of t1 crosses a product boundary.
- var_reach_campaign_end[t1] >= var_product_change[t1]: a product change forces the campaign to end (and therefore a changeover).
- The add_max_equality trick from 26 is used to reset or increment the rank under only_enforce_if(literals[t1, t2]).
- An optional heuristic locks cumul[t-1] <= cumul[t] per product group to speed things up.
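The two changeover triggers (product change, campaign at its size limit) can be replayed on a fixed single-machine sequence. The sketch below is plain Python with no solver; evaluate_sequence is a hypothetical helper, and it assumes that campaigns are never ended early and that every changeover is tight.

```python
def evaluate_sequence(products, campaign_size, changeover_time=2, processing_time=1):
    """Return (make_span, changeovers) for one machine running `products` in order.

    A changeover follows a task when the next task is a different product
    or when the current campaign already holds `campaign_size` tasks.
    """
    clock = 0
    rank = 0  # position of the current task inside its campaign
    changeovers = 0
    for i, product in enumerate(products):
        clock += processing_time
        if i == len(products) - 1:
            break  # no changeover after the last task
        product_change = products[i + 1] != product
        campaign_full = rank == campaign_size - 1
        if product_change or campaign_full:
            clock += changeover_time
            changeovers += 1
            rank = 0
        else:
            rank += 1
    return clock, changeovers


print(evaluate_sequence("AAAABBBBCCCC", campaign_size=4))  # (16, 2)
print(evaluate_sequence("ABAB", campaign_size=3))          # (10, 3)
```

Grouping tasks by product keeps the number of changeovers low; alternating products pays one changeover per arc.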
The __main__ block prints the expected vs. solver make-span for a
parameter set, making it a small sanity check.
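The closed-form value used in that sanity check can be written as a small function. This is a sketch under the same assumptions as the __main__ block: one machine, every product has the same number of tasks, and each product's tasks run back to back. The name expected_make_span and its keyword defaults are illustrative; the defaults match the constants in run_model.

```python
from math import ceil


def expected_make_span(num_products, tasks_per_product, campaign_size,
                       changeover_time=2, processing_time=1):
    """Total processing time plus one changeover between consecutive campaigns."""
    campaigns_per_product = ceil(tasks_per_product / campaign_size)
    total_campaigns = campaigns_per_product * num_products
    return (num_products * tasks_per_product * processing_time
            + (total_campaigns - 1) * changeover_time)


print(expected_make_span(3, 4, 4))  # 16
print(expected_make_span(3, 4, 3))  # 22
```

With changeover_time = 2 and processing_time = 1 this equals the expression in the listing, (tasks_per_product + 2*n) * num_products - 2 with n = ceil(tasks_per_product / campaign_size).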
Concepts
- Campaigning (multi-product)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd
import string
from math import ceil
model = cp_model.CpModel()
def generate_task_data(num_of_products, num_of_tasks_per_product):
""" Generate tasks of products (no more than 26 products) """
products = string.ascii_uppercase[0:num_of_products]
total_num_of_tasks = num_of_tasks_per_product*num_of_products
tasks = {x for x in range(total_num_of_tasks)}
task_to_product = {}
for product_idx, product in enumerate(products):
task_to_product.update({
product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
})
return tasks, task_to_product
def run_model(number_of_products, num_of_tasks_per_product, campaign_size, print_result=True):
"""
Do changeovers if either of the following occurs:
1. Changeover between different products: [A] -> changeover -> [B]
2. Previous campaign reaching size limit: [A ... A] -> changeover -> next any campaign
"""
# number_of_products = 2
# num_of_tasks_per_product = 4
# campaign_size = 3
# print_result = True
changeover_time = 2
max_time = num_of_tasks_per_product*number_of_products*2
processing_time = 1
tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
print(f'\nInput data: {task_to_product}\n')
product_change_indicator = {
(t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
}
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
for product_idx, product in enumerate(range(number_of_products)):
model.add(var_task_cumul[product_idx*num_of_tasks_per_product] == 0)
var_reach_campaign_end = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
var_product_change = {task: model.new_bool_var(f"task_{task}_go_to_different_product") for task in tasks}
# Heuristic: Lock the sequence of the tasks (assume the deadlines are in the task order
# AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
print("Apply the tasks sequence heuristics")
# Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
for product_idx, product in enumerate(range(number_of_products)):
for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
_index = product_idx * num_of_tasks_per_product + task_id_in_product_group
if task_id_in_product_group == 0:
print(f"\nLocking {_index}", end=" ")
else:
print(f" <= {_index}", end=" ")
model.add(var_task_cumul[_index-1] <= var_task_cumul[_index])
# Option 2: Locking the sequence of all tasks! This is quicker (0.10s for 3, 4, 4)
# print("Locking 0", end=" ")
# for task in tasks:
# if task != 0:
# print(f"<= {task}", end=" ")
# model.add(var_task_starts[task-1] <= var_task_starts[task])
print("\n")
var_task_intervals = {
t: model.new_interval_var(var_task_starts[t], processing_time, var_task_ends[t], f"task_{t}_interval")
for t in tasks
}
model.add_no_overlap(var_task_intervals.values())
# Set objective to minimize make-span
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.minimize(make_span)
# the bool variables that indicate whether t1 -> t2
literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
# the technical variables to allow flexible campaigning
max_values = {(t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[t1, t2]])
# [ task1 ] -> [ C/O ] -> [ task 2]
model.add(var_product_change[t1] == product_change_indicator[t1, t2]).only_enforce_if(
literals[t1, t2]
)
model.add(var_reach_campaign_end[t1] >= var_product_change[t1])
model.add(
var_task_ends[t1] + var_reach_campaign_end[t1]*changeover_time <= var_task_starts[t2]
).only_enforce_if(
literals[t1, t2]
)
# allow flexible campaigning
model.add_max_equality(
max_values[t1, t2],
[0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
)
model.add(var_task_cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
if print_result:
if status == cp_model.OPTIMAL:
for task in tasks:
if task%num_of_tasks_per_product == 0:
print('--------- product divider ---------\n')
print(f'Task {task} {task_to_product[task]}',
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_cumul[task]),
solver.value(var_reach_campaign_end[task]),
solver.value(var_product_change[task]),
)
if solver.value(var_reach_campaign_end[task]):
print('-- campaign ends --\n')
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
if status == cp_model.OPTIMAL:
return total_time
else:
return -999
if __name__ == '__main__':
_number_of_products = 3
_num_of_tasks_per_product = 4
_campaign_size = 4
n = ceil(_num_of_tasks_per_product/_campaign_size)
_make_span = (_num_of_tasks_per_product + 2*n)*_number_of_products-2
run_time = run_model(_number_of_products, _num_of_tasks_per_product, _campaign_size)
print(f"Runtime: {round(run_time, 2)}s")
print(f'\nExpected make-span if all right:'
f'\n = (_num_of_tasks_per_product + changeover_time*ceil(_num_of_tasks_per_product/_campaign_size))*'
f'_number_of_products - changeover_time '
f'\n = {_make_span}')
Campaigning products x machines
Source: scheduling/example_28_campaigning_products_machines.py
What it does
Combines 27 (multi-product campaigning) with the multi-machine structure from 03.
- Rank, reach_campaign_end, and product_change booleans become indexed by (machine, task).
- Per-machine add_circuit, with machine-task presence self-loops.
- Per-machine campaigning rules (continue vs. end campaign) inside each circuit's arc constraints.
- Objective is still make_span.
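The role of the presence self-loops is easiest to see when decoding a solved circuit. The sketch below is plain Python with no solver: given the arcs whose literals are true on one machine, it returns the task order on that machine and the set of tasks assigned elsewhere. The function machine_sequence is a hypothetical helper; the dummy node -1 follows the convention used in the listing.

```python
def machine_sequence(selected_arcs, tasks, dummy=-1):
    """Decode one machine's selected circuit arcs into (task order, absent tasks).

    A self-loop (t, t) means task t is not present on this machine.
    Raises ValueError if the arcs are not one circuit through `dummy`.
    """
    nodes = set(tasks) | {dummy}
    successor = {}
    for tail, head in selected_arcs:
        if tail not in nodes or head not in nodes:
            raise ValueError(f"arc ({tail}, {head}) uses an unknown node")
        if tail in successor:
            raise ValueError(f"node {tail} has two outgoing arcs")
        successor[tail] = head
    if set(successor) != nodes:
        raise ValueError("every node needs exactly one outgoing arc")
    absent = {t for t in tasks if successor[t] == t}
    order = []
    node = successor[dummy]
    while node != dummy:
        if node in absent or node in order:
            raise ValueError("arcs do not form one circuit through the dummy node")
        order.append(node)
        node = successor[node]
    if len(order) + len(absent) != len(nodes) - 1:
        raise ValueError("some present tasks sit in a separate sub-tour")
    return order, absent


# Machine runs task 2 then task 0; tasks 1 and 3 are on other machines.
arcs = [(-1, 2), (2, 0), (0, -1), (1, 1), (3, 3)]
order, absent = machine_sequence(arcs, tasks=[0, 1, 2, 3])
print(order)   # [2, 0]
print(absent)  # {1, 3}
```

Without the self-loops every task would have to appear in every machine's circuit, which contradicts the exactly-one-machine assignment.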
Concepts
- Campaigning (multi-product, multi-machine)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import string
model = cp_model.CpModel()
def generate_task_data(num_of_products, num_of_tasks_per_product):
""" Generate the same number of tasks for multiple products (no more than 26 products please) """
products = string.ascii_uppercase[0:num_of_products]
total_num_of_tasks = num_of_tasks_per_product*num_of_products
tasks = {x for x in range(total_num_of_tasks)}
task_to_product = {}
for product_idx, product in enumerate(products):
task_to_product.update({
product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
})
return tasks, task_to_product
def run_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
"""
Allocate tasks to multiple machines
And do changeover if either of the following occurs:
1. Switch between different products: [A campaign] -> changeover -> [B campaign]
2. Previous campaign reaching the size limit: [A FULL campaign] -> changeover -> next any campaign
"""
# number_of_products = 2
# num_of_tasks_per_product = 4
# campaign_size = 2
# number_of_machines = 2
# print_result = True
changeover_time = 2
max_time = num_of_tasks_per_product*number_of_products*3
processing_time = 1
machines = {x for x in range(number_of_machines)}
tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
print('Input data:\nTasks:', tasks, task_to_product, '\n')
product_change_indicator = {
(t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
}
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}
var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}
# No influence on the final result. No need to lock the starting rank values of the first tasks per product to be 0
# for product_idx, product in enumerate(range(number_of_products)):
# print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
# for m in machines:
# model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)
var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}
# This is optional
model.add_decision_strategy(
var_m_t_product_change.values(),
cp_model.CHOOSE_FIRST,
cp_model.SELECT_MIN_VALUE
)
# Heuristic: Lock the sequence of the tasks (assume the deadlines are in the task order
# AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
print("\nApply the tasks sequence heuristics")
# Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
for product_idx, product in enumerate(range(number_of_products)):
for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
_index = product_idx * num_of_tasks_per_product + task_id_in_product_group
if task_id_in_product_group == 0:
print(f"\nLock {_index}", end=" ")
else:
print(f" <= {_index}", end=" ")
model.add(var_task_ends[_index-1] <= var_task_starts[_index])
print("\n")
# These intervals are needed, otherwise the duration is not constrained
var_machine_task_intervals = {
(m, t): model.new_optional_interval_var(
var_machine_task_starts[m, t],
processing_time,
var_machine_task_ends[m, t],
var_machine_task_presences[m, t],
f"task_{t}_interval_on_m_{m}")
for t in tasks for m in machines
}
# each task is only present in one machine
for task in tasks:
task_candidate_machines = machines
tmp = [
var_machine_task_presences[m, task]
for m in task_candidate_machines
]
model.add_exactly_one(tmp)
# link task-level to machine-task level for start time & end time
for task in tasks:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
var_task_starts[task] == var_machine_task_starts[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
model.add(
var_task_ends[task] == var_machine_task_ends[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
# Set objective to minimize make-span
make_span = model.new_int_var(0, max_time, "make_span")
total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
model.minimize(make_span)
# the bool variables that indicate whether t1 -> t2
literals = {(m, t1, t2): model.new_bool_var(f"{t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# the technical variables to allow flexible campaigning
max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# schedule the tasks
for m in machines:
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])
for t2 in tasks:
if t1 == t2:
continue
# if t1 > t2:
# model.add(literals[m, t1, t2] == 0)
arcs.append([t1, t2, literals[m, t1, t2]])
# If A -> B then var_m_t_product_change>=1 (can be 0 if the last task in a machine)
model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
literals[m, t1, t2]
)
# If var_m_t_product_change=1 then the campaign must end
model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])
# if the campaign ends then there must be changeover time
# [ task1 ] -> [ C/O ] -> [ task 2]
model.add(
var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
).only_enforce_if(
literals[m, t1, t2]
)
# model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
# has to do this in two steps since add_max_equality is not compatible with only_enforce_if
model.add_max_equality(
max_values[m, t1, t2],
[0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
)
model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
# show the result if getting the optimal one
if print_result:
if status == cp_model.OPTIMAL:
big_list = []
for m in machines:
for task in tasks:
if solver.value(var_machine_task_presences[m, task]):
tmp = [
f"machine {m}",
f"task {task}",
task_to_product[task],
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_machine_task_rank[m, task]),
solver.value(var_m_t_product_change[m, task]),
solver.value(var_m_t_reach_campaign_end[m, task])
]
big_list.append(tmp)
df = pd.DataFrame(big_list)
df.columns = ['machine', 'task', 'prd', 'start', 'end', 'rank', 'prd_chg', 'c/o']
df = df.sort_values(['machine', 'start'])
for m in machines:
print(f"\n======= Machine {m} =======")
df_tmp = df[df['machine']==f"machine {m}"]
print(df_tmp)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
if status == cp_model.OPTIMAL:
return total_time
else:
return -999
if __name__ == '__main__':
# number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines
args = 4, 4, 3, 3
runtime = run_model(*args)
print(f"Solving time: {round(runtime, 2)}s")
Campaigning with pregrouping (empty)
Source: scheduling/example_30_campaigning_with_pregrouping.py
The file is empty. Placeholder kept for numbering. The intended topic was likely "pre-group tasks into candidate campaigns before handing them to the solver"; see Max number of continuous tasks for the closest working model.
Campaigning faster
Source: scheduling/example_31_campaigning_faster.py
What it does
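A tuned variant of 28 (multi-product, multi-machine campaigning) aiming for faster solves. The structure is the same. The change visible in the listing is arc pruning: literals[m, t1, t2] is fixed to 0 whenever t1 > t2, so each machine can only sequence tasks in increasing index order. A narrower rule that prunes only same-product pairs is left commented out.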
It is the natural next step after 28 when experimenting with scalability.
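One way to see why the arc restriction in this file helps is to count how many task-to-task arc literals stay free. The sketch below is plain Python with no solver; count_arcs is a hypothetical helper, and forward_only=True mirrors the rule in the listing that fixes literals[m, t1, t2] to 0 whenever t1 > t2.

```python
def count_arcs(num_tasks, num_machines, forward_only):
    """Count task-to-task arc literals left free across all machines."""
    free = 0
    for _ in range(num_machines):
        for t1 in range(num_tasks):
            for t2 in range(num_tasks):
                if t1 == t2:
                    continue  # self-loops are presence literals, not sequencing arcs
                if forward_only and t1 > t2:
                    continue  # literal fixed to 0 by the pruning rule
                free += 1
    return free


# 4 products x 4 tasks on 3 machines, as in the 28 parameter set
print(count_arcs(16, 3, forward_only=False))  # 720
print(count_arcs(16, 3, forward_only=True))   # 360
```

Halving the free arcs is only safe when index order is an acceptable sequence on every machine, which holds for the generated data here because tasks of one product are interchangeable; treat it as a heuristic for other inputs.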
Concepts
- Campaigning (tuning)
- Solver techniques
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import string
model = cp_model.CpModel()
def generate_task_data(num_of_products, num_of_tasks_per_product):
""" Generate the same number of tasks for multiple products (no more than 26 products please) """
products = string.ascii_uppercase[0:num_of_products]
total_num_of_tasks = num_of_tasks_per_product*num_of_products
tasks = {x for x in range(total_num_of_tasks)}
task_to_product = {}
for product_idx, product in enumerate(products):
task_to_product.update({
product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
})
return tasks, task_to_product
def run_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
"""
Allocate tasks to multiple machines
And do changeover if either of the following occurs:
1. Switch between different products: [A campaign] -> changeover -> [B campaign]
2. Previous campaign reaching the size limit: [A FULL campaign] -> changeover -> next any campaign
"""
# number_of_products = 2
# num_of_tasks_per_product = 4
# campaign_size = 2
# number_of_machines = 2
# print_result = True
changeover_time = 2
max_time = num_of_tasks_per_product*number_of_products*3
processing_time = 1
machines = {x for x in range(number_of_machines)}
tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
print('Input data:\nTasks:', tasks, task_to_product)
product_change_indicator = {
(t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
}
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}
var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}
# No influence on the final result. No need to lock the starting rank values of the first tasks per product to be 0
# for product_idx, product in enumerate(range(number_of_products)):
# print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
# for m in machines:
# model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)
var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}
# This is optional
model.add_decision_strategy(
var_m_t_product_change.values(),
cp_model.CHOOSE_FIRST,
cp_model.SELECT_MIN_VALUE
)
# Heuristic: Lock the sequence of the tasks (assume the deadlines are in the task order
# AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
print("Apply the tasks sequence heuristics")
# Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
for product_idx, product in enumerate(range(number_of_products)):
for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
_index = product_idx * num_of_tasks_per_product + task_id_in_product_group
if task_id_in_product_group == 0:
print(f"\nLock {_index}", end=" ")
else:
print(f" <= {_index}", end=" ")
model.add(var_task_ends[_index-1] <= var_task_starts[_index])
print("\n")
# These intervals are needed, otherwise the duration is not constrained
var_machine_task_intervals = {
(m, t): model.new_optional_interval_var(
var_machine_task_starts[m, t],
processing_time,
var_machine_task_ends[m, t],
var_machine_task_presences[m, t],
f"task_{t}_interval_on_m_{m}")
for t in tasks for m in machines
}
# each task is only present in one machine
for task in tasks:
task_candidate_machines = machines
tmp = [
var_machine_task_presences[m, task]
for m in task_candidate_machines
]
model.add_exactly_one(tmp)
# link task-level to machine-task level for start time & end time
for task in tasks:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
var_task_starts[task] == var_machine_task_starts[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
model.add(
var_task_ends[task] == var_machine_task_ends[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
# Set objective to minimize make-span
make_span = model.new_int_var(0, max_time, "make_span")
total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
model.minimize(make_span)
# the bool variables that indicate whether t1 -> t2
literals = {(m, t1, t2): model.new_bool_var(f"{t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# the technical variables to allow flexible campaigning
max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# schedule the tasks
for m in machines:
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])
for t2 in tasks:
if t1 == t2:
continue
# this speeds up the solve considerably
# if t1 > t2 and task_to_product[t1]==task_to_product[t2]:
# model.add(literals[m, t1, t2] == 0)
if t1 > t2:
model.add(literals[m, t1, t2] == 0)
arcs.append([t1, t2, literals[m, t1, t2]])
# If A -> B then var_m_t_product_change>=1 (can be 0 if the last task in a machine)
model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
literals[m, t1, t2]
)
# If var_m_t_product_change=1 then the campaign must end
model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])
# if the campaign ends then there must be changeover time
# [ task1 ] -> [ C/O ] -> [ task 2]
model.add(
var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
).only_enforce_if(
literals[m, t1, t2]
)
# model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
# has to do this in two steps since add_max_equality is not compatible with only_enforce_if
model.add_max_equality(
max_values[m, t1, t2],
[0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
)
model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])
model.add_circuit(arcs)
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
# show the result if the solution is optimal
if print_result:
if status == cp_model.OPTIMAL:
big_list = []
for m in machines:
for task in tasks:
if solver.value(var_machine_task_presences[m, task]):
tmp = [
f"machine {m}",
f"task {task}",
task_to_product[task],
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_machine_task_rank[m, task]),
solver.value(var_m_t_product_change[m, task]),
solver.value(var_m_t_reach_campaign_end[m, task])
]
big_list.append(tmp)
df = pd.DataFrame(big_list)
df.columns = ['machine', 'task', 'prd', 'start', 'end', 'rank', 'prd_chg', 'c/o']
df = df.sort_values(['machine', 'start'])
for m in machines:
print(f"\n======= Machine {m} =======")
df_tmp = df[df['machine']==f"machine {m}"]
print(df_tmp)
print('-------------------------------------------------')
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)
if status == cp_model.OPTIMAL:
return total_time
else:
return -999
if __name__ == '__main__':
# number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines
args = 4, 4, 3, 3
runtime = run_model(*args)
print(f"Solving time: {round(runtime, 2)}s")
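The rank update used in the circuit constraints above can be checked by hand. The following is a minimal plain-Python sketch (no solver) of the rule rank[t2] = max(0, rank[t1] + 1 - reach_campaign_end[t1] * campaign_size). The helper name next_rank is hypothetical and not part of the example file.

```python
def next_rank(rank_t1: int, reach_campaign_end: int, campaign_size: int) -> int:
    """Rank of the task that directly follows t1 on the same machine."""
    return max(0, rank_t1 + 1 - reach_campaign_end * campaign_size)


campaign_size = 3

# Campaign continues: the rank increases by one.
continued = [next_rank(r, 0, campaign_size) for r in (0, 1)]

# Campaign ends (at the size limit or earlier): the rank resets to 0.
ended = [next_rank(r, 1, campaign_size) for r in (0, 1, 2)]

print(continued)  # [1, 2]
print(ended)      # [0, 0, 0]
```

Assuming the rank variables have the domain [0, campaign_size - 1], continuing from rank campaign_size - 1 would produce an out-of-domain rank, so the model is forced to end the campaign at that point.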
Solving by phases
Source: scheduling/example_32_solving_by_phases.py
What it does
Demonstrates warm-starting CP-SAT across multiple solves of the same model.
- create_model(...) returns a campaigning-with-machines model similar to example 28.
- A list of phases with growing max_time is defined. In the source, max_time is the time limit of a phase in seconds and is passed to solver.parameters.max_time_in_seconds.
- For each phase, the model is solved. The resulting solution is read back with get_solutions(model, solver) and fed as hints to the next phase using a custom add_hints(model, solution) helper.
- model.clear_hints() resets the hints between phases.
This is useful when a long time budget is needed to reach a good solution but a short first solve gives a fast starting point.
Concepts
- Solver techniques (hints, phases)
- Campaigning
Source
from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import numpy as np
import string
def generate_task_data(num_of_products, num_of_tasks_per_product):
""" Generate the same number of tasks for multiple products (no more than 26 products please) """
products = string.ascii_uppercase[0:num_of_products]
total_num_of_tasks = num_of_tasks_per_product*num_of_products
tasks = {x for x in range(total_num_of_tasks)}
task_to_product = {}
for product_idx, product in enumerate(products):
task_to_product.update({
product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
})
return tasks, task_to_product
def create_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
model = cp_model.CpModel()
changeover_time = 2
max_time = num_of_tasks_per_product*number_of_products*5
processing_time = 1
machines = {x for x in range(number_of_machines)}
tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
print('Input data:\nTasks:', tasks, task_to_product, '\n')
product_change_indicator = {
(t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
}
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}
# the name includes the machine so it stays unique when solutions are keyed by variable name
var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"m{m}_t{t}_rank") for t in tasks for m in machines}
# Optional, no influence on the final result: lock the rank of the first task of each product to 0
# for product_idx, product in enumerate(range(number_of_products)):
# print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
# for m in machines:
# model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)
var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}
# This is optional
# model.add_decision_strategy(
# var_m_t_product_change.values(),
# cp_model.CHOOSE_FIRST,
# cp_model.SELECT_MIN_VALUE
# )
# Heuristic: lock the sequence of the tasks (assumes the deadlines follow the task order
# AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
# print("\nApply the tasks sequence heuristics")
# # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
# for product_idx, product in enumerate(range(number_of_products)):
# for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
# _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
# if task_id_in_product_group == 0:
# print(f"\nLock {_index}", end=" ")
# else:
# print(f" <= {_index}", end=" ")
# model.add(var_task_ends[_index-1] <= var_task_starts[_index])
# print("\n")
# These intervals are needed; otherwise the duration is not constrained
var_machine_task_intervals = {
(m, t): model.new_optional_interval_var(
var_machine_task_starts[m, t],
processing_time,
var_machine_task_ends[m, t],
var_machine_task_presences[m, t],
f"task_{t}_interval_on_m_{m}")
for t in tasks for m in machines
}
# each task is present on exactly one machine
for task in tasks:
task_candidate_machines = machines
tmp = [
var_machine_task_presences[m, task]
for m in task_candidate_machines
]
model.add_exactly_one(tmp)
# link task-level to machine-task level for start time & end time
for task in tasks:
task_candidate_machines = machines
for m in task_candidate_machines:
model.add(
var_task_starts[task] == var_machine_task_starts[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
model.add(
var_task_ends[task] == var_machine_task_ends[m, task]
).only_enforce_if(var_machine_task_presences[m, task])
# Set objective to minimize make-span
make_span = model.new_int_var(0, max_time, "make_span")
total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
model.minimize(make_span)
# bool variables indicating whether t1 is directly followed by t2 on machine m
# (names carry the machine and a distinct prefix so they stay unique for name-keyed hints)
literals = {(m, t1, t2): model.new_bool_var(f"m{m}: {t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# the technical variables to allow flexible campaigning
max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"m{m}: max {t1} -> {t2}")
for m in machines for t1 in tasks for t2 in tasks if t1 != t2}
# schedule the tasks
for m in machines:
arcs = []
for t1 in tasks:
arcs.append([-1, t1, model.new_bool_var(f"m{m}_first_to_{t1}")])
arcs.append([t1, -1, model.new_bool_var(f"m{m}_{t1}_to_last")])
arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])
for t2 in tasks:
if t1 == t2:
continue
arcs.append([t1, t2, literals[m, t1, t2]])
# If A -> B then var_m_t_product_change>=1 (can be 0 if the last task in a machine)
model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
literals[m, t1, t2]
)
# If var_m_t_product_change=1 then the campaign must end
model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])
# if the campaign ends then there must be changeover time
# [ task1 ] -> [ C/O ] -> [ task 2]
model.add(
var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
).only_enforce_if(
literals[m, t1, t2]
)
# model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
# has to do this in two steps since add_max_equality is not compatible with only_enforce_if
model.add_max_equality(
max_values[m, t1, t2],
[0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
)
model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])
model.add_circuit(arcs)
return model
# This takes 16 seconds
model = create_model(5, 20, 3, 1)
# solver = cp_model.CpSolver()
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(status)
# print(solver.objective_value())
# #10
# print(total_time, status)
# Alternative phase schedules that were tried; only the last assignment takes effect.
# phases = [{'phase_id': i, 'max_time': t} for i, t in enumerate([0.5, 1, 2, 4, 8])]
# phases = [{'phase_id': i, 'max_time': 10} for i in range(5)]
# phases = [{'phase_id': i, 'max_time': t} for i, t in enumerate([5, 10, 15, 20, 25])]
n = 6
phases = [
    {'phase_id': i, 'max_time': (i+1)*10} for i in range(n)
]
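def get_solutions(model, solver):
    """Return {variable name: value} for the last solution found by the solver.

    Variable names must be unique; otherwise later variables overwrite earlier ones.
    """
    vars_sol = {}
    for i, var in enumerate(model.proto().variables):
        value = solver.response_proto().solution[i]
        vars_sol[var.name] = value
    return vars_sol
def add_hints(model, solution):
    """Add every variable whose name appears in `solution` to the model's solution hint."""
    for i, var in enumerate(model.proto().variables):
        if var.name in solution:
            model.proto().solution_hint.vars.append(i)
            model.proto().solution_hint.values.append(solution[var.name])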
# get_solutions(model, solver)
obj_list = []
solution = None  # best solution found so far, keyed by variable name
solver = cp_model.CpSolver()
for phase in phases:
    phase_id, max_time = phase['phase_id'], phase['max_time']
    print('----------------------------')
    model.clear_hints()
    if solution is not None:
        print("Add hints")
        add_hints(model, solution)
    print('number of variables in solution hints:', len(model.proto().solution_hint.vars))
    #solver.parameters.keep_all_feasible_solutions_in_presolve = True
    solver.parameters.max_time_in_seconds = max_time
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    print('number of values in the solution:', len(solver.response_proto().solution))
    if status in (cp_model.MODEL_INVALID, cp_model.INFEASIBLE):
        print(f'error status : {status}')
        break
    if status == cp_model.UNKNOWN:
        print(f'Cannot find a feasible solution in the given time {max_time}. status:{status}')
        obj_list.append(np.nan)
        continue
    # objective_value is a property in the snake_case API, not a method
    obj_value = solver.objective_value
    obj_list.append(obj_value)
    solution = get_solutions(model, solver)
    print(f"phase_id: {phase_id}, max_time: {max_time}, status: {status}, obj: {obj_value}. total time: {round(total_time,1)}")
    if status == cp_model.OPTIMAL:
        print('======================================================')
        print('Optimal Solution Achieved ! No need to continue')
        break
print(obj_list)
# x-axis: the time limit of each phase that actually ran (the loop may stop early)
times = [phase['max_time'] for phase in phases][:len(obj_list)]
ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(times, obj_list, marker='o')
plt.title('time vs obj')
plt.xlabel('time limit per phase (seconds)')
plt.ylabel('obj')
plt.show()
# self.alg_data, self.model = solver.solve(
# alg_data=self.alg_data,
# previous_model=self.model,
# model_parameters=self.model_parameters,
# )
#
#
# vars_sol = {}
# for i, var in enumerate(model.proto().variables):
# value = solver.response_proto().solution[i]
# vars_sol[var.name] = value
#
#
#
# solver = cp_model.CpSolver()
# solver.parameters.max_time_in_seconds = 0.5
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# #model.ExportToFile("C:/Temp/xxxx.pb.txt")
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# vars_sol = {}
# for i, var in enumerate(model.proto().variables):
# print(var)
# print(var.name)
# print(var.domain)
# print('-------------------')
# vars_sol[var.name] = var
#
# model.proto().solution_hint
# for i, var in enumerate(model.proto().variables):
# if var.name in vars_sol:
# model.proto().solution_hint.vars.append(i)
# model.proto().solution_hint.values.append(vars_sol[var.name])
# #
#
# print(model.ModelStats())
# @dataclass
# class PhaseParameters:
# """ Gather different input optimisation parameters that can be set for each phase. """
#
# obj_phase: str = None
# max_time_in_seconds: int = None
# solution_count_limit: int = None
# restart: bool = False
#
#
# phase_1 = PhaseParameters('phase_1', 10)
# phase_2 = PhaseParameters('phase_2', 10)
#
#
# show the result if getting the optimal one
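The helpers get_solutions and add_hints key the solution by variable name, which only round-trips correctly when every variable name is unique. The plain-Python sketch below uses no solver; the names and values lists are made-up stand-ins for model variables and a solver solution. It shows what happens when two variables share a name, next to an index-keyed alternative that is safe when the hints go back into the same model.

```python
names = ["0 -> 1", "0 -> 1", "task_0_start"]  # two variables share a name
values = [1, 0, 7]                             # hypothetical solution, by variable index

# Name-keyed, as in get_solutions: the second "0 -> 1" overwrites the first.
by_name = {}
for name, value in zip(names, values):
    by_name[name] = value
hints_by_name = [by_name[name] for name in names]

# Index-keyed: every variable keeps its own value.
by_index = dict(enumerate(values))
hints_by_index = [by_index[i] for i in range(len(names))]

print(hints_by_name)   # [0, 0, 7]
print(hints_by_index)  # [1, 0, 7]
```

Name-keyed hints remain the better fit when a solution has to be carried over to a different model whose variable order differs, provided the names are unique.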