Introduction

This book collects notes and examples of scheduling models built with OR-Tools CP-SAT. The focus is on job-shop style problems: sequencing tasks on machines, handling changeovers, respecting breaks and shifts, modeling resources, and grouping tasks into campaigns.

The book is split into two parts.

  • Concepts walks through the core modeling ideas once. Read these first if CP-SAT or constraint programming is new to you.
  • Examples indexes the Python files in the scheduling/ folder and links each one back to the concepts it demonstrates. Examples are numbered and build on each other, so reading them in order is usually easiest.

All code lives in the scheduling/ directory of the repo. Open a file in your editor while reading the corresponding chapter to see the full model.

A minimal CP-SAT template

Almost every example follows the same five-step shape.

from ortools.sat.python import cp_model

# 1. Data
# ... sets, durations, changeover table, etc.

# 2. Decision variables
model = cp_model.CpModel()
# ... start/end/interval/bool vars

# 3. Objective
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [ends[t] for t in tasks])
model.minimize(make_span)

# 4. Constraints
# ... precedence, resources, circuits, ...

# 5. Solve and post-process
solver = cp_model.CpSolver()
status = solver.solve(model)

The rest of the book explains what goes into step 4.

CP-SAT basics

CP-SAT is a constraint-programming solver with integer and boolean variables. Before looking at scheduling, it helps to be comfortable with a few primitives.

Variables

x = model.new_int_var(0, 100, 'x')      # integer in [0, 100]
b = model.new_bool_var('b')             # boolean (integer in {0, 1})

Linear constraints

model.add(x + y == 10)
model.add(x <= y)
model.add(sum(bs) == 1)               # exactly-one on a list of bools
model.add_exactly_one(bs)               # same, more idiomatic

Boolean combinations

model.add_bool_or([a, b])               # a or b
model.add_bool_and([a, b])              # a and b
model.add_bool_xor([a, b])              # odd number of literals true; for two, exactly one

Reification with only_enforce_if

A constraint can be conditioned on a boolean literal. The constraint is only active when the literal is true.

model.add(x >= 5).only_enforce_if(b)
model.add(x < 5).only_enforce_if(~b)

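Chaining two only_enforce_if calls gives an "and" of conditions. The result is an implication, not an equivalence: nothing is said about y when either literal is false.

model.add(y == 1).only_enforce_if(b1).only_enforce_if(b2)  # (b1 and b2) => y == 1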

For "or", you normally introduce intermediate booleans or use add_bool_or.

add_min_equality, add_max_equality, add_multiplication_equality

These express z = min(xs), z = max(xs), z = x * y (or the product of a list). add_max_equality is how makespan is usually encoded:

model.add_max_equality(make_span, [ends[t] for t in tasks])

Domains

For "x belongs to a non-contiguous set of values" use add_linear_expression_in_domain:

domain = cp_model.Domain.from_intervals([[0, 4], [11, 100]])
model.add_linear_expression_in_domain(x, domain)

See example_00_unit_tests.py for a collection of small snippets exercising each of these.

Solve and read back

solver = cp_model.CpSolver()
status = solver.solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(solver.value(x))

Interval variables

An interval variable bundles three integer variables - start, size, end - with the implicit constraint start + size == end. CP-SAT uses intervals to reason efficiently about scheduling: add_no_overlap and add_cumulative both expect intervals.

Three flavors

Regular interval

iv = model.new_interval_var(start, size, end, name="t1")

Replaces a manual model.add(end - start == size).

Optional interval

An interval that is only scheduled when a presence boolean is true. Essential when a task may or may not be assigned to a given machine.

iv = model.new_optional_interval_var(start, size, end, is_present, name="t1_on_m1")

If is_present is false, the interval disappears from add_no_overlap / add_cumulative reasoning.

Fixed-size interval

Convenient for breaks, shift boundaries, and anything with a known duration. Pass a constant start, as below, when the position is known too.

br = model.new_fixed_size_interval_var(start=2, size=1, name="break")

Typical use

intervals = {
    (m, t): model.new_optional_interval_var(
        starts[m, t],
        processing_time[product_of(t)],
        ends[m, t],
        presence[m, t],
        f"t{t}_on_m{m}",
    )
    for t in tasks for m in machines
}

for m in machines:
    model.add_no_overlap([intervals[m, t] for t in tasks])

Examples that introduce intervals: example_05_seq_with_intervals.py (first use), example_03_seq_scale_Mathieu.py (dramatic speed-up vs. manual duration constraints).

Circuit and sequencing

To sequence tasks on a machine, you need both the order and the time constraints that follow from it. CP-SAT's add_circuit is the standard tool.

What add_circuit does

add_circuit(arcs) takes a list of triples [i, j, literal]. It asserts that the selected arcs (where literal == 1) form a single Hamiltonian circuit over the referenced nodes. Self-arcs [i, i, literal] mean node i is skipped when literal is true.

arcs = []
for t1 in tasks:
    arcs.append([0, t1, start_literal(t1)])   # dummy -> t1 (first)
    arcs.append([t1, 0, end_literal(t1)])     # t1 -> dummy (last)
    arcs.append([t1, t1, ~presence[t1]]) # skip t1 if absent
    for t2 in tasks:
        if t1 == t2:
            continue
        arcs.append([t1, t2, seq[t1, t2]])

model.add_circuit(arcs)

Node 0 (or -1) is typically a dummy "first/last" node.

Linking the circuit to time

add_circuit only picks the order. You also need: if t1 -> t2 is chosen, then end[t1] + gap <= start[t2]. This is a reified constraint:

model.add(end[t1] + gap <= start[t2]).only_enforce_if(seq[t1, t2])

gap is usually changeover_time (see Changeover) or 0.

Multi-machine

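With multiple machines, build one circuit per machine and gate absent tasks with self-loops. The snippet shows only the self-loops and task-to-task arcs; each machine's circuit also needs dummy-node arcs like those in the single-machine case, otherwise the last task would have to loop back to the first: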

for m in machines:
    arcs = []
    for t in tasks:
        arcs.append([t, t, ~presence[m, t]])
        for t2 in tasks:
            if t != t2:
                arcs.append([t, t2, seq[m, t, t2]])
    model.add_circuit(arcs)

For each task t, model.add_exactly_one(presence[m, t] for m in machines) ensures the task ends up on exactly one machine.

Examples: example_01_simple_sequence.py (single machine), example_03_seq_multi_stations.py (multi-machine).

Changeover

A changeover is the time needed to switch a machine from producing one product to another. There are three common ways to model it.

1. In the objective

Charge a cost for each seq[t1, t2] whose products differ. The changeover takes no time in the schedule itself; it only adds to the cost being minimised.

total_co = sum(seq[t1, t2] * changeover_cost[t1, t2] for (t1, t2) in seq)
model.minimize(make_span + total_co)

Simple, but time and cost are decoupled: the schedule may not leave physical room for the changeover.

Example: example_01_simple_sequence.py.

2. In the precedence constraint

Include the changeover in the gap between tasks:

gap = changeover_time if products_differ(t1, t2) else 0
model.add(end[t1] + gap <= start[t2]).only_enforce_if(seq[t1, t2])

Now time and cost agree: a changeover actually pushes the next task later.

Example: example_04_seq_with_changeover_in_constraint.py.

3. As a first-class event

Create an optional interval for every (t1, t2) that represents the changeover itself. When t1 -> t2 is chosen, the interval is present, sits between the two tasks, and has the right duration.

co_iv = model.new_optional_interval_var(co_start, co_duration, co_end, co_present, ...)
model.add(end[t1] <= co_start).only_enforce_if(seq[t1, t2])
model.add(co_end <= start[t2]).only_enforce_if(seq[t1, t2])
model.add(co_present == 1).only_enforce_if(seq[t1, t2])

This lets you add the changeover interval to add_cumulative (it consumes operator time) or apply cleaning-resource constraints to it.

Example: example_08_changeover_as_event.py.

Starting product

A machine usually begins with some product already loaded. Model it with a dummy task 0 whose "product" is the starting product; the cost from dummy to the first real task is zero if they match, else the usual changeover.

Example: example_02_seq_lock_starting_product.py.

Breaks

A break is a time window during which a machine or operator is unavailable. Three techniques cover most cases.

1. Break as a fixed interval in add_cumulative

For each break, build a new_fixed_size_interval_var and add it alongside task intervals with the full demand. Tasks are pushed around the break.

break_intervals = [
    model.new_fixed_size_interval_var(start=s, size=e - s, name="break")
    for (s, e) in breaks
]
all_intervals = task_intervals + break_intervals
demands = [1] * len(task_intervals) + [1] * len(break_intervals)
model.add_cumulative(all_intervals, demands, capacity=1)

Example: example_07_break_without_changeover.py.

2. Task duration stretched by overlapping breaks

When a task may run through a break and the break simply extends its total time on the machine, use per-time-slot booleans that indicate whether the task uses slot i, then add is_break[i] for each covered slot.

uses[t, i] = starts_before_i AND ends_after_i
duration[t] = base + sum(is_break[i] * uses[t, i] for i)
interval[t] = new_interval_var(start, duration, end, ...)

Example: example_14_task_delaying_break.py.

3. Break-aware start domains

If the break pattern is periodic, restrict task starts to the valid slots with add_linear_expression_in_domain. Much faster than per-slot booleans.

domain_no_break = cp_model.Domain.from_values([...])
model.add_linear_expression_in_domain(start[t], domain_no_break)

Examples: example_29_linear_domain_for_breaks.py, example_33_conditional_duration_linear_domain.py.

Automatic jobs

Some "automatic" tasks don't consume the operator while running (think: a machine runs itself after a short manual setup). Model only the setup portion inside the cumulative, using a 1-unit interval at the task's start.

Examples: example_12_an_automatic_job.py, example_13_automatic_jobs.py.

Shifts

A shift is a working window. Tasks must fit inside one shift (or, depending on policy, be split / disallowed from crossing shifts).

Synthetic shift breaks

Insert a tiny "fake break" interval at each shift boundary and forbid any task from overlapping it with add_no_overlap. This prevents shift-crossing without enumerating shift assignments.

for (s, e) in synthetic_shift_breaks:
    br = model.new_fixed_size_interval_var(start=s, size=e - s, name="shift_edge")
    for t in tasks:
        model.add_no_overlap([task_interval[t], br])

Example: example_16_shift_crossing_fake_time_unit.py.

Explicit shift assignment

Alternatively, give every task a one-hot presence[shift, task] and enforce the shift window when present:

for t in tasks:
    model.add_exactly_one(presence[s, t] for s in shifts)
    for s in shifts:
        model.add(start[t] >= shift_start[s]).only_enforce_if(presence[s, t])
        model.add(end[t]   <= shift_end[s]  ).only_enforce_if(presence[s, t])

More variables, but the assignment is explicit and easy to extend (e.g. to per-shift capacity).

Example: example_17_shift_crossing_mathieu.py.

Resources and cumulative

add_no_overlap says "at most one interval at a time". add_cumulative is the generalisation: each interval consumes some amount of a shared resource, and the total consumption must not exceed a capacity.

No-overlap

model.add_no_overlap(intervals)

Used per machine (one task at a time) and per stage (one job at a time in a flow-shop style).

Cumulative

model.add_cumulative(intervals, demands, capacity)

demands[i] is the amount of resource taken by intervals[i] while it runs. Typical uses:

  • Shared operator across machines. If two machines need the same operator, cumulative over all their task intervals with demand 1 and capacity 1 forbids parallel runs. Example: example_06_seq_with_intervals_resource.py.
  • Breaks. Treat a break as an interval that fully occupies the resource. Example: example_07_break_without_changeover.py.
  • Automatic jobs. Only the setup portion consumes the operator, modeled as a size-1 interval at each task's start.

Resource modes

Some tasks can run in different modes with different durations and headcounts. Encode the choice with a one-hot bool per mode and derive the actual processing time from it:

for t in tasks:
    model.add_exactly_one([mode[t, k] for k in modes])
    model.add(
        proc_time[t] == sum(processing_time[product[t], k] * mode[t, k] for k in modes)
    )

Example: example_10_people_mode.py.

Headcount tracking

If the per-task resource depends on whether the task overlaps a break (or some other condition), plain add_cumulative may be insufficient. Build an explicit per-timestep resource variable and link it to task-start presence booleans. Three methods are compared in example_34_headcount_tracking.py.

Multi-stage jobs

A job can consist of several stages that must run in order. Each stage is a task with its own start/end; the job start is the earliest task start and the job end is the latest task end.

Job - stage - task structure

tasks = {(job, stage) for job in jobs for stage in stages}

for job in jobs:
    model.add_min_equality(job_start[job], [start[job, s] for s in stages])
    model.add_max_equality(job_end[job], [end[job, s] for s in stages])

    # stage precedence
    for s in sorted(stages)[:-1]:
        model.add(end[job, s] <= start[job, s + 1])

Example: example_21_stages_one_job.py.

Stage-level no-overlap

If each stage has a single shared machine, forbid two jobs from sitting on the same stage simultaneously:

for s in stages:
    model.add_no_overlap([intervals[job, s] for job in jobs])

Examples: example_22_stages_two_jobs.py, example_23_multistage_two_jobs_co.py.

Campaigning

A campaign is a run of same-product tasks on a machine between two changeovers. Typical rules:

  • tasks within a campaign are the same product and pay no changeover cost,
  • a campaign has a maximum size (e.g. at most N tasks),
  • switching products or hitting the cap triggers a changeover.

Approach 1: campaigns as entities

Create a set of potential campaigns, each with start/end/duration/presence, and variables linking tasks to campaigns. Sequence campaigns (not tasks) using add_circuit. The campaign-level changeover cost sits in the gap between campaigns.

Pros: close to the business view. Cons: more variables, scales worse.

Example: example_09_max_number_of_continuous_tasks.py.

Approach 2: cumulative rank per task

Keep tasks as the atomic unit and attach a rank variable cumul[t] in [0, campaign_size - 1]. On each t1 -> t2 arc:

  • if the campaign continues, cumul[t2] = cumul[t1] + 1,
  • if a changeover happens, cumul[t2] = 0 and end[t1] + changeover <= start[t2].

A reach_end[t] boolean fires when cumul[t] == campaign_size - 1, forcing a reset and changeover. add_max_equality(max_value, [0, cumul[t1] + 1 - reach_end[t1] * campaign_size]) is a useful trick to compute the next rank under an only_enforce_if.

Pros: fewer variables, scales better. Cons: trickier to explain.

Examples: example_24_campaigning_with_cumul.py (base), example_27_campaigning_products.py (multi-product), example_28_campaigning_products_machines.py (multi-machine).
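The rank rule is easier to check on a fixed sequence before encoding it in CP-SAT. The plain-Python sketch below applies it to a given product order; the function name campaign_ranks is hypothetical and the code is a reference for the rule, not part of any model.

```python
def campaign_ranks(products, campaign_size):
    """Returns (ranks, changeover_positions) for a fixed task order.

    The rank grows by one while the product stays the same and the cap is
    not reached; otherwise it resets to 0 and a changeover is recorded.
    """
    ranks, changeovers = [], []
    for idx, product in enumerate(products):
        if idx == 0:
            ranks.append(0)
            continue
        same_product = product == products[idx - 1]
        reached_cap = ranks[-1] == campaign_size - 1
        if same_product and not reached_cap:
            ranks.append(ranks[-1] + 1)
        else:
            ranks.append(0)
            changeovers.append(idx)
    return ranks, changeovers


ranks, changeovers = campaign_ranks(list("AAAAB"), campaign_size=3)
print(ranks, changeovers)
```

Comparing a solver's cumul values against such a reference on small instances is a cheap way to catch an off-by-one in the reset logic.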

Locking the task order

When tasks have deadlines that align with their index, locking start[t-1] <= start[t] (or the stricter end[t-1] <= start[t]) is a cheap heuristic that often gives a 10x+ solve-time improvement. See example_25_campaigning_with_locked_seq.py and the two example_26_campaigning_locked_seq_improved*.py variants.

Flexible campaign ends

If the model should be free to end a campaign early (not just at the cap), drop the "force reach_end when cumul hits max" implication and let the solver choose. This usually gives better objective values at a small solve-time cost. See the two example_26_*_improved*.py files for the comparison.

Solver techniques

Beyond modeling, CP-SAT exposes a few knobs that help on hard instances.

Decision strategies

Tell the solver which variables to branch on first, and which value to try first. Often needed when a symmetric model returns a "correct but ugly" schedule.

model.add_decision_strategy(
    starts.values(),
    cp_model.CHOOSE_FIRST,
    cp_model.SELECT_MIN_VALUE,
)

Example: example_07_break_without_changeover.py applies two strategies (one on starts, one on sequence literals) to get a canonical output.

Parallel workers

solver.parameters.num_search_workers = 8

Uses 8 worker threads. Recent OR-Tools releases also expose this setting as num_workers. Examples example_03_seq_scale*.py benchmark the resulting speedup.

Warm-starting with hints

You can seed the search with values for any subset of variables:

model.add_hint(var, value)   # var is a model variable, value the suggested assignment

Or clear and re-set them between solves:

model.clear_hints()
add_hints(model, previous_solution)

This is the basis of phase solving: build the full model once, then run it repeatedly with an increasing max_time, feeding each phase's solution in as hints to the next. Example: example_32_solving_by_phases.py.

Reading back values

status = solver.solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(solver.value(var))
    print(solver.objective_value)   # a property in the snake_case API, not a method

MODEL_INVALID almost always means a constraint references a variable that was never bound to the right model instance, or an only_enforce_if was attached to something that is not a literal.

Examples overview

Each example in this section corresponds to one Python file under scheduling/. Chapters are kept short: a brief description, the concepts it demonstrates (linked back to the Concepts section), and the source file inlined at the bottom.

Examples are grouped by topic in the sidebar:

  • Basics - CP-SAT primitives and small modeling tricks.
  • Sequencing - ordering tasks on one or more machines.
  • Changeover and intervals - different ways to model switches between products and the move from manual durations to interval variables.
  • Breaks - unavailable time windows, including breaks that extend a task's duration and automatic jobs that only need an operator for setup.
  • Shifts - preventing tasks from crossing shift boundaries.
  • Multi-stage jobs - jobs with ordered stages and per-stage capacity.
  • Resources - flexible resource/headcount modes and time-varying demand tracking.
  • Campaigning - grouping same-product tasks between changeovers, with multiple modelling approaches.
  • Solver techniques - warm-starting CP-SAT across phases with hints.

A few files (example_11, example_18, example_19, example_30) are empty placeholders kept for numbering; their chapters only note what they would have covered.

Unit tests

Source: scheduling/example_00_unit_tests.py

What it does

A scratchpad of CP-SAT primitives. Not a scheduling model. It runs through small self-contained models that each exercise one feature:

  • add_bool_or, add_bool_and, add_bool_xor over two booleans.
  • Plain linear constraints with minimize.
  • Reifying "x is between 5 and 10" with chained only_enforce_if, with add_multiplication_equality, and with add_linear_expression_in_domain.
  • Reading back results with solver.value.

Concepts

Notes

Useful as a cheat sheet when you want to remember how to express, for example, "b = (5 <= x <= 10)". Several of the snippets are commented out alternatives, kept for comparison.

Source

from ortools.sat.python import cp_model
model = cp_model.CpModel()
def get(x):
    return solver.value(x)

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_or(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_and(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_xor(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))


#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 2)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 1)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 0)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))


#
model = cp_model.CpModel()
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x==1).only_enforce_if(~x)
#model.add(x==0).only_enforce_if(~x)
# model.add(x==1).only_enforce_if(x)
model.add(x==0).only_enforce_if(x)
#model.add(y==1).only_enforce_if(x)
model.minimize(x)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
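# x == 1 when x is false and x == 0 when x is true: no assignment satisfies both.
print(get(x) if status in (cp_model.OPTIMAL, cp_model.FEASIBLE) else solver.status_name(status))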


#

model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
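# Not a valid reification: only_enforce_if expects Boolean literals, and a
# comparison such as 5 <= x is a linear constraint, not a literal.
# model.add(x_is_between_5_and_10 == 1).only_enforce_if(5 <= x).only_enforce_if(x <= 10)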
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', get(x))
print('x_is_between_5_and_10', get(x_is_between_5_and_10))









model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
x_is_no_less_than_5 = model.new_bool_var('x_is_no_less_than_5')
x_is_no_more_than_10 = model.new_bool_var('x_is_no_more_than_10')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)

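# Python parses `b == x >= 5` as a chained comparison, so it cannot express
# b <=> (x >= 5). Use a pair of only_enforce_if constraints instead.
model.add(x >= 5).only_enforce_if(x_is_no_less_than_5)
model.add(x < 5).only_enforce_if(~x_is_no_less_than_5)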


# model.add(x_is_no_less_than_5 == 1).only_enforce_if(x>=5)
# model.add(x_is_no_more_than_10 == 1).only_enforce_if(x <= 10)

# Not a valid reification: only_enforce_if expects Boolean literals, not comparisons.
# model.add(x_is_between_5_and_10 == 1).only_enforce_if(5 <= x).only_enforce_if(x <= 10)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', get(x))
print('x_is_between_5_and_10', get(x_is_between_5_and_10))





##########################################

from ortools.sat.python import cp_model
model = cp_model.CpModel()
x_is_greater_than_5 = model.new_bool_var('x_is_greater_than_5')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
model.add(x >= 5).only_enforce_if(x_is_greater_than_5)
model.add(x < 5).only_enforce_if(~x_is_greater_than_5)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', solver.value(x))
print('x_is_greater_than_5', solver.value(x_is_greater_than_5))



from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
#model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 10).only_enforce_if(~x_is_between_5_and_10)
#model.add(x >10).only_enforce_if(~x_is_greater_than_5)
# With the flag forced to 1, x == 3 contradicts x >= 5: the model is infeasible
model.add(x == 3)
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))




from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
#model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 10).only_enforce_if(~x_is_between_5_and_10)
#model.add(x >10).only_enforce_if(~x_is_greater_than_5)
#model.add(x == 3)
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))




from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)

model.add(x < 5).only_enforce_if(~x_is_between_5_and_10)
model.add(x >10).only_enforce_if(~x_is_between_5_and_10)

model.add(x == 3)
# model.add(x_is_between_5_and_10 == 0)

solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))





from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x_greater_than_5 = model.new_bool_var('5<x')
x_less_than_10 = model.new_bool_var('x<10')


model.add(x > 5).only_enforce_if(x_greater_than_5)
model.add(x <= 5).only_enforce_if(~x_greater_than_5)

model.add(x < 10).only_enforce_if(x_less_than_10)
model.add(x >= 10).only_enforce_if(~x_less_than_10)

# A product of two variables is not a linear expression; use add_multiplication_equality.
model.add_multiplication_equality(x_is_between_5_and_10, [x_greater_than_5, x_less_than_10])

model.add(x == 3)
# model.add(x_is_between_5_and_10 == 0)

solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))









from ortools.sat.python import cp_model
model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x = model.new_int_var(0, 100, 'x')

model.add_linear_constraint(x, 5, 10).only_enforce_if(x_is_between_5_and_10)
model.add_linear_expression_in_domain(
    x,
    cp_model.Domain.from_intervals([[0, 4], [11, 100]])
).only_enforce_if(~x_is_between_5_and_10)

model.add(x == 3)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))





from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x_greater_than_5 = model.new_bool_var('5<x')
x_less_than_10 = model.new_bool_var('x<10')

model.add(x > 5).only_enforce_if(x_greater_than_5)
model.add(x <= 5).only_enforce_if(~x_greater_than_5)

model.add(x < 10).only_enforce_if(x_less_than_10)
model.add(x >= 10).only_enforce_if(~x_less_than_10)

model.add_multiplication_equality(x_is_between_5_and_10, [x_greater_than_5, x_less_than_10])

model.add(x_is_between_5_and_10 == 1)

solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))

Events overlapping

Source: scheduling/example_15_events_overlapping.py

What it does

Tutorial on detecting whether two intervals overlap, and by how much.

  • var_start_earlier_than_start[t1, t2] = "t1 starts before t2".
  • var_end_later_than_start[t1, t2] = "t1 ends after t2 starts".
  • var_overlap[t1, t2] is the product of the two, i.e. the AND.
  • var_overlap_duration[t1, t2] equals end[t1] - start[t2] when there is overlap, else zero.

Both tasks here have fixed start/end for illustration; the interest is in how the overlap indicator is built.
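Because the starts and ends are fixed, the expected indicator values can be worked out by hand. The plain-Python sketch below mirrors the definitions in the list above and can serve as a reference when checking the solver output; the function name overlap_duration is hypothetical.

```python
def overlap_duration(start1, end1, start2, end2):
    """Overlap of task 1 onto task 2, following the example's definition.

    Task 1 must start no later than task 2 and end after task 2 starts;
    the reported duration is then end1 - start2, otherwise 0.
    """
    starts_earlier = start1 <= start2
    ends_later = end1 > start2
    return end1 - start2 if (starts_earlier and ends_later) else 0


# The two data sets from the source file: overlapping and non-overlapping.
print(overlap_duration(1, 5, 3, 7))
print(overlap_duration(1, 3, 5, 7))
```

Note that the definition is directional: swapping the two tasks in the first call gives 0, because the later task does not start first.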

Concepts

Source

from ortools.sat.python import cp_model

model = cp_model.CpModel()
max_time = 10
tasks = {1, 2}

# 1. Data

var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

# overlap
# model.add(var_task_starts[1] == 1)
# model.add(var_task_ends[1] == 5)
# model.add(var_task_starts[2] == 3)
# model.add(var_task_ends[2] == 7)

# No overlap
model.add(var_task_starts[1] == 1)
model.add(var_task_ends[1] == 3)
model.add(var_task_starts[2] == 5)
model.add(var_task_ends[2] == 7)



var_overlap = {
    (t1, t2): model.new_bool_var('')
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

var_overlap_duration = {
    (t1, t2): model.new_int_var(0, max_time, '')
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

var_start_earlier_than_start = {
    (t1, t2): model.new_bool_var('')
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

var_end_later_than_start = {
    (t1, t2): model.new_bool_var('')
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

for t1 in tasks:
    for t2 in tasks:
        if t1 == t2:
            continue
        model.add(var_task_starts[t1] <= var_task_starts[t2]).only_enforce_if(var_start_earlier_than_start[t1, t2])
        model.add(var_task_starts[t1] > var_task_starts[t2]).only_enforce_if(~var_start_earlier_than_start[t1, t2])

        model.add(var_task_ends[t1] > var_task_starts[t2]).only_enforce_if(var_end_later_than_start[t1, t2])
        model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(~var_end_later_than_start[t1, t2])

        model.add_multiplication_equality(
            var_overlap[t1, t2],
            [var_start_earlier_than_start[t1, t2], var_end_later_than_start[t1, t2]]
        )

        model.add(var_overlap_duration[t1, t2] == var_task_ends[t1] - var_task_starts[t2]).only_enforce_if(var_overlap[t1, t2])
        model.add(var_overlap_duration[t1, t2] == 0).only_enforce_if(~var_overlap[t1, t2])




# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    count = 0
    for t1 in tasks:
        for t2 in tasks:
            if t1 == t2:
                continue
            if solver.value(var_overlap[t1,t2]):
                count = count + 1
                print(f'Task{t1} starts earlier and overlap Task{t2} for a duration of '
                      f'{solver.value(var_overlap_duration[t1, t2])} units')
    if count == 0:
        print("No overlapped tasks at all")

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Simple sequence

Source: scheduling/example_01_simple_sequence.py

What it does

The smallest end-to-end scheduling model in the book. Three tasks across two products on a single machine.

  • A dummy task 0 acts as the start/end node.
  • seq[t1, t2] booleans select the order via add_circuit.
  • If t1 -> t2 is chosen, end[t1] <= start[t2] is enforced.
  • The changeover cost lives in the objective: minimize(sum(seq * cost)).
  • Duration is encoded manually with end - start == duration.

Concepts

Source

# Circuit arcs are [from_node, to_node, literal] triples.
# Add an arc:  arcs.append([job1_index, job2_index, binary])
# Add a link:  use only_enforce_if so that binary => job1.end_time <= job2.start_time
# The arc, its literal, and the time constraint are thus tied together.

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

M = 99999

# 1. Data
'''
task   product
1       A
2       B
3       A
'''
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}

m = {
    (t1, t2)
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

m_cost = {
    (t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or task_to_product[t1] == 'dummy' or task_to_product[t2] == 'dummy'
    else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}


# 2. Decision variables
'''
(1, 2): 0/1
(1, 3)
(2, 1)
(2, 3)
(3, 1)
(3, 2)
'''

max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

variables_sequence = {
    (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
    for (t1, t2) in m
}

# 3. Objectives

# A linear expression is enough for the objective; no separate IntVar is needed.
total_changeover_time = sum(
    [variables_sequence[(t1, t2)]*m_cost[(t1, t2)] for (t1, t2) in m]
)

model.minimize(total_changeover_time)
#model.maximize(total_changeover_time)


# 4. Constraints

# Add duration

for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )

# add_circuits

arcs = list()
'''
arcs.append([0, 1, model.new_bool_var("dummy0" + "_to_1")])
arcs.append([0, 2, model.new_bool_var("dummy0" + "_to_2")])
arcs.append([0, 3, model.new_bool_var("dummy0" + "_to_3")])
arcs.append([1, 0, model.new_bool_var("1_to_" + "dummy0")])
arcs.append([2, 0, model.new_bool_var("2_to_" + "dummy0")])
arcs.append([3, 0, model.new_bool_var("3_to_" + "dummy0")])
'''
for (from_task, to_task) in m:
    arcs.append(
        [
            from_task,
            to_task,
            variables_sequence[(from_task, to_task)]
        ]
    )

    if from_task != 0 and to_task != 0:
        model.add(
            variables_task_ends[from_task] <= variables_task_starts[to_task]
        ).only_enforce_if(variables_sequence[(from_task, to_task)])

model.add_circuit(arcs)

#model.add(variables_task_starts[0] == 8)

# Solve

# https://github.com/d-krupke/cpsat-primer
#model.add_decision_strategy(variables_task_starts, cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)


solver = cp_model.CpSolver()
status = solver.solve(model=model)

# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    for (t1, t2) in m:
        print(f'{t1} --> {t2}:   {solver.value(variables_sequence[(t1, t2)])}')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Sequence with locked starting product

Source: scheduling/example_02_seq_lock_starting_product.py

What it does

Same single-machine model as 01, but the dummy task is given a fixed "starting product". The changeover table is adjusted so that the arc from dummy to a task costs zero only when that task's product matches the starting product.

m_cost = {
    (t1, t2): 0 if task_to_product[t1] == task_to_product[t2]
                 or (task_to_product[t1] == 'dummy'
                     and task_to_product[t2] == starting_product)
              else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}
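
To see what the starting product changes, the following sketch rebuilds the cost table for two different starting products and prints the dummy-to-task arcs. It uses the data of this example; the wrapper function build_cost_table is a hypothetical helper introduced only for the comparison.

```python
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}


def build_cost_table(starting_product):
    """Same rule as m_cost above, parameterized by the starting product."""
    arcs = {(t1, t2) for t1 in tasks for t2 in tasks if t1 != t2}
    return {
        (t1, t2): 0
        if task_to_product[t1] == task_to_product[t2]
        or (task_to_product[t1] == 'dummy'
            and task_to_product[t2] == starting_product)
        else changeover_time[task_to_product[t2]]
        for (t1, t2) in arcs
    }


cost_a = build_cost_table('A')
cost_b = build_cost_table('B')

for t in (1, 2, 3):
    print(f"dummy -> task {t} ({task_to_product[t]}): "
          f"start A = {cost_a[0, t]}, start B = {cost_b[0, t]}")
```

Arcs that return to the dummy still cost zero, because changeover_time['dummy'] is 0.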

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

M = 99999

# 1. Data
'''
task   product
1       A
2       B
3       A
'''
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
starting_product = 'A'

m = {
    (t1, t2)
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

# A -> A, B --> B: 0
# dummy A -> A: 0
# dummy A -> B: 1
# A -> B, B -> A: 1

m_cost = {
    (t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == starting_product
    )
    else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}








# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

variables_sequence = {
    (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
    for (t1, t2) in m
}

# 3. Objectives

# A linear expression is enough for the objective; no separate IntVar is needed.
total_changeover_time = sum(
    [variables_sequence[(t1, t2)]*m_cost[(t1, t2)] for (t1, t2) in m]
)

model.minimize(total_changeover_time)


# 4. Constraints

# Add duration

for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )

# add_circuits

arcs = list()

for (from_task, to_task) in m:
    arcs.append(
        [
            from_task,
            to_task,
            variables_sequence[(from_task, to_task)]
        ]
    )

    if from_task != 0 and to_task != 0:
        model.add(
            variables_task_ends[from_task] <= variables_task_starts[to_task]
        ).only_enforce_if(variables_sequence[(from_task, to_task)])

model.add_circuit(arcs)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)

# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    for (t1, t2) in m:
        print(f'{t1} --> {t2}:   {solver.value(variables_sequence[(t1, t2)])}')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Multi-station sequence

Source: scheduling/example_03_seq_multi_stations.py

What it does

Extends the single-machine model of 02 to multiple machines.

  • presence[m, t] booleans, with add_exactly_one per task, select which machine runs each task.
  • Task-level start[t], end[t] are linked to start[m, t], end[m, t] under only_enforce_if(presence[m, t]).
  • One add_circuit per machine, with a self-loop [t, t, ~presence[m, t]] to skip tasks assigned elsewhere.
  • The objective is make_span + total_changeover_time, with add_max_equality computing the makespan.
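
Once a solution is available, the true arc literals of each machine can be followed from the dummy node to recover the task order. The sketch below is a plain-Python post-processing helper, not part of the example file; decode_machine_sequences and the sample arc set are hypothetical.

```python
def decode_machine_sequences(selected_arcs, machines):
    """Turn selected circuit arcs into an ordered task list per machine.

    selected_arcs: set of (machine, from_task, to_task) whose literal is true.
    Node 0 is the dummy start/end node. Self-loops (t, t) mark tasks that run
    on another machine and are ignored.
    """
    sequences = {}
    for m in machines:
        successor = {
            t1: t2 for (mm, t1, t2) in selected_arcs if mm == m and t1 != t2
        }
        order = []
        current = successor.get(0, 0)
        while current != 0:          # walk the circuit until it closes at 0
            order.append(current)
            current = successor[current]
        sequences[m] = order
    return sequences


# Hypothetical solution: machine 0 runs tasks 1 then 3, machine 1 runs 2 then 4.
selected = {
    (0, 0, 1), (0, 1, 3), (0, 3, 0), (0, 2, 2), (0, 4, 4),
    (1, 0, 2), (1, 2, 4), (1, 4, 0), (1, 1, 1), (1, 3, 3),
}
sequences = decode_machine_sequences(selected, machines=[0, 1])
print(sequences)  # {0: [1, 3], 1: [2, 4]}
```

With the variable names of this example, the input set could be built as {key for key, var in variables_machine_task_sequence.items() if solver.value(var)}.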

Concepts

Source

"""
About changeover:
   - The changeover cost appears only in the objective function.
   - Task start and end times do not reflect changeover events.

The helper s() prints a dict keyed by tuples in sorted key order.

Task 0 is only used in the circuit arcs.

The duration constraint is end - start == duration.
Mathieu's approach has no such constraint; new_optional_interval_var enforces start + size == end instead.

Task-level start and end variables are kept for convenience when adding more complicated constraints.
"""


from ortools.sat.python import cp_model

# Initiate
M = 99999
model = cp_model.CpModel()


def s(x):
    sorted_keys = sorted(x)
    for key in sorted_keys:
        print(f"{key}, {x[key]}")

'''
task   product
1       A
2       B
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}

# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks
    for m in machines
}

variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# 3. Objectives

# A linear expression is enough for the objective; no separate IntVar is needed.
total_changeover_time = sum(
    [variables_machine_task_sequence[(m, t1, t2)]*m_cost[(m, t1, t2)] for (m, t1, t2) in X]
)

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span + total_changeover_time)


# 4. Constraints

for task in tasks:

    # For this task

    # get all allowed machines
    task_candidate_machines = machines

    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]

    # this task is only present in one machine
    model.add_exactly_one(tmp)

    # task level link to machine-task level
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        # In Mathieu's approach, the changeover is handled at this point


# This can be replaced by an interval variable?
for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )

# add_circuits

for machine in machines:

    arcs = list()

    tmp = [x for x in X if x[0] == machine]

    for (m, from_task, to_task) in tmp:
        arcs.append(
            [
                from_task,
                to_task,
                variables_machine_task_sequence[(m, from_task, to_task)]
            ]
        )

        if from_task != 0 and to_task != 0:
            model.add(
                variables_task_ends[from_task] <= variables_task_starts[to_task]
            ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])

    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(machine, task)]
        ])

    model.add_circuit(arcs)





# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)

# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print(solver.value(make_span))

    for (m, t1, t2) in sorted(X):
        value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
        if value == 1:
            print(f'Machine {m}: {t1} --> {t2}  ')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Multi-station scale benchmark

Source: scheduling/example_03_seq_scale.py

What it does

Benchmark harness for the multi-machine model from 03a. The model is wrapped in a model(num_tasks) function and solved for num_tasks in [2, 3, ..., 12]; the wall-clock solve time of each run is recorded and plotted with matplotlib.

  • Uses solver.parameters.num_search_workers = 8.
  • Still uses the manual end - start == duration constraint (no intervals yet).
  • Objective is make_span only; changeover line is commented out.
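
A single wall-clock measurement per size can be noisy. The sketch below is one possible variation on the timing loop, not the harness used in the example file: it repeats each run and keeps the best time. time_solves and fake_solve are hypothetical names, and fake_solve is only a stand-in workload so that the snippet runs without a solver.

```python
from time import perf_counter


def time_solves(solve_fn, sizes, repeats=3):
    """Return {size: best wall-clock seconds over `repeats` calls of solve_fn(size)}."""
    timings = {}
    for size in sizes:
        runs = []
        for _ in range(repeats):
            start = perf_counter()
            solve_fn(size)
            runs.append(perf_counter() - start)
        timings[size] = min(runs)
    return timings


def fake_solve(num_tasks):
    # Stand-in for building and solving the CP-SAT model of size num_tasks.
    return sum(i * i for i in range(num_tasks * 1000))


timings = time_solves(fake_solve, sizes=range(2, 13))
for size, seconds in timings.items():
    print(f"{size:>3} tasks: {seconds:.6f} s")
```

Passing a function that builds and solves the model for a given size in place of fake_solve would reproduce the benchmark, and the resulting dict can be plotted the same way as the seconds list.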

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt


def generate_data(num_tasks):
    tasks = {i+1 for i in range(num_tasks)}
    tasks_0 = tasks.union({0})
    task_to_product = {0: 'dummy'}
    task_to_product.update({x+1: 'A' for x in range(num_tasks // 2)})
    task_to_product.update({x+1: 'B' for x in range(num_tasks // 2, num_tasks)})
    return tasks, tasks_0, task_to_product


def model(num_tasks):

    model = cp_model.CpModel()

    # 1. Data
    tasks, tasks_0, task_to_product = generate_data(num_tasks)
    max_time = num_tasks
    processing_time = {'dummy': 0, 'A': 1, 'B': 1}
    changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
    machines = {0, 1}
    machines_starting_products = {0: 'A', 1: 'A'}

    X = {
        (m, t1, t2)
        for t1 in tasks_0
        for t2 in tasks_0
        for m in machines
        if t1 != t2
    }

    m_cost = {
        (m, t1, t2): 0
        if task_to_product[t1] == task_to_product[t2] or (
                task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
        )
        else changeover_time[task_to_product[t2]]
        for (m, t1, t2) in X
    }

    # 2. Decision variables
    variables_task_ends = {
        task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
    }

    variables_task_starts = {
        task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
    }

    variables_machine_task_starts = {
        (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_ends = {
        (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_presences = {
        (m, t): model.new_bool_var(f"presence_{m}_{t}")
        for t in tasks
        for m in machines
    }

    variables_machine_task_sequence = {
        (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
        for (m, t1, t2) in X
    }

    # 3. Objectives

    # Kept for reference as a linear expression; no separate IntVar is needed.
    total_changeover_time = sum(
        [variables_machine_task_sequence[(m, t1, t2)]*m_cost[(m, t1, t2)] for (m, t1, t2) in X]
    )

    make_span = model.new_int_var(0, max_time, "make_span")

    model.add_max_equality(
        make_span,
        [variables_task_ends[task] for task in tasks]
    )
    model.minimize(make_span)  # + total_changeover_time

    # 4. Constraints
    for task in tasks:
        task_candidate_machines = machines
        # find the subset in presence matrix related to this task
        tmp = [
            variables_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]

        # this task is only present in one machine
        model.add_exactly_one(tmp)

        # task level link to machine-task level
        for m in task_candidate_machines:
            model.add(
                variables_task_starts[task] == variables_machine_task_starts[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])

            model.add(
                variables_task_ends[task] == variables_machine_task_ends[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])

    for task in tasks:
        model.add(
            variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
        )

    # add_circuits
    for machine in machines:
        arcs = list()
        tmp = [x for x in X if x[0] == machine]
        for (m, from_task, to_task) in tmp:
            arcs.append(
                [
                    from_task,
                    to_task,
                    variables_machine_task_sequence[(m, from_task, to_task)]
                ]
            )
            if from_task != 0 and to_task != 0:
                model.add(
                    variables_task_ends[from_task] <= variables_task_starts[to_task]
                ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
        for task in tasks:
            arcs.append([
                task, task, ~variables_machine_task_presences[(machine, task)]
            ])
        model.add_circuit(arcs)

    # Solve
    solver = cp_model.CpSolver()
    solver.parameters.num_search_workers = 8
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    return total_time


if __name__ == '__main__':

    num_tasks = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

    seconds = []

    for i in num_tasks:
        print(i)
        processing_time = model(i)
        seconds.append(processing_time)

    plt.plot(num_tasks, seconds)
    plt.show()

Multi-station scale (intervals)

Source: scheduling/example_03_seq_scale_Mathieu.py

What it does

Same benchmark as 03b, but switches to new_optional_interval_var plus add_no_overlap per machine. A helper add_circuit_constraints(...) factors out the per-machine circuit.

The interval-based model scales much better: the main loop here runs num_tasks up to ~80 on the same hardware that 03b handled only up to 12.
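
As a reminder of what the per-machine no-overlap constraint asks for, here is a small plain-Python checker. It is an illustration of the rule, not CP-SAT code: intervals are treated as half-open [start, start + size), all sizes are assumed positive, and intervals whose presence flag is false are ignored, which is how optional intervals behave. The function name and sample data are hypothetical.

```python
def violates_no_overlap(intervals):
    """Return pairs of present intervals that overlap on one machine.

    intervals: dict name -> (start, size, is_present).
    """
    present = sorted(
        (start, start + size, name)
        for name, (start, size, is_present) in intervals.items()
        if is_present
    )
    clashes = []
    for i, (s1, e1, n1) in enumerate(present):
        for s2, e2, n2 in present[i + 1:]:
            if s2 < e1:  # the later-starting interval begins before the first ends
                clashes.append((n1, n2))
    return clashes


machine_0 = {
    't1': (0, 1, True),
    't2': (0, 1, False),  # assigned to another machine, so ignored here
    't3': (1, 1, True),
}
print(violates_no_overlap(machine_0))  # []

machine_0['t2'] = (0, 1, True)         # now t2 competes with t1 at time 0
print(violates_no_overlap(machine_0))  # [('t1', 't2')]
```

Back-to-back intervals such as t1 and t3 do not clash, because the end of one may equal the start of the next.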

Concepts

Source

# USE INTERVAL !!!

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt


def generate_data(num_tasks):
    tasks = {i+1 for i in range(num_tasks)}
    tasks_0 = tasks.union({0})
    task_to_product = {0: 'dummy'}
    task_to_product.update({x+1: 'A' for x in range(num_tasks // 2)})
    task_to_product.update({x+1: 'B' for x in range(num_tasks // 2, num_tasks)})
    return tasks, tasks_0, task_to_product


def model(num_tasks):

    model = cp_model.CpModel()

    # 1. Data
    tasks, tasks_0, task_to_product = generate_data(num_tasks)
    max_time = num_tasks
    processing_time = {'dummy': 0, 'A': 1, 'B': 1}
    changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
    machines = {0, 1}
    machines_starting_products = {0: 'A', 1: 'A'}

    X = {
        (m, t1, t2)
        for t1 in tasks_0
        for t2 in tasks_0
        for m in machines
        if t1 != t2
    }

    m_cost = {
        (m, t1, t2): 0
        if task_to_product[t1] == task_to_product[t2] or (
                task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
        )
        else changeover_time[task_to_product[t2]]
        for (m, t1, t2) in X
    }

    # 2. Decision variables
    variables_task_ends = {
        task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
    }

    variables_task_starts = {
        task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
    }

    variables_machine_task_starts = {
        (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_ends = {
        (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_presences = {
        (m, t): model.new_bool_var(f"presence_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_intervals = {
        (m, t): model.new_optional_interval_var(
            start=variables_machine_task_starts[m, t],
            size=processing_time[task_to_product[t]],
            end=variables_machine_task_ends[m, t],
            is_present=variables_machine_task_presences[m, t],
            name=f"interval_{t}_{m}",
        )
        for t in tasks
        for m in machines
    }

    variables_machine_task_sequence = {
        (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
        for (m, t1, t2) in X
    }


    # 3. Objectives

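    # Note: these sequence literals are not the arc literals created in
    # add_circuit_constraints, so the solver is free to set them all to 0 and
    # this term does not actually penalize changeovers in this file.
    total_changeover_time = sum(
        [variables_machine_task_sequence[(m, t1, t2)]*m_cost[(m, t1, t2)] for (m, t1, t2) in X]
    )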

    make_span = model.new_int_var(0, max_time, "make_span")

    model.add_max_equality(
        make_span,
        [variables_task_ends[task] for task in tasks]
    )
    model.minimize(make_span + total_changeover_time)

    # 4. Constraints
    for task in tasks:
        task_candidate_machines = machines
        # find the subset in presence matrix related to this task
        tmp = [
            variables_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]

        # this task is only present in one machine
        model.add_exactly_one(tmp)

        # task level link to machine-task level
        for m in task_candidate_machines:
            model.add(
                variables_task_starts[task] == variables_machine_task_starts[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])

            model.add(
                variables_task_ends[task] == variables_machine_task_ends[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])

    for machine in machines:
        tmp = {(m, t) for (m, t) in variables_intervals if m == machine}
        intervals = [variables_intervals[x] for x in tmp]
        model.add_no_overlap(intervals)

    # Create circuit constraints
    add_circuit_constraints(
        model,
        machines,
        tasks,
        variables_task_starts,
        variables_task_ends,
        variables_machine_task_presences,
    )

    # Solve
    solver = cp_model.CpSolver()
    solver.parameters.num_search_workers = 8
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    return total_time


def add_circuit_constraints(
    model,
    machines,
    tasks,
    variables_task_starts,
    variables_task_ends,
    variables_machine_task_presences,
):
    for machine in machines:
        # All candidate arcs on this machine. Each arc carries a boolean that
        # is true when the circuit goes directly from one node to the other.
        arcs = []
        machine_tasks = tasks

        for node_1, task_1 in enumerate(machine_tasks):
            mt_1 = str(task_1) + "_" + str(machine)
            # Initial arc from the dummy node (0) to a task.
            arcs.append(
                [0, node_1 + 1, model.new_bool_var("first" + "_" + mt_1)]
            )  # if mt_1 follows dummy node 0
            # Final arc from a task to the dummy node (0).
            arcs.append(
                [node_1 + 1, 0, model.new_bool_var("last" + "_" + mt_1)]
            )  # if dummy node 0 follows mt_1

            # For a task that is optional on this machine (i.e. it may run on another machine):
            # self-looping arc on the node that corresponds to this task.
            arcs.append(
                [
                    node_1 + 1,
                    node_1 + 1,
                    ~variables_machine_task_presences[(machine, task_1)],
                ]
            )

            for node_2, task_2 in enumerate(machine_tasks):
                if node_1 == node_2:
                    continue
                mt_2 = str(task_2) + "_" + str(machine)
                # Add sequential boolean constraint: mt_2 follows mt_1
                mt2_after_mt1 = model.new_bool_var(f"{mt_2} follows {mt_1}")
                arcs.append([node_1 + 1, node_2 + 1, mt2_after_mt1])

                # We add the reified precedence to link the literal with the
                # times of the two tasks.
                min_distance = 0
                model.add(
                    variables_task_starts[task_2] >= variables_task_ends[task_1] + min_distance
                ).only_enforce_if(mt2_after_mt1)
        model.add_circuit(arcs)



if __name__ == '__main__':

    num_tasks = [x+2 for x in range(80)]

    seconds = []

    for i in num_tasks:
        print(i)
        processing_time = model(i)
        seconds.append(processing_time)

    plt.plot(num_tasks, seconds)
    plt.show()

Changeover in constraint

Source: scheduling/example_04_seq_with_changeover_in_constraint.py

What it does

Moves the changeover out of the objective and into the precedence constraint. If seq[m, t1, t2] is true, then

model.add(end[t1] + distance <= start[t2]).only_enforce_if(seq[m, t1, t2])

where distance is the changeover time between the two products. The objective becomes just the makespan.

Now the schedule and the objective agree: a changeover physically pushes the next task later instead of only being charged in the objective.
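
The effect on the timeline can be worked out by hand for a fixed sequence. The sketch below is plain Python rather than CP-SAT: it places each task as early as end[t1] + distance <= start[t2] allows, using the data of this example. earliest_schedule is a hypothetical helper, and the two sequences are just one possible assignment.

```python
processing_time = {'A': 1, 'B': 1}
changeover_time = {'A': 1, 'B': 1}


def earliest_schedule(starting_product, products):
    """Earliest (product, start, end) per task for a fixed sequence on one machine."""
    schedule = []
    clock = 0                      # end time of the previous task (dummy ends at 0)
    previous = starting_product
    for product in products:
        distance = 0 if product == previous else changeover_time[product]
        start = clock + distance   # end[t1] + distance <= start[t2], taken tight
        end = start + processing_time[product]
        schedule.append((product, start, end))
        clock, previous = end, product
    return schedule


machine_0 = earliest_schedule('A', ['A', 'A'])
machine_1 = earliest_schedule('A', ['B', 'B'])
make_span = max(end for _, _, end in machine_0 + machine_1)

print(machine_0)  # [('A', 0, 1), ('A', 1, 2)]
print(machine_1)  # [('B', 1, 2), ('B', 2, 3)]
print(make_span)  # 3
```

The machine that switches from A to B loses one time unit to the changeover before its first task, which is why its last task ends at 3 rather than 2.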

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       B
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}

'''
M1  A -> A -> A   1+1
M2  A -> B -> B   1+1+1  --> 3
'''

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# This is now used in the constraints, not in the objective function
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}


# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Constraints

# Duration. This could be replaced by an interval variable.
for task in tasks_0:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )

# One task to one machine. Link across level.
for task in tasks:

    # For this task

    # get all allowed machines
    task_candidate_machines = machines

    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]

    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:

    arcs = list()

    for from_task in tasks_0:

        for to_task in tasks_0:

            # arcs
            if from_task != to_task:
                arcs.append([
                        from_task,
                        to_task,
                        variables_machine_task_sequence[(m, from_task, to_task)]
                ])

                distance = m_cost[m, from_task, to_task]

                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:

                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])

    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])

    model.add_circuit(arcs)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Sequence with intervals

Source: scheduling/example_05_seq_with_intervals.py

What it does

Same setup as 04, but replaces the manual duration constraint end - start == duration with new_optional_interval_var:

variables_machine_task_intervals[m, t] = model.new_optional_interval_var(
    start[m, t], processing_time[product_of(t)], end[m, t],
    presence[m, t], name=f"interval_{m}_{t}",
)

The changeover is still encoded as end[t1] + distance <= start[t2] under only_enforce_if(seq[m, t1, t2]).

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       B
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# This is now used in the constraints, not in the objective function
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}


# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# intervals
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Constraints

# Duration: now enforced by the optional interval variables, so the explicit constraint stays commented out
# for task in tasks_0:
#     model.add(
#         variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
#     )

# One task to one machine. Link across level.
for task in tasks:

    # For this task

    # get all allowed machines
    task_candidate_machines = machines

    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]

    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:

    arcs = list()

    for from_task in tasks_0:

        for to_task in tasks_0:

            # arcs
            if from_task != to_task:
                arcs.append([
                        from_task,
                        to_task,
                        variables_machine_task_sequence[(m, from_task, to_task)]
                ])

                distance = m_cost[m, from_task, to_task]

                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:

                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])

    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])

    model.add_circuit(arcs)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Sequence with shared resource

Source: scheduling/example_06_seq_with_intervals_resource.py

What it does

Extends 05 with a globally shared resource (a single operator). After building all machine-task intervals, the model adds a single add_cumulative over them:

intervals = list(variables_machine_task_intervals.values())
model.add_cumulative(intervals, [1] * len(intervals), 1)

With capacity 1, the two machines can no longer run tasks in parallel: the operator has to pick one at a time.
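As a rough illustration of what the cumulative constraint rules out, the following plain-Python sketch computes the peak simultaneous demand of a set of half-open intervals. It is a hand-written checker, not the solver's propagation, and the function name `peak_usage` and the two sample schedules are assumptions made for this example.

```python
def peak_usage(intervals):
    """Peak total demand over time for (start, end, demand) triples on [start, end)."""
    events = []
    for start, end, demand in intervals:
        if end > start:  # a zero-length interval covers no time point
            events.append((start, demand))
            events.append((end, -demand))
    # At equal times, releases (negative deltas) sort before acquisitions.
    events.sort()
    usage = peak = 0
    for _, delta in events:
        usage += delta
        peak = max(peak, usage)
    return peak


# Two machines working at the same time versus one after the other.
parallel = [(0, 1, 1), (0, 1, 1)]
serial = [(0, 1, 1), (1, 2, 1)]

print("parallel peak:", peak_usage(parallel))
print("serial peak:", peak_usage(serial))
```

With a capacity of 1, the parallel schedule (peak 2) is rejected and the serial one (peak 1) is accepted, which is the effect described above.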

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       B
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'B'}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# m_cost is now used in the sequencing constraints, not in the objective function
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}




# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# intervals
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Constraints

# Duration: now enforced by the optional interval variables, so the explicit constraint stays commented out
# for task in tasks_0:
#     model.add(
#         variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
#     )

# One task to one machine. Link across level.
for task in tasks:

    # For this task

    # get all allowed machines
    task_candidate_machines = machines

    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]

    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:

    arcs = list()

    for from_task in tasks_0:

        for to_task in tasks_0:

            # arcs
            if from_task != to_task:
                arcs.append([
                        from_task,
                        to_task,
                        variables_machine_task_sequence[(m, from_task, to_task)]
                ])

                distance = m_cost[m, from_task, to_task]

                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:

                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])

    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])

    model.add_circuit(arcs)


# Resource constraint: there is only one operator, so no two tasks may run in parallel

intervals = list(variables_machine_task_intervals.values())

model.add_cumulative(intervals, [1]*len(intervals), 1)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Changeover as event

Source: scheduling/example_08_changeover_as_event.py

What it does

Promotes the changeover to a first-class scheduled event. For every machine m and every ordered pair of distinct tasks (t1, t2) there is an optional interval with its own start, end, and presence:

co_iv[m, t1, t2] = model.new_optional_interval_var(
    co_start[m, t1, t2],
    changeover_time[product_of(t2)],
    co_end[m, t1, t2],
    co_present[m, t1, t2],
    ...,
)

When seq[m, t1, t2] is chosen, the model forces the changeover interval to be present, to sit between the two tasks, and to have the right length; otherwise the interval is absent:

model.add(end[t1] <= co_start[t1, t2]).only_enforce_if(seq[m, t1, t2])
model.add(co_end[t1, t2] <= start[t2]).only_enforce_if(seq[m, t1, t2])
model.add(co_end[t1, t2] - co_start[t1, t2] == distance).only_enforce_if(seq[m, t1, t2])
model.add(co_present[m, t1, t2] == 1).only_enforce_if(seq[m, t1, t2])
model.add(co_present[m, t1, t2] == 0).only_enforce_if(~seq[m, t1, t2])

Because the changeover is now an interval, it can take part in cumulative constraints (cleaning resource, operator availability, etc.).
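The idea of a changeover occupying its own slot on the timeline can be sketched without a solver. The snippet below lays out a hypothetical task order by hand and emits tasks and changeovers as separate timed events. The function `lay_out`, the chosen order, and the event tuple layout are assumptions for this illustration; the durations mirror this example's data.

```python
processing_time = {"A": 1, "B": 1}
changeover_time = {"A": 2, "B": 2}


def lay_out(order, products, starting_product):
    """Return (kind, label, start, end) events for tasks run back to back."""
    events, clock, current = [], 0, starting_product
    for task in order:
        product = products[task]
        if product != current:
            # A changeover into `product` becomes its own timed event.
            co_end = clock + changeover_time[product]
            events.append(("changeover", f"{current}->{product}", clock, co_end))
            clock = co_end
        end = clock + processing_time[product]
        events.append(("task", str(task), clock, end))
        clock, current = end, product
    return events


events = lay_out(order=[1, 2], products={1: "A", 2: "B"}, starting_product="A")
for event in events:
    print(event)
```

Each tuple has an explicit start and end, which is exactly the information an interval variable carries, so changeover events of this kind can be counted against a resource in the same way as tasks.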

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       B
'''

# 1. Data

tasks = {1, 2}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 2, 'B': 2}
machines = {0}
machines_starting_products = {0: 'A'}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# Changeover duration charged on each arc; used below as the length of the changeover event
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}


# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# intervals
# These intervals replace the explicit end - start == duration constraint
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"t_interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}


# Changeover-related variables

variables_co_starts = {
    (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_start") for t1 in tasks_0 for t2 in tasks_0 if t1 != t2
}
variables_co_ends = {
    (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_end") for t1 in tasks_0 for t2 in tasks_0 if t1 != t2
}
variables_machine_co_starts = {
    (m, t1, t2): model.new_int_var(0, max_time, f"m{m}_co_t{t1}_to_t{t2}_start")
    for t1 in tasks_0 for t2 in tasks_0 for m in machines if t1 != t2
}
variables_machine_co_ends = {
    (m, t1, t2): model.new_int_var(0, max_time, f"m{m}_co_t{t1}_to_t{t2}_end")
    for t1 in tasks_0 for t2 in tasks_0 for m in machines if t1 != t2
}
variables_machine_co_presences = {
    (m, t1, t2): model.new_bool_var(f"co_presence_m{m}_t{t1}_t{t2}")
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}


variables_machine_co_intervals = {
    (m, t1, t2): model.new_optional_interval_var(
        variables_machine_co_starts[m, t1, t2],
        # use the arc's changeover cost so the size agrees with the "== distance" constraint below
        m_cost[m, t1, t2],
        variables_machine_co_ends[m, t1, t2],
        variables_machine_co_presences[m, t1, t2],
        name=f"co_interval_m{m}_t{t1}_t{t2}"
    )
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}







# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Constraints

# One task to one machine.
for task in tasks:
    task_candidate_machines = machines
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# co level link to machine-co level
for t1 in tasks_0:
    for t2 in tasks_0:
        if t1 != t2:

            for m in machines:
                model.add(
                    variables_co_starts[t1, t2] == variables_machine_co_starts[m, t1, t2]
                ).only_enforce_if(variables_machine_co_presences[m, t1, t2])

                model.add(
                    variables_co_ends[t1, t2] == variables_machine_co_ends[m, t1, t2]
                ).only_enforce_if(variables_machine_co_presences[m, t1, t2])


# for dummies: Force task 0 (dummy) starts at 0 and is present on all machines
model.add(variables_task_starts[0] == 0)
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:
    arcs = list()
    for t1 in tasks_0:
        for t2 in tasks_0:
            # arcs
            if t1 != t2:
                arcs.append([
                        t1,
                        t2,
                        variables_machine_task_sequence[(m, t1, t2)]
                ])
                distance = m_cost[m, t1, t2]
                # cannot require the time index of task 0 to represent the first and the last position
                if t2 != 0:
                    # to schedule tasks and c/o
                    model.add(
                        variables_task_ends[t1] <= variables_co_starts[t1, t2]
                    ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

                    model.add(
                        variables_co_ends[t1, t2] <= variables_task_starts[t2]
                    ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

                    model.add(
                        variables_co_ends[t1, t2] - variables_co_starts[t1, t2] == distance
                    ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

                    # ensure intervals are consistent so we can apply resource constraints later
                    model.add(
                        variables_machine_co_presences[m, t1, t2] == 1
                    ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

                    model.add(
                        variables_machine_co_presences[m, t1, t2] == 0
                    ).only_enforce_if(~variables_machine_task_sequence[(m, t1, t2)])


    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])
    model.add_circuit(arcs)


# Solve


solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                        print('variables_machine_task_sequence[t1, t2]', solver.value(variables_machine_task_sequence[m, t1, t2]))
                        print('variables_co_starts[t1, t2]', solver.value(variables_co_starts[t1, t2]))
                        print('variables_co_ends[t1, t2]', solver.value(variables_co_ends[t1, t2]))
                        print('variables_machine_co_presences[m, t1, t2]', solver.value(variables_machine_co_presences[m, t1, t2]))
                        print('variables_machine_co_starts[m, t1, t2]', solver.value(variables_machine_co_starts[m, t1, t2]))
                        print('variables_machine_co_ends[m, t1, t2]', solver.value(variables_machine_co_ends[m, t1, t2]))
                        #print('variables_machine_co_intervals[m, t1, t2]', variables_machine_co_intervals[m, t1, t2])

                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Break without changeover

Source: scheduling/example_07_break_without_changeover.py

What it does

Single machine, four same-product tasks, and one fixed break covering the time window from 2 to 3.

  • The break is a new_fixed_size_interval_var added to add_cumulative(intervals, demands, capacity=1) alongside the task intervals, so tasks are scheduled around it.
  • Since all tasks share one product, every changeover distance is 0 and no changeover logic is needed.
  • Two add_decision_strategy calls (on starts and on sequence literals) steer the search toward the canonical 0 1 2 3 4 0 order instead of a symmetric alternative.
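For intuition about the schedule this setup should produce, here is a small plain-Python sketch that places the four unit tasks greedily around the break. It is a hand-rolled baseline, not the CP-SAT model, and the function name `place_around_breaks` is made up for this example.

```python
def place_around_breaks(durations, breaks):
    """Place tasks back to back in the given order without overlapping a break."""
    clock, placed = 0, []
    for duration in durations:
        moved = True
        while moved:
            moved = False
            for b_start, b_end in sorted(breaks):
                # Half-open overlap test between [clock, clock + duration) and the break.
                if clock < b_end and b_start < clock + duration:
                    clock = b_end  # jump past the break and re-check
                    moved = True
        placed.append((clock, clock + duration))
        clock += duration
    return placed


placed = place_around_breaks(durations=[1, 1, 1, 1], breaks={(2, 3)})
makespan = placed[-1][1]
print(placed, makespan)
```

Two tasks fit before the break and two after it. Only three unit slots are free before time 4, so a make-span of 5 is the best this instance allows.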

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       A
3       A
4       A
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'A', 3: 'A', 4: 'A'}
processing_time = {'dummy': 0, 'A': 1}
changeover_time = {'dummy': 0, 'A': 1}
machines = {0}
machines_starting_products = {0: 'A'}
breaks = {(2, 3)}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# m_cost is now used in the sequencing constraints, not in the objective function
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}


# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# intervals
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Constraints

# One task to one machine.
for task in tasks:
    # For this task
    # get all allowed machines
    task_candidate_machines = machines
    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for dummies: Force task 0 (dummy) starts at 0 and is present on all machines
model.add(variables_task_starts[0] == 0)
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:
    arcs = list()
    for from_task in tasks_0:
        for to_task in tasks_0:
            # arcs
            if from_task != to_task:
                arcs.append([
                        from_task,
                        to_task,
                        variables_machine_task_sequence[(m, from_task, to_task)]
                ])
                distance = m_cost[m, from_task, to_task]
                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:
                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])
    model.add_circuit(arcs)



# Add break time
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end - start, name='a_break') for (start, end) in breaks
}

# Add resource control with break
intervals = list(variables_machine_task_intervals.values()) + list(variables_breaks.values())
model.add_cumulative(intervals, [1]*len(intervals), 1)



# Solve


# https://github.com/d-krupke/cpsat-primer

# default
# 0 1 4 2 3 0

#model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
# 0 4 3 2 1 0

#model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# 0 1 4 2 3 0

# strange
#model.add_decision_strategy(variables_machine_task_sequence.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# 0 1 4 2 3 0

# Both of the following strategies are needed to get the intended sequence
model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
model.add_decision_strategy(variables_machine_task_sequence.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)


# 0 1 2 3 4 0


solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

One automatic job

Source: scheduling/example_12_an_automatic_job.py

What it does

Models an "automatic" task: one that consumes the operator only for its first time unit (setup), after which the machine runs on its own.

  • var_task_intervals[t] is the full task interval (setup + auto run).

  • var_task_intervals_autojobs[t] is a size-1 interval at (start, start + 1), representing only the setup portion.

  • Breaks are added as fixed intervals. The cumulative constraint uses the setup intervals and the breaks:

    model.add_cumulative(
        intervals=setup_intervals + break_intervals,
        demands=[1] * (len(tasks) + len(breaks)),
        capacity=1,
    )
    

So breaks push task starts around but cannot preempt an already-running automatic task.
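The difference between blocking the whole task and blocking only its setup unit can be checked by hand. The plain-Python sketch below searches for the earliest start under both readings, using this example's breaks and processing time. The helper names `overlaps` and `earliest_start` are assumptions for this illustration, and the search is a simple scan, not what the solver does internally.

```python
def overlaps(a_start, a_end, b_start, b_end):
    """True when half-open intervals [a_start, a_end) and [b_start, b_end) intersect."""
    return a_start < b_end and b_start < a_end


def earliest_start(duration, blocked_len, breaks, horizon):
    """Earliest start s with s + duration <= horizon whose first
    blocked_len time units avoid every break, or None if there is none."""
    for s in range(horizon - duration + 1):
        if not any(overlaps(s, s + blocked_len, bs, be) for bs, be in breaks):
            return s
    return None


breaks = [(0, 1), (2, 10)]
duration, horizon = 3, 10

# Automatic task: only the first time unit (setup) needs the operator.
auto_start = earliest_start(duration, 1, breaks, horizon)
# Manual task: the operator would be needed for the whole duration.
manual_start = earliest_start(duration, duration, breaks, horizon)

print("automatic:", auto_start, "->", auto_start + duration)
print("manual:", manual_start)
```

Under the automatic reading the task can start at 1 and run through the second break until 4; if the whole task had to avoid the breaks, no start would fit.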

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task    product     type
1       A           TYPE_3
'''

# 1. Data

tasks = {1}
products = {'A', 'B'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_3'}
processing_time = {'A': 3, 'B': 1}
max_time = 10
breaks = {(0, 1), (2, 10)}


# 2. Decision Variables
var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task],
        processing_time[task_to_product[task]],
        var_task_ends[task],
        name=f"interval_t{task}"
    )
    for task in tasks
}

var_task_intervals_autojobs = {
    task: model.new_interval_var(
        var_task_starts[task],
        1,
        var_task_starts[task] + 1,
        name=f"interval_auto_t{task}"
    )
    for task in tasks
    if task_to_type[task] == 'TYPE_3'
}



# Add break time
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals_autojobs.values()) + list(variables_breaks.values())

# every setup interval and every break uses the full capacity of the single resource
demands = [1] * len(intervals)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Automatic jobs

Source: scheduling/example_13_automatic_jobs.py

What it does

Same automatic-task idea as 12, now with two tasks that need to be sequenced.

  • Every task has a full interval plus a size-1 "auto start" interval.
  • A circuit with seq[t1, t2] booleans orders the tasks; the selected arc enforces end[t1] <= start[t2].
  • The cumulative uses the auto-start intervals and the break intervals so that breaks only block task starts.
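With only two tasks, the effect of these rules can be enumerated by hand. The plain-Python sketch below tries both task orders, starts each task at the earliest time whose first unit avoids all breaks, and keeps the order with the smaller make-span. It is a brute-force cross-check under the example's data, not the CP-SAT model; the names `setup_free`, `free_starts`, and `best` are invented here.

```python
from itertools import permutations

processing = {1: 4, 2: 3}          # task -> duration (products A and B)
breaks = [(0, 1), (2, 3), (4, 6), (7, 10)]
max_time = 10


def setup_free(s):
    """True when the setup unit [s, s + 1) overlaps no break."""
    return not any(bs < s + 1 and s < be for bs, be in breaks)


free_starts = [s for s in range(max_time) if setup_free(s)]

best = None  # (make-span, order)
for order in permutations(processing):
    clock, feasible = 0, True
    for task in order:
        # Earliest allowed start at or after the previous task's end.
        start = next(
            (s for s in free_starts
             if s >= clock and s + processing[task] <= max_time),
            None,
        )
        if start is None:
            feasible = False
            break
        clock = start + processing[task]
    if feasible and (best is None or clock < best[0]):
        best = (clock, order)

print("free setup starts:", free_starts)
print("best make-span and order:", best)
```

Starting each task as early as possible is enough here because, for a fixed order, an earlier start never delays the task that follows.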

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task    product     type
1       A           TYPE_3
2       B           TYPE_3
'''

# 1. Data

tasks = {1, 2}
products = {'A', 'B'}
task_to_product = {1: 'A', 2: 'B'}
task_to_type = {1: 'TYPE_3', 2: 'TYPE_3'}
processing_time = {'A': 4, 'B': 3}
max_time = 10
breaks = {(0, 1), (2, 3), (4, 6), (7, 10)}


# 2. Decision Variables

var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task],
        processing_time[task_to_product[task]],
        var_task_ends[task],
        name=f"interval_t{task}"
    )
    for task in tasks
}

var_task_intervals_auto = {
    task: model.new_interval_var(
        var_task_starts[task],
        1,
        var_task_starts[task] + 1,
        name=f"interval_auto_t{task}"
    )
    for task in tasks
    if task_to_type[task] == 'TYPE_3'
}

var_task_seq = {
    (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}


arcs = []
for t1 in tasks:
    tmp_1 = model.new_bool_var(f'first_to_{t1}')
    arcs.append([0, t1, tmp_1])

    tmp_2 = model.new_bool_var(f'{t1}_to_last')
    arcs.append([t1, 0, tmp_2])

    for t2 in tasks:
        if t1 == t2:
            continue

        # Reuse the sequencing literal as the arc literal for t1 -> t2.
        arcs.append([t1, t2, var_task_seq[t1, t2]])

        model.add(
            var_task_ends[t1] <= var_task_starts[t2]
        ).only_enforce_if(var_task_seq[t1, t2])

model.add_circuit(arcs)


# Add break time
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())

# task, resource reduction for breaks
demands = [1]*len(tasks) + [1]*len(breaks)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))
    print('=======================  ALLOCATION & SEQUENCE  =======================')
    for t1 in tasks:
        for t2 in tasks:
            if t1 != t2:
                value = solver.value(var_task_seq[(t1, t2)])
                print(f'{t1} --> {t2}  {value}')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Task delaying a break

Source: scheduling/example_14_task_delaying_break.py

What it does

When a task runs across a break, its total machine time grows by the length of the break. This is encoded per time slot:

  • var_task_duration_timeslots[t, i] = 1 iff task t occupies slot i. Built from two reified booleans, "starts before i" AND "ends after i", combined with add_multiplication_equality.
  • var_task_new_duration[t] = base + sum(is_break[i] * uses[t, i] for i) computes the stretched duration.
  • The task interval uses this stretched duration directly:

var_task_intervals[t] = model.new_interval_var(
    start[t], new_duration[t], end[t], ...,
)

Break intervals are added to add_cumulative alongside the task start intervals (since breaks do not preempt an auto-type task once started).
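
The stretched duration is defined in terms of the slots the task occupies, which in turn depend on the duration. A small plain-Python sketch can make that fixed point concrete. The function stretched_end is hypothetical and not part of the example file; it returns the smallest end that is consistent with the rule "duration = base + number of break slots covered".

```python
def stretched_end(start, base, breaks):
    """Smallest end with end - start == base + break slots inside [start, end)."""
    end = start + base
    while True:
        # Count unit slots in [start, end) that lie inside some break.
        covered = sum(
            1
            for slot in range(start, end)
            if any(b_start <= slot < b_end for b_start, b_end in breaks)
        )
        new_end = start + base + covered
        if new_end == end:
            return end
        end = new_end


breaks = [(2, 4)]
for start in (0, 1, 4):
    print(start, stretched_end(start, 3, breaks))
```

With the data of this example (base duration 3, break from 2 to 4), a start at 0 gives an end of 5, a start at 1 gives 6, and a start at 4 gives 7, so starting at 0 is the best choice for the makespan.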

Concepts

  • Breaks (approach 2: duration stretched)
  • CP-SAT basics (reification, add_multiplication_equality)

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task    product     type
1       A           TYPE_4
'''

# 1. Data

tasks = {1}
products = {'A'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_4'}
processing_time = {'A': 3}
max_time = 10
breaks = {(2, 4)}
# 1 if slot i lies inside any break, else 0 (derived from `breaks`).
is_break = {i: int(any(s <= i < e for (s, e) in breaks)) for i in range(max_time)}

# 2. Decision Variables

var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_task_new_duration = {
    task: model.new_int_var(0, max_time, f"task_{task}_delay") for task in tasks
}

var_task_duration_timeslots = {
    (task, i): model.new_bool_var(f'task {task} uses interval {i}')
    for task in tasks
    for i in range(max_time)
}

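# Reified AND pair: the task occupies slot i iff it starts at or before i
# and ends at or after i + 1.
var_task_start_earlier_than_start = {
    (task, i): model.new_bool_var(f'task_{task}_starts_by_{i}')
    for task in tasks
    for i in range(max_time)
}
var_task_end_later_than_end = {
    (task, i): model.new_bool_var(f'task_{task}_ends_after_{i}')
    for task in tasks
    for i in range(max_time)
}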
# # OR pair
# var_task_start_later_than_end = {
#     (task, i): model.new_bool_var(f'')
#     for task in tasks
#     for i in range(max_time)
# }
# var_task_end_earlier_than_start = {
#     (task, i): model.new_bool_var(f'')
#     for task in tasks
#     for i in range(max_time)
# }


for task in tasks:
    for i in range(max_time):
        model.add(var_task_starts[task] <= i).only_enforce_if(var_task_start_earlier_than_start[task, i])
        model.add(var_task_starts[task] > i).only_enforce_if(~var_task_start_earlier_than_start[task, i])

        model.add(var_task_ends[task] >= i + 1).only_enforce_if(var_task_end_later_than_end[task, i])
        model.add(var_task_ends[task] < i + 1).only_enforce_if(~var_task_end_later_than_end[task, i])

        # model.add(var_task_starts[task] >= i+1).only_enforce_if(var_task_start_later_than_end[task, i])
        # model.add(var_task_starts[task] < i+1).only_enforce_if(~var_task_start_later_than_end[task, i])
        #
        # model.add(var_task_ends[task] <= i).only_enforce_if(var_task_end_earlier_than_start[task, i])
        # model.add(var_task_ends[task] > i).only_enforce_if(~var_task_end_earlier_than_start[task, i])

        # And
        model.add_multiplication_equality(
            var_task_duration_timeslots[task, i],
            [var_task_start_earlier_than_start[task, i], var_task_end_later_than_end[task, i]]
        )

        # model.add_min_equality(
        #     var_task_duration_timeslots[task, i],
        #     [var_task_start_later_than_end[task, i], var_task_end_earlier_than_start[task, i]]
        # )

# for task in tasks:
#     for i in range(max_time):
#         start_index = i
#         end_index = i + 1
#
#         # TRUE
#         model.add_linear_constraint(var_task_starts[task], 0, start_index).only_enforce_if(var_task_timeslots[task, i])
#         model.add_linear_constraint(var_task_ends[task], end_index, max_time).only_enforce_if(var_task_timeslots[task, i])
#
#         # FALSE
#         if i == 0:
#             # first time slot
#             model.add_linear_expression_in_domain(
#             var_task_starts[task],
#             cp_model.Domain.from_intervals([[1, max_time]])
#             ).only_enforce_if(~var_task_timeslots[task, i])
#
#         elif i == max_time - 1:
#             # last time slot
#             model.add_linear_expression_in_domain(
#             var_task_ends[task],
#             cp_model.Domain.from_intervals([[0, max_time-1]])
#             ).only_enforce_if(~var_task_timeslots[task, i])
#         else:
#             model.add_linear_expression_in_domain(
#             x,
#             cp_model.Domain.from_intervals([[1, start_index], [end_index+1, max_time]])
#             ).only_enforce_if(~var_task_timeslots[task, i])


#
# task_1_start  <  task_2_start <  task_1_end
# Variables:
# 1: Overlap
# 2: (task_2_start < task_1_end)
# 3: (task_1_start < task_2_start)
# Constraints:
# 1: (task_1_start < task_2_start) -> task_1_start
# 2:


for task in tasks:
    model.add(
            var_task_new_duration[task] == processing_time[task_to_product[task]] +
            sum(is_break[i]*var_task_duration_timeslots[task, i] for i in range(max_time))
    )


var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task],
        var_task_new_duration[task],
        #processing_time[task_to_product[task]],
        var_task_ends[task],
        name=f"interval_t{task}"
    )
    for task in tasks
}


var_task_intervals_auto = {
    task: model.new_interval_var(
        var_task_starts[task],
        1,
        var_task_starts[task] + 1,
        name=f"interval_auto_t{task}"
    )
    for task in tasks
    if task_to_type[task] == 'TYPE_4'
}

# Add break time
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())

# task, resource reduction for breaks
demands = [1]*len(tasks) + [1]*len(breaks)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)

# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)




# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    for task in tasks:
        print(f'task {task}')
        for i in range(max_time):
            print(f'{i} {solver.value(var_task_duration_timeslots[task, i])}')

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )
        print(solver.value(var_task_new_duration[task]))

    print('Make-span:', solver.value(make_span))
    # print('=======================  ALLOCATION & SEQUENCE  =======================')
    # if True:
    #     for t1 in tasks:
    #         for t2 in tasks:
    #             if t1 != t2:
    #                 value = solver.value(var_task_seq[(t1, t2)])
    #                 print(f'{t1} --> {t2}  {value}')
    #                 # if value == 1 and t2 != 0:
    #                 #     print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}')#  cost: {m_cost[m, t1, t2]}')
    #                 # if value == 1 and t2 == 0:
    #                 #     print(f'{t1} --> {t2}   Closing')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Linear domain for breaks

Source: scheduling/example_29_linear_domain_for_breaks.py

What it does

Compares two ways of enforcing that tasks never overlap a periodic break:

  • Method 1 (run_model_1): model every break as a fixed interval and put the breaks in add_no_overlap along with the tasks.
  • Method 2 (run_model_2): keep tasks as intervals but create each start variable from a domain that excludes break-overlapping starts, using new_int_var_from_domain with Domain.from_intervals.

The second method avoids creating many break intervals and lets CP-SAT propagate directly on the start domain, which can be faster for highly periodic schedules. The file ends with a benchmark loop that times both methods over increasing task counts and plots the results.

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from tqdm import tqdm
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2


# method 1: No overlapping
def run_model_1(number_of_tasks):
    #number_of_tasks = 200
    tasks = {x for x in range(number_of_tasks)}

    # 2 tasks --> 8 time units

    processing_time = 2
    max_time = number_of_tasks * 4 + 100

    breaks = {(4 * i, 4 * i + 4) for i in range(int(number_of_tasks)) if i % 2 != 0}

    valid_start_times = [[0 + i * 8, 1 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
    valid_start_times = [item for sublist in valid_start_times for item in sublist]

    model = cp_model.CpModel()

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            processing_time,
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] < var_task_starts[task])

    var_break_intervals = {
        break_id: model.new_fixed_size_interval_var(
            break_start, break_end-break_start, f"break_{break_id}"
        ) for break_id, (break_start, break_end) in enumerate(breaks)
    }

    all_intervals = [x for x in var_task_intervals.values()] + [x for x in var_break_intervals.values()]
    model.add_no_overlap(all_intervals)

    # obj
    make_span = model.new_int_var(0, max_time, "make_span")

    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )

    model.minimize(make_span)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    # if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    #     for task in tasks:
    #         print(f'Task {task} ',
    #               solver.value(var_task_starts[task]), solver.value(var_task_ends[task])
    #               )
    #
    #     print('Make-span:', solver.value(make_span))

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


# method 2: Use linear domain
def run_model_2(number_of_tasks):

    #number_of_tasks = 4
    tasks = {x for x in range(number_of_tasks)}

    # 2 tasks --> 8 time units

    processing_time = 2
    max_time = number_of_tasks * 4 + 100

    breaks = {(4 * i, 4 * i + 4) for i in range(int(number_of_tasks)) if i % 2 != 0}

    valid_start_times = [[0 + i * 8, 1 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
    valid_periods = [[0 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]

    valid_start_times = [item for sublist in valid_start_times for item in sublist]

    model = cp_model.CpModel()

    #model.new_int_var_from_domain(cp_model.Domain.from_intervals([[1, 2], [4, 6]]), 'x')

    var_task_starts = {task: model.new_int_var_from_domain(
        cp_model.Domain.from_intervals(valid_periods),
        f"task_{task}_start")
        for task in tasks}

    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            processing_time,
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] < var_task_starts[task])

    all_intervals = [x for x in var_task_intervals.values()]
    model.add_no_overlap(all_intervals)

    # obj
    make_span = model.new_int_var(0, max_time, "make_span")

    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )

    model.minimize(make_span)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    # if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    #     for task in tasks:
    #         print(f'Task {task} ',
    #               solver.value(var_task_starts[task]), solver.value(var_task_ends[task])
    #               )
    #
    #     print('Make-span:', solver.value(make_span))

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


if __name__ == '__main__':

    N = 200

    sizes_1 = list(range(2, N, 10))
    sizes_2 = list(range(2, N, 10))

    model_1_times = []
    model_2_times = []

    print("\nNoOverlap\n")
    for size in tqdm(sizes_1):
        model_1_times.append(run_model_1(size))

    print("\nLinear Domain\n")
    for size in tqdm(sizes_2):
        model_2_times.append(run_model_2(size))

    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(sizes_1, model_1_times, marker='o', label='NoOverLapping')
    plt.plot(sizes_2, model_2_times, '-.', marker='o', label='Linear Domain')
    plt.legend()
    plt.title('Performance benchmarking')
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()

Conditional duration via linear domain

Source: scheduling/example_33_conditional_duration_linear_domain.py

What it does

A task's duration depends on its start time: a start that makes the task overlap a break adds one time unit to the duration.

  • Two domains are built: domain_break (start values that overlap a break) and domain_no_break (safe start values).
  • A reified bool var_task_overlap_break[t] toggles which domain the start must belong to, via add_linear_expression_in_domain.
  • Under only_enforce_if, the task duration is either processing_time or processing_time + 1.
  • The interval is built from start/duration/end, so everything downstream (no-overlap, cumulative) sees the correct effective duration.
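
The two start sets can also be derived by checking each candidate start against the break list instead of using a closed-form index formula. The helper below is a hypothetical sketch, not part of the example file; it assumes unit time slots and half-open break windows (start, end).

```python
def split_starts(max_time, processing_time, breaks):
    """Split candidate starts into (safe, hit) by overlap with any break."""
    safe, hit = [], []
    for start in range(max_time):
        end = start + processing_time
        overlaps = any(start < b_end and b_start < end for b_start, b_end in breaks)
        (hit if overlaps else safe).append(start)
    return safe, hit


# Break layout for break_offset = 1 and a period of 3, as in the docstring.
breaks = [(1, 2), (4, 5), (7, 8)]
safe, hit = split_starts(max_time=9, processing_time=2, breaks=breaks)
print("no break starts:", safe)
print("break starts:", hit)
```

For this layout the safe starts are 2, 5 and 8 and every other start in range(9) hits a break, which matches the starts_no_break and starts_break lists the file builds. Either list can then be turned into a domain with Domain.from_intervals([[x] for x in ...]) as the file does.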

Concepts

Source

from ortools.sat.python import cp_model
from time import time
import pandas as pd


if __name__ == '__main__':
    """
    Offset = 0:
        | x |   |   | x |   |   | x |   |   |
    Offset = 1:
        |   | x |   |   | x |   |   | x |   |    
    Offset = 2:
        |   |   | x |   |   | x |   |   | x |    
        
    where x represent a unit duration break period
    
    """
    break_offset = 1

    num_of_tasks = 3
    max_time = num_of_tasks*3
    processing_time = 2

    if break_offset == 0:
        help_text = "| x |   |   | x |   |   | x |   |   |"
    elif break_offset == 1:
        help_text = "|   | x |   |   | x |   |   | x |   |"
    elif break_offset == 2:
        help_text = "|   |   | x |   |   | x |   |   | x |"
    else:
        print("offset wrong")
        exit()

    # Breaks repeat every 3 time units (see the docstring), independent of the task count.
    breaks = [(i*3 + break_offset, i*3 + break_offset + 1) for i in range(num_of_tasks)]
    tasks = {x for x in range(num_of_tasks)}
    starts_no_break = [x*3+break_offset+1 for x in range(-1, num_of_tasks) if x*3+break_offset+1>= 0]
    starts_break = list(set(range(max_time)).difference(starts_no_break))
    domain_no_break = cp_model.Domain.from_intervals([[x] for x in starts_no_break])
    domain_break = cp_model.Domain.from_intervals([[x] for x in starts_break])

    model = cp_model.CpModel()

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_durations = {
        task: model.new_int_var(processing_time, processing_time + 1, f"task_{task}_duration") for task in tasks
    }
    var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}

    # print("Heuristic 1: Apply the tasks sequence heuristics")
    for task in tasks:
        if task == 0:
            continue
        model.add(var_task_ends[task-1] <= var_task_starts[task])

    for task in tasks:

        model.add_linear_expression_in_domain(var_task_starts[task], domain_break).only_enforce_if(
            var_task_overlap_break[task]
        )

        model.add_linear_expression_in_domain(var_task_starts[task], domain_no_break).only_enforce_if(
            ~var_task_overlap_break[task]
        )

        model.add(var_task_durations[task] == processing_time+1).only_enforce_if(
            var_task_overlap_break[task]
        )

        model.add(var_task_durations[task] == processing_time).only_enforce_if(
            ~var_task_overlap_break[task]
        )

    # intervals
    var_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            var_task_durations[task],
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }

    model.add_no_overlap(var_intervals.values())

    # Objectives
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )

    model.minimize(make_span + sum(var_task_durations.values()))
    # model.minimize(sum(var_task_durations))

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    print_result = True
    # show the result if getting the optimal one
    if print_result:
        print("-----------------------------------------")
        print(help_text)
        print("breaks periods:", breaks)
        print("break starts:", starts_break)
        print("no break starts:", starts_no_break)

        if status == cp_model.OPTIMAL:
            big_list = []
            for task in tasks:
                tmp = [
                    f"task {task}",
                    solver.value(var_task_starts[task]),
                    solver.value(var_task_ends[task]),
                    solver.value(var_task_overlap_break[task]),
                    solver.value(var_task_durations[task]),
                ]
                big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
            df = df.sort_values(['start'])
            print(df)
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

Shift crossing (fake time unit)

Source: scheduling/example_16_shift_crossing_fake_time_unit.py

What it does

Prevents tasks from straddling shift boundaries by inserting a one-unit fake break at each boundary. No task may overlap those fake breaks.

var_shift_break_intervals[s, e] = model.new_fixed_size_interval_var(
    start=s, size=e - s, name="shift_edge",
)
for s, e in synthetic_shift_breaks:
    for t in tasks:
        model.add_no_overlap([var_task_intervals[t], var_shift_break_intervals[s, e]])

Real breaks are still modeled with add_cumulative as usual.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

# 1. Data

synthetic_shift_breaks = {(4, 5), (9, 10)}
breaks = {(0, 2)}
tasks = {1}
processing_time = {1: 3}
max_time = 10

var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task], processing_time[task], var_task_ends[task], name=f"interval_t{task}"
    ) for task in tasks
}

# Add break time
var_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals.values()) + list(var_break_intervals.values())

demands = [1]*len(tasks) + [1]*len(breaks)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)


# THE CONSTRAINT for synthetic shift break time unit


var_shift_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='shift_edge')
    for (start, end) in synthetic_shift_breaks
}

# One pairwise no-overlap per (task, fake break): a task may not cover a shift edge.
for start, end in synthetic_shift_breaks:
    for task in tasks:
        model.add_no_overlap([var_task_intervals[task], var_shift_break_intervals[start, end]])



# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Shift crossing (Mathieu)

Source: scheduling/example_17_shift_crossing_mathieu.py

What it does

Alternative to 16. Instead of synthetic breaks, every task is assigned to exactly one shift, and its start/end are constrained to the shift window:

for t in tasks:
    model.add_exactly_one(presence[s, t] for s in shifts)
    for s in shifts:
        model.add(start[t] >= shift_start[s]).only_enforce_if(presence[s, t])
        model.add(end[t]   <= shift_end[s]  ).only_enforce_if(presence[s, t])

The rest of the model (real breaks, cumulative, makespan) is the same as before.
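
To see what result to expect from the data in this file, a brute-force check in plain Python is enough. The function earliest_shift_start is a hypothetical helper for illustration; it assumes integer time, shift windows given as (start, end), and breaks as half-open (start, end) windows.

```python
def earliest_shift_start(duration, shifts, breaks, horizon):
    """First (start, end) that fits inside one shift and avoids every break."""
    for start in range(horizon - duration + 1):
        end = start + duration
        in_one_shift = any(s <= start and end <= e for s, e in shifts.values())
        hits_break = any(start < b_end and b_start < end for b_start, b_end in breaks)
        if in_one_shift and not hits_break:
            return start, end
    return None


shifts = {1: (0, 4), 2: (4, 8)}
breaks = [(0, 2)]
result = earliest_shift_start(duration=3, shifts=shifts, breaks=breaks, horizon=10)
print(result)
```

Shift 1 only leaves two free units after the break, so the 3-unit task has to go into shift 2 and runs from 4 to 7.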

Concepts

  • Shifts (explicit shift assignment)

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

tasks = {1}
processing_times = {1: 3}
max_time = 10
breaks = {(0, 2)}
shifts = {1, 2}
shift_starts = {1:0, 2:4}
shift_ends = {1:4, 2:8}


var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_shift_task_presence = {
    (shift, task): model.new_bool_var(f'task_{task}_is_in_shift_{shift}')
    for shift in shifts for task in tasks
}

for task in tasks:
    model.add_exactly_one(var_shift_task_presence[shift, task] for shift in shifts)

    for shift in shifts:
        model.add(var_task_starts[task] >= shift_starts[shift]).only_enforce_if(
            var_shift_task_presence[shift, task]
        )
        model.add(var_task_ends[task] <= shift_ends[shift]).only_enforce_if(
            var_shift_task_presence[shift, task]
        )


var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task], processing_times[task], var_task_ends[task], name=f"interval_t{task}"
    ) for task in tasks
}

# Add break time
var_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}


intervals = list(var_task_intervals.values()) + list(var_break_intervals.values())

demands = [1]*len(tasks) + [1]*len(breaks)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Stages, one job

Source: scheduling/example_21_stages_one_job.py

What it does

A single job with three stages. Each stage is a task indexed by (job, stage).

  • var_job_starts/var_job_ends are the min/max of task starts/ends via add_min_equality / add_max_equality.
  • Stage precedence: end[job, s] <= start[job, s + 1].
  • Each stage has an interval. The listing creates the intervals but does not call add_no_overlap: with a single job, a per-stage no-overlap would have nothing to separate.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

jobs = {1}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}

processing_time = 3
max_time = 10

# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}

var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}

# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}

var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}


for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])

    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])


var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}


# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for job in jobs:
        print(f"job_{job}  start: {solver.value(var_job_starts[job])}   end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Stages, two jobs

Source: scheduling/example_22_stages_two_jobs.py

What it does

Extends 21 with a second job. The per-stage add_no_overlap now actually does work: with two jobs, stage s can only run one of them at a time.

Structure is otherwise identical to 21 (min/max for job start/end, stage precedence, make-span minimisation).

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

jobs = {1, 2}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}

processing_time = 3
max_time = 20

# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}

var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}

# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}

var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}


for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])

    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])


var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}

for stage in stages:
    model.add_no_overlap(var_task_intervals[job, stage] for job in jobs)

# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for job in jobs:
        print(f"job_{job}  start: {solver.value(var_job_starts[job])}   end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Multistage, two jobs

Source: scheduling/example_23_multistage_two_jobs_co.py

What it does

Scales the multi-stage model of 22 from 2 jobs to 6 jobs over 3 stages. The model is identical in shape: stage precedence, per-stage no-overlap, add_max_equality for the makespan, and model.minimize(make_span).

Despite the filename mentioning two_jobs and co (changeover), this version uses six jobs and does not add a changeover constraint; it is a scalability exercise.
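
Because every task takes the same processing_time and each stage handles one job at a time, the result can be checked by hand: the last job cannot leave stage 1 before jobs * processing_time, and it still has to pass the remaining stages. The plain-Python sketch below (not taken from the example file; the function names are illustrative) builds a staggered schedule, verifies it against the two constraint families, and reports its makespan, which is a useful value to compare the solver output against.

```python
def staggered_schedule(n_jobs, n_stages, p):
    """Job j (0-based) occupies stage s (0-based) during [(j + s) * p, (j + s + 1) * p)."""
    return {
        (j, s): ((j + s) * p, (j + s + 1) * p)
        for j in range(n_jobs)
        for s in range(n_stages)
    }


def is_feasible(schedule, n_jobs, n_stages):
    # Stage precedence inside each job: stage s must end before stage s + 1 starts.
    for j in range(n_jobs):
        for s in range(n_stages - 1):
            if schedule[j, s][1] > schedule[j, s + 1][0]:
                return False
    # No overlap on each stage: sort by start and compare neighbours.
    for s in range(n_stages):
        spans = sorted(schedule[j, s] for j in range(n_jobs))
        for (_, end_a), (start_b, _) in zip(spans, spans[1:]):
            if end_a > start_b:
                return False
    return True


schedule = staggered_schedule(n_jobs=6, n_stages=3, p=3)
makespan = max(end for _, end in schedule.values())
print(is_feasible(schedule, 6, 3), makespan)
```

With 6 jobs, 3 stages and a processing time of 3, the staggered schedule ends at (6 + 3 - 1) * 3 = 24, well inside max_time = 100.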

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

jobs = {1, 2, 3, 4, 5, 6}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}

processing_time = 3
max_time = 100

# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}

var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}

# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}

var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}


for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])

    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])


var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}

for stage in stages:
    model.add_no_overlap(var_task_intervals[job, stage] for job in jobs)

# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for job in jobs:
        print(f"job_{job}  start: {solver.value(var_job_starts[job])}   end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

People mode

Source: scheduling/example_10_people_mode.py

What it does

Each task can run in one of several "people modes", with different processing times and headcount requirements. Only the mode choice changes the model; sequencing is similar to 03.

  • variables_task_resource_mode[t, k]: one-hot bool for task t selecting mode k. Exactly one mode per task.

  • variables_task_processing_time[t] is derived from the mode:

    model.add(
        processing_time_var[t] == sum(
            processing_time[product[t], k] * mode[t, k] for k in modes
        )
    )
    
  • Standard machine-assignment variables (presence[m, t], start[m, t], end[m, t]), per-machine add_circuit, and task-level links complete the model.
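
The one-hot link between mode and duration can be illustrated without the solver. The sketch below is plain Python, not CP-SAT code: it evaluates the same sum-of-products expression for each possible mode choice, using the data tables from the example. The helper names one_hot and derived are illustrative, and the capacity value 7 is the one passed to add_cumulative in the source.

```python
processing_time = {("A", 1): 3, ("A", 2): 2}
resource_requirement = {("A", 1): 2, ("A", 2): 3}
modes = [1, 2]
capacity = 7


def one_hot(chosen):
    """Return the 0/1 selection vector with exactly one mode switched on."""
    return {k: int(k == chosen) for k in modes}


def derived(product, selection):
    """Evaluate the linear expressions that tie duration and headcount to the mode."""
    assert sum(selection.values()) == 1  # exactly one mode per task
    duration = sum(processing_time[product, k] * selection[k] for k in modes)
    headcount = sum(resource_requirement[product, k] * selection[k] for k in modes)
    return duration, headcount


for k in modes:
    duration, headcount = derived("A", one_hot(k))
    # Two tasks of product A can run side by side if their joint demand fits.
    print(k, duration, headcount, 2 * headcount <= capacity)
```

Mode 2 is faster but needs more people; with a capacity of 7, two tasks can still run in parallel in either mode.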

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       A
'''

### 1. Data

tasks = {1, 2}
products = {'A'}
task_to_product = {1: 'A', 2: 'A'}
machines = {1,2}
resource_modes = {1, 2}

# product -> people mode -> time duration
processing_time = {
    ('A', 1): 3,
    ('A', 2): 2
}
resource_requirement = {
    ('A', 1): 2,
    ('A', 2): 3
}


max_time = 20

# (task, people_mode)

# 2. Decision variables

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks
    for m in machines
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks
    for m in machines
}

# One task to one machine.
for task in tasks:
    task_candidate_machines = machines
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)

# task level link to machine-task level
for task in tasks:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for sequence control
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for t1 in tasks
    for t2 in tasks
    for m in machines
    if t1 != t2
}

# Sequence
for m in machines:
    arcs = list()
    for node1, t1 in enumerate(tasks):
        tmp_1 = model.new_bool_var(f'm_{m}_first_to_{t1}')
        arcs.append([0, t1, tmp_1])

        tmp_2 = model.new_bool_var(f'm_{m}_{t1}_to_last')
        arcs.append([t1, 0, tmp_2])

        arcs.append([t1, t1, ~variables_machine_task_presences[m, t1]])

        for node_2, t2 in enumerate(tasks):
            # arcs
            if t1 == t2:
                continue

            arcs.append([
                t1,
                t2,
                variables_machine_task_sequence[(m, t1, t2)]
            ])

            distance = 0

            # cannot require the time index of task 0 to represent the first and the last position
            model.add(
                variables_task_ends[t1] + distance <= variables_task_starts[t2]
            ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

    model.add_circuit(arcs)

#
variables_task_resource_mode = {
    (task, resource_mode): model.new_bool_var(f"task {task} using resource mode {resource_mode}")
    for task in tasks for resource_mode in resource_modes
}

# variables_task_resource_modes = {
#     task: model.new_int_var(0, len(resource_modes), f"the resource mode of {task}")
#     for task in tasks
# }

# one task has to pick one resource mode
for task in tasks:
    tmp = sum(variables_task_resource_mode[task, resource_mode] for resource_mode in resource_modes)
    model.add(tmp == 1)

# link resource id with decision matrix
# for task in tasks:
#     for resource_mode in resource_modes:
#         model.add(
#             variables_task_resource_modes[task] == resource_mode
#         ).only_enforce_if(
#             variables_task_resource_mode_matrix[task, resource_mode]
#         )

variables_task_processing_time = {
    task: model.new_int_var(0, 99, f"processing time for {task}")
    for task in tasks
}

for task in tasks:
    model.add(
        variables_task_processing_time[task] == sum(
            processing_time[task_to_product[task], resource_mode]*variables_task_resource_mode[task, resource_mode]
            for resource_mode in resource_modes
        )
    )

# model.new_optional_interval_var(1,1,2,1,'')
# model.new_optional_interval_var(
#     model.new_int_var(0,1,''),
#     model.new_int_var(0, 1, ''),
#     model.new_int_var(0,1,''),
#     model.new_int_var(0, 1, ''),
#     '')

# variables_machine_task_intervals = {
#     (m, task): model.new_optional_interval_var(
#         variables_machine_task_starts[m, task],
#         #2,
#         variables_task_processing_time[task],
#         #processing_time[task_to_product[task], variables_task_resource_modes[task]],
#         variables_machine_task_ends[m, task],
#         variables_machine_task_presences[m, task],
#         name=f"t_interval_{m}_{task}"
#     )
#     for task in tasks
#     for m in machines
# }

variables_machine_task_resource_mode_presence = {
    (m, task, resource_mode): model.new_bool_var(f'm{m}_task_{task}_resource_mode_{resource_mode}_presence')
    for m in machines
    for task in tasks
    for resource_mode in resource_modes
}

for task in tasks:
    for resource_mode in resource_modes:
        model.add(
            sum(variables_machine_task_resource_mode_presence[m, task, resource_mode] for m in machines) == 1
        ).only_enforce_if(
            variables_task_resource_mode[task, resource_mode]
        )

for task in tasks:
    for m in machines:
        model.add(
            sum(variables_machine_task_resource_mode_presence[m, task, resource_mode] for resource_mode in resource_modes) == 1
        ).only_enforce_if(
            variables_machine_task_presences[m, task]
        )

for task in tasks:
    model.add(
        sum(variables_machine_task_resource_mode_presence[m, task, resource_mode]
            for resource_mode in resource_modes
            for m in machines
            ) == 1
    )


variables_machine_task_mode_intervals = {
    (m, task, resource_mode): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        #2,
        variables_task_processing_time[task],
        #processing_time[task_to_product[task], variables_task_resource_modes[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_resource_mode_presence[m, task, resource_mode],
        #variables_machine_task_presences[m, task]*variables_task_resource_mode[task, resource_mode],
        name=f"int_m{m}_t{task}_mode{resource_mode}"
    )
    for task in tasks
    for m in machines
    for resource_mode in resource_modes
}




intervals = [x for x in variables_machine_task_mode_intervals.values()]
demands = [
    resource_requirement[task_to_product[task], resource_mode]
    for machine, task, resource_mode in variables_machine_task_mode_intervals.keys()
]
model.add_cumulative(intervals=intervals, demands=demands, capacity=7)

# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# for m in machines:
#     for task in tasks:
#         for resource_mode in resource_modes:
#             print(f'{m} {task} {resource_mode}')
#             print(solver.value(variables_machine_task_mode_intervals[m, task, resource_mode]))

# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    for m in machines:
        for task in tasks:
            for resource_mode in resource_modes:
                print(f'M {m} T {task} R {resource_mode}' ,end = ' ')
                print(f'  {solver.value(variables_machine_task_resource_mode_presence[m, task, resource_mode])}')


    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task]), end='   '
              )
        for resource_mode in resource_modes:
            value = solver.value(variables_task_resource_mode[task, resource_mode])
            if value == 1:
                print(f'Using resource mode {resource_mode}')
    print('Make-span:', solver.value(make_span))
    print('=======================  ALLOCATION & SEQUENCE  =======================')
    for m in machines:

        print(f'------------\nMachine {m}')
        for t1 in tasks:
            value = solver.value(variables_machine_task_presences[(m, t1)])
            if value == 1:
                print(f'task_{t1}_on machine_{m}')
            for t2 in tasks:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}')#  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Resource mode (empty)

Source: scheduling/example_11_resource_mode.py

The file is empty. Placeholder kept for numbering. See People mode for a similar, fully-worked example.

Headcount tracking

Source: scheduling/example_34_headcount_tracking.py

What it does

Compares three ways of tracking resource (headcount) usage over time when task durations are state-dependent (can be 2 or 3 depending on whether the task overlaps a break).

  • Method 0 - native cumulative. One add_cumulative with the actual task intervals.
  • Method 1 - cumulative with start time. Per-time-slot booleans var_task_starts_presence[task, t] mark the slot in which each task starts. For each candidate duration d (2 or 3), add_max_equality builds an indicator that is 1 when the task started within the last d slots, i.e. when a task of duration d would cover slot t. The per-task, per-slot resource variable then takes the d = 3 indicator when var_task_overlap_break[task] is true and the d = 2 indicator otherwise.
  • Method 2 - cumulative with overlap. Encodes the same "is this slot inside the task?" using per-slot overlap booleans (start <= t AND end > t).

The file also defines a Variables dataclass and an extract_solution(...) helper that converts variable dicts into their solved values, handling IntervalVar specially.
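
Methods 1 and 2 encode the same per-slot question in two ways. The plain-Python sketch below (not solver code; function names and task data are made up for illustration) computes a headcount profile from fixed task times with both tests and shows that they agree.

```python
def usage_by_overlap(tasks, horizon):
    """Method 2 idea: a task is active at slot t when start <= t < end."""
    return [
        sum(need for start, end, need in tasks if start <= t < end)
        for t in range(horizon)
    ]


def usage_by_start(tasks, horizon):
    """Method 1 idea: a task of duration d is active at t when it started in [t - d + 1, t]."""
    profile = []
    for t in range(horizon):
        total = 0
        for start, end, need in tasks:
            duration = end - start
            if t - duration + 1 <= start <= t:
                total += need
        profile.append(total)
    return profile


# Hypothetical tasks as (start, end, headcount): one of duration 3, one of duration 2.
tasks = [(0, 3, 1), (2, 4, 1)]
profile_overlap = usage_by_overlap(tasks, horizon=6)
profile_start = usage_by_start(tasks, horizon=6)
print(profile_overlap, profile_start)
```

In the model the start and end are variables, so each comparison becomes a reified constraint on a boolean instead of a Python if.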

Concepts

Source

from ortools.sat.python import cp_model
from time import time
import pandas as pd
from dataclasses import dataclass
from typing import Any, Dict, Set
import copy
import logging
logger = logging.getLogger(__name__)


@dataclass
class Variables:
    """
    All variables of the optimization problem, used for decisions and result values
    Type `Any` shorthand for a solver variable object, or a result float/int
    """
    var_task_starts: Dict[str, Any] = None
    var_task_ends: Dict[str, Any] = None
    var_task_durations: Dict[str, Any] = None
    var_task_overlap_break: Dict[str, Any] = None
    var_intervals: Dict[str, Any] = None
    total_duration: Dict[str, Any] = None
    make_span: Dict[int, Any] = None


def extract_solution(solver: cp_model.CpSolver, variables):
    """Extracts the solution from the variables
    Three kinds of variables have been considered
    Integer, Boolean and Interval Variable
    In case of other types of variables,
    this function would require modification
    """

    # Initializing solution class
    solution = copy.deepcopy(variables)
    for varname, vardict in variables.__dict__.items():
        setattr(solution, varname, None)

    # Assigning variable values
    for varname, vardict in variables.__dict__.items():
        if vardict is not None:
            setattr(
                solution,
                varname,
                {
                    k: solver.value(v) if type(v) not in [cp_model.IntervalVar] else v
                    for k, v in vardict.items()
                },
            )
        else:
            logger.warning(f"Variable '{varname}' is defined but not used in model")
    return solution



if __name__ == '__main__':
    """
    Offset = 0:
        | x |   |   | x |   |   | x |   |   |
    Offset = 1:
        |   | x |   |   | x |   |   | x |   |    
    Offset = 2:
        |   |   | x |   |   | x |   |   | x |    

    where x represent a unit duration break period

    """
    variables = Variables()
    break_offset = 0
    num_of_tasks = 1
    max_time = num_of_tasks * 3
    processing_time = 2

    if break_offset == 0:
        help_text = "| x |   |   | x |   |   | x |   |   |"
    elif break_offset == 1:
        help_text = "|   | x |   |   | x |   |   | x |   |"
    elif break_offset == 2:
        help_text = "|   |   | x |   |   | x |   |   | x |"
    else:
        print("offset wrong")
        exit()

    # Breaks repeat every 3 slots, matching the diagram above and starts_no_break below.
    breaks = [(i * 3 + break_offset, i * 3 + break_offset + 1) for i in range(num_of_tasks)]
    tasks = {x for x in range(num_of_tasks)}
    starts_no_break = [x * 3 + break_offset + 1 for x in range(-1, num_of_tasks) if x * 3 + break_offset + 1 >= 0]
    starts_break = list(set(range(max_time)).difference(starts_no_break))
    domain_no_break = cp_model.Domain.from_values([x for x in starts_no_break])
    domain_break = cp_model.Domain.from_values([x for x in starts_break])

    model = cp_model.CpModel()

    variables.var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    variables.var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    variables.var_task_durations = {task: model.new_int_var(2, 3, f"task_{task}_duration") for task in tasks}
    variables.var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}

    for task in tasks:
        if task == 0:
            continue
        model.add(variables.var_task_ends[task - 1] <= variables.var_task_starts[task])

    for task in tasks:
        model.add_linear_expression_in_domain(variables.var_task_starts[task], domain_break).only_enforce_if(
            variables.var_task_overlap_break[task]
        )

        model.add_linear_expression_in_domain(variables.var_task_starts[task], domain_no_break).only_enforce_if(
            ~variables.var_task_overlap_break[task]
        )
        # model.add(variables.var_task_durations[task] == processing_time + variables.var_task_overlap_break[task] * 1)

        model.add(variables.var_task_durations[task] == processing_time + 1).only_enforce_if(
            variables.var_task_overlap_break[task]
        )

        model.add(variables.var_task_durations[task] == processing_time).only_enforce_if(
            ~variables.var_task_overlap_break[task]
        )

    # intervals
    variables.var_intervals = {
        task: model.new_interval_var(
            variables.var_task_starts[task],
            variables.var_task_durations[task],
            variables.var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }


    tracking_method = 1

    if tracking_method == 0:
        # native  cumulative

        variables.resource_needs = {'all': model.new_int_var(0, 10, 'resource_needs')}
        resource_needs = {task: 1 for task in tasks}

        intervals, headcounts = [], []
        # 2.1 Add tasks requirements
        # 2.1.1 Add demand-based intervals
        for task in tasks:
            intervals.append(variables.var_intervals[task])
            headcounts.append(resource_needs[task])
        # 2.2 Create cumulative constraints
        model.add_cumulative(intervals, headcounts, variables.resource_needs['all'])
    elif tracking_method == 1:
        # cumulative_with_start_time

        max_r = 10
        # Assumption: with the snake_case API (OR-Tools 9.8+), `proto` is a property, not a method.
        lb = [variables.var_task_starts[i].proto.domain[0] for i in tasks]
        ub = [variables.var_task_starts[i].proto.domain[1] for i in tasks]

        times_min = min(lb)
        times_max = max(ub)
        time_range = range(times_min, times_max + 1)
        variables.resource_needs = {t: model.new_int_var(0, max_r, f'resource_{t}')
                                    for t in time_range
                                    }
        variables.task_resource_needs = {(task, t): model.new_int_var(0, max_r, f'resource_{task}_{t}')
                                         for task in tasks
                                         for t in time_range
                                         }
        variables.duration_task_resource_needs = {(task, t, duration): model.new_int_var(0, max_r, f'resource_{task}_{t}_d{duration}')
                                                  for task in tasks
                                                  for t in time_range
                                                  for duration in [2, 3]
                                                  }
        variables.var_task_starts_presence = {
            (task, t): model.new_bool_var(f'start_{task}_{t}')
            for task in tasks
            for t in range(times_min, times_max + 1)
        }
        for task in tasks:
            model.add_exactly_one([variables.var_task_starts_presence[task, t] for t in time_range])
            # Define min & max duration
            # Assumption: `proto` is a property in the snake_case API (OR-Tools 9.8+).
            min_duration = variables.var_task_durations[task].proto.domain[0]
            max_duration = variables.var_task_durations[task].proto.domain[1]
            for t in time_range:
                # Capture start time
                model.add(variables.var_task_starts[task] == t).only_enforce_if(
                    variables.var_task_starts_presence[task, t])
                # Capture based on duration
                for duration in [min_duration, max_duration]:
                    model.add_max_equality(variables.duration_task_resource_needs[task, t, duration],
                                         [variables.var_task_starts_presence[task, t_start]
                                          for t_start in range(t - duration + 1, t + 1)
                                          if (task, t_start) in variables.var_task_starts_presence
                                          ]
                                         )
                # Capture the actual task duration and the needs
                model.add(variables.task_resource_needs[task, t]
                          == variables.duration_task_resource_needs[task, t, 2]
                          ).only_enforce_if(~variables.var_task_overlap_break[task])
                model.add(variables.task_resource_needs[task, t]
                          == variables.duration_task_resource_needs[task, t, 3]
                          ).only_enforce_if(variables.var_task_overlap_break[task])

        # Capture total resource needs
        for t in range(times_min, times_max + 1):
            model.add(variables.resource_needs[t] == sum([variables.task_resource_needs[task, t] * 1
                                                          for task in tasks
                                                          ]
                                                         ))
    elif tracking_method == 2:
        """
        Cumulative with overlap
        """

        max_r = 10
        # Assumption: with the snake_case API (OR-Tools 9.8+), `proto` is a property, not a method.
        lb = [variables.var_task_starts[i].proto.domain[0] for i in tasks]
        ub = [variables.var_task_starts[i].proto.domain[1] for i in tasks]

        times_min = min(lb)
        times_max = max(ub)
        resource_needs = {task: 1 for task in tasks}
        variables.resource_needs = {t: model.new_int_var(0, max_r, f'resource_{t}')
                                    for t in range(times_min, times_max + 1)
                                    }
        variables.task_resource_needs = {(task, t): model.new_int_var(0, max_r, f'resource_{task}_{t}')
                                         for task in tasks
                                         for t in range(times_min, times_max + 1)
                                         }
        variables.duration_task_resource_needs = {(task, t, duration): model.new_int_var(0, max_r, f'resource_{task}_{t}_d{duration}')
                                                  for task in tasks
                                                  for t in range(times_min, times_max + 1)
                                                  for duration in [2, 3]
                                                  }
        variables.var_task_starts_presence = {
            (task, t): model.new_bool_var(f'start_{task}_{t}')
            for task in tasks
            for t in range(times_min, times_max + 1)
        }
        for task in tasks:
            for t in range(times_min, times_max + 1):
                # s[i] <= t
                b1 = model.new_bool_var("")
                model.add(variables.var_task_starts[task] <= t).only_enforce_if(b1)
                model.add(variables.var_task_starts[task] > t).only_enforce_if(~b1)

                # t < e[i]
                b2 = model.new_bool_var("")
                model.add(t < variables.var_task_ends[task]).only_enforce_if(b2)
                model.add(t >= variables.var_task_ends[task]).only_enforce_if(~b2)

                # b1 and b2 (b1 * b2)
                b3 = model.new_bool_var("")
                model.add_bool_and([b1, b2]).only_enforce_if(b3)
                model.add_bool_or([~b1, ~b2]).only_enforce_if(~b3)

                # b1 * b2 * r[i]
                model.add_multiplication_equality(variables.task_resource_needs[task, t], [b3, resource_needs[task]])

        # Capture total resource needs
        for t in range(times_min, times_max + 1):
            model.add(variables.resource_needs[t] == sum([variables.task_resource_needs[task, t] * 1
                                                          for task in tasks
                                                          ]
                                                         ))


    # Objectives
    variables.make_span = {0: model.new_int_var(0, max_time, "make_span")}
    model.add_max_equality(
        variables.make_span[0],
        [variables.var_task_ends[task] for task in tasks]
    )
    variables.total_duration = {0: model.new_int_var(0, max_time, "total_duration")}
    model.add(variables.total_duration[0] == sum(variables.var_task_durations.values()))

    model.minimize(variables.make_span[0] + variables.total_duration[0])

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    print("total time:", total_time)

    print_result = True
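    # Only read values when a solution exists; solver.value() fails otherwise.
    solution: Variables = (
        extract_solution(solver, variables)
        if status in (cp_model.OPTIMAL, cp_model.FEASIBLE)
        else None
    )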
    # show the result if getting the optimal one
    if print_result:
        print("-----------------------------------------")
        print(help_text)
        print("breaks periods:", breaks)
        print("break starts:", starts_break)
        print("no break starts:", starts_no_break)

        if status == cp_model.OPTIMAL:
            big_list = []
            for task in tasks:
                tmp = [
                    f"task {task}",
                    solution.var_task_starts[task],
                    solution.var_task_ends[task],
                    solution.var_task_overlap_break[task],
                    solution.var_task_durations[task],
                ]
                big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
            df = df.sort_values(['start'])
            print(df)
            if tracking_method == 0:
                print('Using method: cumulative_with_built_in_feature')
                print('resource_needs', solution.resource_needs)

            elif tracking_method == 1:
                print('Using method: cumulative_with_start_time')
                print('resource_needs', solution.resource_needs)
                print('task_resource_needs', solution.task_resource_needs)
                print('duration_task_resource_needs', solution.duration_task_resource_needs)
                print('var_task_starts_presence', solution.var_task_starts_presence)

            elif tracking_method == 2:
                print('Using method: cumulative_with_overlap')
                print('resource_needs', solution.resource_needs)
                print('task_resource_needs', solution.task_resource_needs)
                print('duration_task_resource_needs', solution.duration_task_resource_needs)
                print('var_task_starts_presence', solution.var_task_starts_presence)

            print('Make-span:', solution.make_span[0])
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

Max number of continuous tasks

Source: scheduling/example_09_max_number_of_continuous_tasks.py

What it does

Introduces campaigning by modeling campaigns explicitly as entities:

  • For every product, pre-compute as many candidate campaigns as there are tasks of that product.
  • Campaign variables: start, end, duration, presence.
  • var_task_campaign_presences[t, c]: task t is assigned to campaign c. Exactly one per task.
  • Campaign duration is the sum of task durations assigned to it. Campaign start/end bound task starts/ends of its members.
  • A campaign is present iff at least one task is assigned (add_max_equality).
  • var_campaign_durations[c] <= max_conti_task_num caps campaign size.
  • Campaigns are sequenced with a campaign-level add_circuit, with a changeover distance enforced between consecutive campaigns.

The expected pattern for four tasks A A A B is A A -> CO -> A -> CO -> B.
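
The expected pattern can be reproduced with a small plain-Python sketch. It is not the CP-SAT model: it greedily groups tasks in the given order and assumes a changeover is charged between every pair of consecutive campaigns, as in the pattern above. The function names are illustrative.

```python
def split_into_campaigns(task_products, max_size):
    """Group consecutive same-product tasks, with at most max_size tasks per campaign."""
    campaigns = []
    for product in task_products:
        last = campaigns[-1] if campaigns else None
        if last and last[0] == product and len(last) < max_size:
            last.append(product)
        else:
            campaigns.append([product])
    return campaigns


def timeline_length(campaigns, processing_time, changeover_time):
    """Total processing time plus one changeover between consecutive campaigns."""
    work = sum(processing_time[p] for campaign in campaigns for p in campaign)
    return work + changeover_time * (len(campaigns) - 1)


campaigns = split_into_campaigns(["A", "A", "A", "B"], max_size=2)
length = timeline_length(campaigns, {"A": 1, "B": 1}, changeover_time=2)
print(campaigns, length)
```

Under these assumptions the four tasks form three campaigns, and the timeline is 4 units of work plus 2 changeovers of 2 units each. The model reaches the grouping through assignment booleans and a circuit rather than a fixed task order.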

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       A
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
products = {'A', 'B'}
task_to_product = {1: 'A', 2: 'A', 3: 'A', 4: 'B'}
processing_time = {'A': 1, 'B': 1}
changeover_time = 2
max_conti_task_num = 2
max_time = 100

m_cost = {
    (t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] else changeover_time
    for t1 in tasks for t2 in tasks
    if t1 != t2
}



# Campaign-related data pre-processing

max_product_campaigns = {
    product: len([task for task in tasks if task_to_product[task]==product]) for product in products
}

product_campaigns = {
    (product, f"{product}_{campaign}")
    for product in max_product_campaigns
    for campaign in list(range(0, max_product_campaigns[product]))
    #if max_product_campaigns[product] > 0
}

campaign_to_product = {
    campaign: product for product, campaign in product_campaigns
}

campaigns = {campaign for product, campaign in product_campaigns}

product_to_campaigns = {
    product: [c for c in campaigns if campaign_to_product[c] == product] for product in products
}

task_to_campaigns = {
    task: [
        campaign for campaign in campaigns if campaign_to_product[campaign] == task_to_product[task]
    ]
    for task in tasks
}

campaign_size = {campaign: max_conti_task_num for campaign in campaigns}

campaign_duration = {campaign: max_conti_task_num for campaign in campaigns}

campaign_to_tasks = {
    campaign:
        [
            task for task in tasks if campaign in task_to_campaigns[task]
        ]
    for campaign in campaigns
}

m_cost_campaign = {
    (c1, c2): 0
    if campaign_to_product[c1] == campaign_to_product[c2] else changeover_time
    for c1 in campaigns for c2 in campaigns
    if c1 != c2
}




# Need changeover if we switch from a product to a different product
# We can continue to do tasks of the same product type but there is
# a max number of continuous production of tasks with the same product type
# For A A A, we expect A -> A -> Changeover -> A
# For A A A A A, we expect A -> A -> Changeover -> A -> A -> Changeover -> A
# For A B, we expect A -> Changeover -> B
# For A A B, we expect A -> A -> Changeover -> B


# 2. Decision variables

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

# variables_task_sequence = {
#     (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
#     for t1 in tasks
#     for t2 in tasks
#     if t1 != t2
# }
#
# variables_co_starts = {
#     (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_start")
#     for t1 in tasks
#     for t2 in tasks
#     if t1 != t2
# }
#
# variables_co_ends = {
#     (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_end")
#     for t1 in tasks
#     for t2 in tasks
#     if t1 != t2
# }

# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Constraints

# Task Duration
# for task in tasks:
#     model.add(
#         variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
#     )

var_task_intervals = {
    t: model.new_interval_var(
        variables_task_starts[t],
        1,
        variables_task_ends[t],
        f"task_{t}_interval"
    )
    for t in tasks
}



# Sequence
# arcs = list()
# for t1 in tasks:
#     for t2 in tasks:
#         # arcs
#         if t1 != t2:
#             arcs.append([
#                 t1,
#                 t2,
#                 variables_task_sequence[(t1, t2)]
#             ])
#             distance = m_cost[t1, t2]
#             # cannot require the time index of task 0 to represent the first and the last position
#             if t2 != 0:
#                 # to schedule tasks and c/o
#                 model.add(
#                     variables_task_ends[t1] <= variables_co_starts[t1, t2]
#                 ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
#                 model.add(
#                     variables_co_ends[t1, t2] <= variables_task_starts[t2]
#                 ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
#                 model.add(
#                     variables_co_ends[t1, t2] - variables_co_starts[t1, t2] == distance
#                 ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
# model.add_circuit(arcs)


# Campaigning related

var_campaign_starts = {
    c: model.new_int_var(0, max_time, f"start_{c}") for c in campaigns
}

var_campaign_ends = {
    c: model.new_int_var(0, max_time, f"c_end_{c}") for c in campaigns
}

var_campaign_durations = {
    c: model.new_int_var(0, max_time, f"c_duration_{c}") for c in campaigns
}

var_campaign_presences = {
    c: model.new_bool_var(f"c_presence_{c}") for c in campaigns
}

# Task Duration
# for c in campaigns:
#     model.add(
#         var_campaign_ends[c] - var_campaign_starts[c] == var_campaign_durations[c]
#     )


# var_campaign_intervals = {
#     c: model.new_optional_interval_var(
#         var_campaign_starts[c],  # campaign start
#         var_campaign_durations[c],  # campaign duration
#         var_campaign_ends[c],  # campaign end
#         var_campaign_presences[c],  # campaign presence
#         f"c_interval_{c}",
#     )
#     for c in campaigns
# }

# Whether a task is allocated to a campaign
var_task_campaign_presences = {
    (t, c): model.new_bool_var(f"task_{t}_presence_in_campaign_{c}") for t in tasks for c in task_to_campaigns[t]
}

# add_campaign_definition_constraints
# 1. Campaign definition: start, end & duration based on the tasks that belong to the campaign

for c in campaigns:
    #  1. Duration definition
    model.add(
        var_campaign_durations[c] == sum(
            processing_time[task_to_product[t]] * var_task_campaign_presences[t, c]
            for t in campaign_to_tasks[c]
        )
    )
    # 2. Start-end definition
    # TODO: MinEquality ?
    for t in campaign_to_tasks[c]:
        # var_campaign_starts
        # var_campaign_ends

        model.add(var_campaign_starts[c] <= variables_task_starts[t]).only_enforce_if(
            var_task_campaign_presences[t, c]
        )
        model.add(variables_task_ends[t] <= var_campaign_ends[c]).only_enforce_if(
            var_task_campaign_presences[t, c]
        )
    # 3. Link c & tc presence: If 1 task is scheduled on a campaign -> presence[t, c] = 1 ==> presence[c] == 1
    # as long as there is one task in a campaign, this campaign must be present
    model.add_max_equality(
        var_campaign_presences[c], [var_task_campaign_presences[t, c] for t in campaign_to_tasks[c]]
    )

# 2. Definition of the bool var: if a task belongs to a campaign
for task in tasks:
    # Each task belongs to exactly one of its candidate campaigns
    # (task_to_campaigns[task]); the sum below is constrained to == 1.
    model.add(
        sum(var_task_campaign_presences[task, campaign]
            for campaign in campaigns
            if campaign in task_to_campaigns[task]
            ) == 1
    )
# 3. No campaign overlap
# Campaigns won't overlap anyway. But we need this for tasks within campaigns
model.add_no_overlap([x for x in var_task_intervals.values()])

# Cap the total duration of each campaign (hard-coded to 2 time units here)
for c in campaigns:
    model.add(
        var_campaign_durations[c] <= 2
    )


# Add campaign circuit

arcs = []

var_campaign_sequence = {}

for node_1, campaign_1 in enumerate(campaigns):

    tmp1 = model.new_bool_var(f'first_to_{campaign_1}')
    arcs.append([
        0, node_1 + 1, tmp1
    ])

    tmp2 = model.new_bool_var(f'{campaign_1}_to_last')
    arcs.append([
        node_1 + 1, 0, tmp2
    ])

    arcs.append([node_1 + 1, node_1 + 1, ~var_campaign_presences[campaign_1]])

    # for outputting
    var_campaign_sequence.update({(0, campaign_1): tmp1})
    var_campaign_sequence.update({(campaign_1, 0): tmp2})

    for node_2, campaign_2 in enumerate(campaigns):

        if node_1 == node_2:
            continue

        indicator_node_1_to_node_2 = model.new_bool_var(f'{campaign_1}_to_{campaign_2}')

        var_campaign_sequence.update({(campaign_1, campaign_2): indicator_node_1_to_node_2})

        distance = m_cost_campaign[campaign_1, campaign_2]

        arcs.append([node_1 + 1, node_2 + 1, indicator_node_1_to_node_2])

        model.add(
            var_campaign_ends[campaign_1] + distance <= var_campaign_starts[campaign_2]
        ).only_enforce_if(
            indicator_node_1_to_node_2
        ).only_enforce_if(
            var_campaign_presences[campaign_1]
        ).only_enforce_if(
            var_campaign_presences[campaign_2]
        )

model.add_circuit(arcs)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    print('-------------------------------------------------')
    print('Make-span:', solver.value(make_span))
    # for t1 in tasks:
    #     for t2 in tasks:
    #         if t1 != t2:
    #             value = solver.value(var[t1, t2])
    #             if value == 1 and t2 != 0:
    #                 print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[t1, t2]}')
    #                 print('variables_task_sequence[t1, t2]',
    #                       solver.value(variables_task_sequence[t1, t2]))
    #                 print('variables_co_starts[t1, t2]', solver.value(variables_co_starts[t1, t2]))
    #                 print('variables_co_ends[t1, t2]', solver.value(variables_co_ends[t1, t2]))
    #
    #             if value == 1 and t2 == 0:
    #                 print(f'{t1} --> {t2}   Closing')
    for c1 in list(campaigns) + [0]:
        for c2 in list(campaigns) + [0]:
            if c1 == c2:
                continue
            value = solver.value(var_campaign_sequence[c1, c2])

            if value == 1 and c2 != 0 and c1 != 0:
                c1_tasks = []
                c2_tasks = []
                for task in campaign_to_tasks[c1]:
                    if solver.value(var_task_campaign_presences[task, c1]) == 1:
                        c1_tasks.append(task)

                for task in campaign_to_tasks[c2]:
                    if solver.value(var_task_campaign_presences[task, c2]) == 1:
                        c2_tasks.append(task)

                print(f'{c1} --> {c2}   {campaign_to_product[c1]} {c1_tasks}  >>  {campaign_to_product[c2]} {c2_tasks} cost: {m_cost_campaign[c1, c2]}')

            if value == 1 and c1 == 0:
                print(f'{c1} --> {c2}   Starting')

            if value == 1 and c2 == 0:
                print(f'{c1} --> {c2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Campaigning time constraint (empty)

Source: scheduling/example_18_campaigning_time_constraint.py

The file is empty. Placeholder kept for numbering. The intended topic was likely "campaigns with a maximum total duration"; see Campaigning with cumul for a related, working model.

Cleaning holding time (empty)

Source: scheduling/example_19_cleaning_holding_time.py

The file is empty. Placeholder kept for numbering. The intended topic was likely "cleaning events with a minimum or maximum holding time between tasks"; see Changeover as event for the closest working example.

Campaigning with cumul

Source: scheduling/example_24_campaigning_with_cumul.py

What it does

Introduces the cumulative rank approach to campaigning. One product, many tasks, a configurable campaign_size.

  • var_task_cumul[t] is a rank in [0, campaign_size - 1].

  • var_task_reach_max[t] fires when the rank hits the cap.

  • On each t1 -> t2 arc:

    • Continue campaign (reach_max[t1] false): end[t1] <= start[t2] and cumul[t2] == cumul[t1] + 1.
    • End campaign (reach_max[t1] true): insert changeover, reset cumul[t2] == 0.

The circuit uses a node -1 as the first/last dummy.

The __main__ block times the solve for 2 to 8 tasks with campaign sizes 2 and 3, then plots the results to show scalability.
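To make the rank bookkeeping concrete, here is a minimal pure-Python sketch that replays the two arc rules for tasks executed in index order. It is not part of the example file and does not call CP-SAT; the helper name replay_campaigns and its default arguments are assumptions chosen to mirror the constants in run_model (processing_time = 1, changeover_time = 2).

```python
def replay_campaigns(num_tasks, campaign_size, processing_time=1, changeover_time=2):
    """Replay the rank rules for tasks executed in index order.

    Hypothetical helper for illustration only; it does not call CP-SAT.
    Returns a list of (start, end, cumul) tuples, one per task.
    """
    schedule = []
    clock = 0
    cumul = 0
    for _ in range(num_tasks):
        start = clock
        end = start + processing_time
        schedule.append((start, end, cumul))
        reach_max = cumul == campaign_size - 1
        if reach_max:
            # End campaign: insert a changeover and reset the rank.
            clock = end + changeover_time
            cumul = 0
        else:
            # Continue campaign: the next task may start right away, rank + 1.
            clock = end
            cumul += 1
    return schedule


schedule = replay_campaigns(num_tasks=5, campaign_size=2)
make_span = schedule[-1][1]
print(schedule)
print("Make-span:", make_span)
```

With five tasks and campaign_size=2 the replay gives a make-span of 9: five units of processing plus two changeovers of two units each, which fits inside max_time = num_tasks*2 = 10.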

Concepts

Source

# Inspired by https://stackoverflow.com/questions/75554536

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator

model = cp_model.CpModel()


def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product


def run_model(num_tasks, campaign_size, print_result = True):

    # if campaign size is 2, then we need cumul indicator to be 0, 1

    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1

    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}

    for task in tasks:
        model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
        model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])

    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # if from task has not reached MAX, continue the campaign
            model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(~var_task_reach_max[t1])
            model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(~var_task_reach_max[t1])

            # if from task has reached MAX, apply changeover and reset its cumulative indicator
            model.add(var_task_cumul[t2] == 0).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(var_task_reach_max[t1])
            model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(var_task_reach_max[t1])

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    return total_time


def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1, marker='o', label = 'Campaign size: 2')
    plt.plot(x, y2, marker='o', label = 'Campaign size: 3')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()


if __name__ == '__main__':

    N = 8
    sizes = range(2, N+1)
    model_times_campaign_2 = []
    model_times_campaign_3 = []

    for num_task in sizes:
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_3.append(run_model(num_task, campaign_size=3, print_result=False))

    print_unit_test_result(sizes, model_times_campaign_2, model_times_campaign_3,
                           'Scalability of Campaigning with Cumulative Indicator')

Campaigning, locked sequence

Source: scheduling/example_25_campaigning_with_locked_seq.py

What it does

Same cumulative-rank campaigning as 24, but adds a heuristic that locks the task order:

for task in tasks:
    if task != 0:
        model.add(var_task_ends[task - 1] <= var_task_starts[task])

When tasks happen to be indexed in the desired priority/deadline order, this can provide a large speed-up without changing the optimum.
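One way to see why the lock can help: without it, any permutation of the tasks is a candidate sequence for the circuit, whereas with it only the index order remains. The brute-force count below is an illustrative sketch only; the function name count_orders is hypothetical, nothing here calls CP-SAT, and the actual speed-up depends on how the solver propagates and searches.

```python
from itertools import permutations


def count_orders(num_tasks, locked):
    """Count task sequences on one machine, optionally with the order lock.

    Hypothetical helper: a sequence respects the lock when task t-1
    appears before task t for every t > 0.
    """
    count = 0
    for order in permutations(range(num_tasks)):
        position = {task: i for i, task in enumerate(order)}
        if locked and any(position[t - 1] > position[t] for t in range(1, num_tasks)):
            continue
        count += 1
    return count


print(count_orders(5, locked=False))  # 120 candidate sequences
print(count_orders(5, locked=True))   # 1 candidate sequence
```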

Concepts

Source

# Inspired by https://stackoverflow.com/questions/75554536

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator


def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product


def run_model(num_tasks, campaign_size, print_result = True):

    # Build a fresh model on every call. A single module-level model would keep
    # the variables and constraints of earlier benchmark runs and distort timings.
    model = cp_model.CpModel()

    # if campaign size is 2, then we need cumul indicator to be 0, 1

    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1

    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    model.add(var_task_cumul[0]==0)
    var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}

    # Lock the sequence of the tasks (assume the deadlines are in this sequence !)
    for task in tasks:
        if task != 0:
            model.add(var_task_ends[task-1] <= var_task_starts[task])


    for task in tasks:
        model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
        model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])

    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # if from task has not reached MAX, continue the campaign
            model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(~var_task_reach_max[t1])
            model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(~var_task_reach_max[t1])

            # if from task has reached MAX, apply changeover and reset its cumulative indicator
            model.add(var_task_cumul[t2] == 0).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(var_task_reach_max[t1])
            model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(var_task_reach_max[t1])

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    return total_time


def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1, marker='o', label = 'Campaign size: 2')
    plt.plot(x, y2, marker='o', label = 'Campaign size: 3')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()


if __name__ == '__main__':

    N = 25
    sizes = range(2, N+1)
    model_times_campaign_2 = []
    model_times_campaign_3 = []

    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_3.append(run_model(num_task, campaign_size=3, print_result=False))

    print_unit_test_result(sizes, model_times_campaign_2, model_times_campaign_3,
                           'Scalability of Campaigning with Cumulative Indicator')

Campaigning, locked sequence improved

Source: scheduling/example_26_campaigning_locked_seq_improved.py

What it does

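Refines 25 by folding the four conditional constraints on each arc into two linear ones. reach_max[t] is still tied to the cap in this file (it is true exactly when cumul[t] == campaign_size - 1), so campaigns keep a fixed size; letting the solver end a campaign before the size limit is left to the flexible variant that follows.

The order lock becomes start[t-1] <= start[t] (no longer end[t-1] <= start[t]), which is looser on its own; the no-overlap constraint still keeps tasks from running at the same time.

On each t1 -> t2 arc, the changeover is written as end[t1] + reach_max[t1] * changeover_time <= start[t2] and the rank update as cumul[t2] == cumul[t1] + 1 - reach_max[t1] * campaign_size. An add_max_equality(cumul[t2], [0, cumul[t1] + 1 - reach_max[t1] * campaign_size]) variant under only_enforce_if is left commented out in the source because it makes the model invalid.

The benchmark now covers 2 to 58 tasks in steps of 4 with campaign sizes 2 and 5, records -999 for runs that are not solved to optimality, and writes the timings to example_26_result.csv.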
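The linear rank update in this file, cumul[t2] == cumul[t1] + 1 - reach_max[t1] * campaign_size, can be checked by hand. The plain-Python sketch below is an illustration only (next_rank is a hypothetical helper name); it assumes reach_max is true exactly when the rank sits at the cap, as the two only_enforce_if constraints on var_task_reach_max in the source require.

```python
def next_rank(cumul, campaign_size):
    """Rank of the successor task under the fixed-size rule.

    Hypothetical helper mirroring the constraint
    cumul[t2] == cumul[t1] + 1 - reach_max[t1] * campaign_size,
    where reach_max[t1] is true exactly when cumul[t1] == campaign_size - 1.
    """
    reach_max = int(cumul == campaign_size - 1)
    return cumul + 1 - reach_max * campaign_size


campaign_size = 3
ranks = [0]
for _ in range(6):
    ranks.append(next_rank(ranks[-1], campaign_size))
print(ranks)
```

The ranks cycle through 0, 1, 2 and drop back to 0 after the cap, so a single linear expression covers both the "continue" and the "reset" case.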

Concepts

Source

# Inspired by https://stackoverflow.com/questions/75554536

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd


def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product


def run_model(num_tasks, campaign_size, print_result=True):

    # Build a fresh model on every call. A single module-level model would keep
    # the variables and constraints of earlier benchmark runs and distort timings.
    model = cp_model.CpModel()

    # if campaign size is 2, then we need cumul indicator to be 0, 1

    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1

    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    model.add(var_task_cumul[0]==0)
    var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}

    # Lock the sequence of the tasks (assuming the deadlines follow the task index order).
    # A later task must not start before an earlier one.
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] <= var_task_starts[task])

    for task in tasks:
        model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
        model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])

    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # # if from task has not reached MAX, continue the campaign
            # model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(~var_task_reach_max[t1])
            # model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(~var_task_reach_max[t1])
            #
            # # if from task has reached MAX, apply changeover and reset its cumulative indicator
            # model.add(var_task_cumul[t2] == 0).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(var_task_reach_max[t1])
            # model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(var_task_reach_max[t1])

            model.add(
                var_task_ends[t1] + var_task_reach_max[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )

            model.add(
                var_task_cumul[t2] == var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
            ).only_enforce_if(
                literals[t1, t2]
            )

            # The following makes the model invalid
            # model.add_max_equality(
            #     var_task_cumul[t2],
            #     [
            #         0,
            #         var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
            #     ]
            # ).only_enforce_if(
            #     literals[t1, t2]
            # )

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1,  label='Campaign size: 2')
    plt.plot(x, y2,  label='Campaign size: 5')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()


if __name__ == '__main__':

    N = 60
    sizes = range(2, N+1, 4)
    model_times_campaign_2 = []
    model_times_campaign_5 = []

    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_5.append(run_model(num_task, campaign_size=5, print_result=False))

    df = pd.DataFrame({
        'num_tasks': sizes,
        'c2': model_times_campaign_2,
        'c5': model_times_campaign_5
    })
    df.to_csv("example_26_result.csv")

    print_unit_test_result(sizes,
                           model_times_campaign_2,
                           model_times_campaign_5,
                           'Scalability of Campaigning with Cumulative Indicator')

Campaigning, locked sequence improved (flexible)

Source: scheduling/example_26_campaigning_locked_seq_improved_flexible.py

What it does

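Twin of the previous example 26 file, switched to flexible campaign ends. The constraints that tie the boolean to the cap are commented out, with a warning not to restore them, and the boolean is renamed var_reach_campaign_end. It is now a free decision variable, so the solver may end a campaign before the size limit if that gives a better objective.

The rank update uses add_max_equality on an auxiliary variable, max_values[t1, t2] == max(0, cumul[t1] + 1 - reach_campaign_end[t1] * campaign_size). Only the equality cumul[t2] == max_values[t1, t2] is placed under only_enforce_if because, per the comment in the source, add_max_equality is not compatible with only_enforce_if.

The two 26 files exist side by side to document the tweak explicitly.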
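The effect of the max(0, ...) update is easiest to see by enumerating what an arc allows. The sketch below is a plain-Python illustration, not code from the file; allowed_transitions is a hypothetical helper, and it assumes the rank domain [0, campaign_size - 1] used for var_task_cumul.

```python
def allowed_transitions(campaign_size):
    """Enumerate (cumul_t1, campaign_end_t1, cumul_t2) triples allowed on an arc.

    Hypothetical helper mirroring
    cumul[t2] == max(0, cumul[t1] + 1 - campaign_end[t1] * campaign_size)
    with cumul restricted to the domain [0, campaign_size - 1].
    """
    triples = []
    for cumul in range(campaign_size):
        for campaign_end in (0, 1):
            nxt = max(0, cumul + 1 - campaign_end * campaign_size)
            if nxt <= campaign_size - 1:  # stay inside the variable domain
                triples.append((cumul, campaign_end, nxt))
    return triples


for triple in allowed_transitions(3):
    print(triple)
```

For campaign_size = 3, a task at rank 0 or 1 may either continue the campaign or end it early, while a task at rank 2 has only one option left: end the campaign, so that its successor restarts at rank 0.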

Concepts

Source

# Inspired by https://stackoverflow.com/questions/75554536

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd


def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product


def run_model(num_tasks, campaign_size, print_result=True):

    # Build a fresh model on every call. A single module-level model would keep
    # the variables and constraints of earlier benchmark runs and distort timings.
    model = cp_model.CpModel()

    # if campaign size is 2, then we need cumul indicator to be 0, 1

    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1

    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    model.add(var_task_cumul[0]==0)
    var_reach_campaign_end = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}

    # Lock the sequence of the tasks (assuming the deadlines follow the task index order).
    # A later task must not start before an earlier one.
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] <= var_task_starts[task])

    # ! THE FOLLOWING MUST STAY REMOVED ! LEAVE THE DECISION (TO START A NEW CAMPAIGN OR NOT) TO THE MODEL !
    # for task in tasks:
    #     model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
    #     model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])

    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    # Auxiliary variables holding max(0, next rank); named distinctly from the arc literals above.
    max_values = {(t1, t2): model.new_int_var(0, max_time, f"max_{t1}_{t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # [ task1 ] -> [ C/O ] -> [ task 2]
            model.add(
                var_task_ends[t1] + var_reach_campaign_end[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )

            # This is for fixed campaigning size. DO full-campaign or NOT DO.
            # model.add(
            #    var_task_cumul[t2] == var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
            # ).only_enforce_if(literals[t1, t2])

            # The creator has confirmed add_max_equality is not compatible with only_enforce_if
            # model.add_max_equality(
            #     var_task_cumul[t2],
            #     [0,var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            # ).only_enforce_if(literals[t1, t2])

            # ! HERE IS THE CHANGE RECOMMENDED FOR FLEXIBLE CAMPAIGNING, DONE IN TWO STEPS !
            # (first the max on an auxiliary variable, then the conditional equality)
            # NOTE: var_reach_campaign_end ARE NOW FREE BOOLEAN DECISION VARIABLES.
            #
            # At the limit (var_task_cumul[t1] + 1 == campaign_size), the only value that fits the domain
            #   of var_task_cumul[t2] is max(0, 0) = 0, so the model must do a C/O (a new campaign starts).
            #
            # Below the limit (var_task_cumul[t1] + 1 < campaign_size), the model can continue the campaign,
            #   or still choose a C/O by setting var_reach_campaign_end to 1, which gives max(0, -x) = 0
            #   (if that brings a better objective).
            model.add_max_equality(
                max_values[t1, t2],
                [0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            )
            model.add(var_task_cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      solver.value(var_reach_campaign_end[task])
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1,  label='Campaign size: 2')
    plt.plot(x, y2,  label='Campaign size: 5')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()


if __name__ == '__main__':

    # print('Point check for run_model(40, 10)\nExpect make-span = 40*1 + (40/10 - 1)*2 = 40 + 6 = 46')
    # run_model(40, 10)

    N = 60
    sizes = range(2, N+1, 4)
    model_times_campaign_2 = []
    model_times_campaign_5 = []

    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_5.append(run_model(num_task, campaign_size=5, print_result=False))

    df = pd.DataFrame({
        'num_tasks': sizes,
        'c2': model_times_campaign_2,
        'c5': model_times_campaign_5
    })
    print(df)

    print_unit_test_result(sizes,
                           model_times_campaign_2,
                           model_times_campaign_5,
                           'Scalability of Campaigning with Cumulative Indicator')

Campaigning across products

Source: scheduling/example_27_campaigning_products.py

What it does

Extends the cumulative-rank campaigning to multiple products.

  • product_change_indicator[t1, t2] is 1 iff t1 and t2 belong to different products.
  • var_product_change[t1] captures whether the arc out of t1 crosses a product boundary.
  • var_reach_campaign_end[t1] >= var_product_change[t1]: a product change forces the campaign to end (and therefore a changeover).
  • The add_max_equality trick from 26 computes the reset-or-increment value into max_values[t1, t2]; the rank of t2 is then tied to that value under only_enforce_if(literals[t1, t2]).
  • An optional heuristic locks var_task_cumul[t-1] <= var_task_cumul[t] within each product group to speed things up.
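
The reset-or-increment rule can be checked by hand without the solver. The sketch below is a plain-Python mirror of the expression max(0, rank + 1 - end * campaign_size). The function name next_rank is hypothetical, and the explicit range check stands in for the [0, campaign_size - 1] domain that the model gives var_task_cumul.

```python
def next_rank(rank, ends_campaign, campaign_size):
    """Rank of the successor task: max(0, rank + 1 - end * campaign_size).

    Raises ValueError when the result leaves [0, campaign_size - 1]; in the
    model this case is infeasible, which is what forces the campaign to end.
    """
    value = max(0, rank + 1 - int(ends_campaign) * campaign_size)
    if value > campaign_size - 1:
        raise ValueError("campaign is full: ends_campaign must be True")
    return value


campaign_size = 3
print(next_rank(0, False, campaign_size))  # 1: the campaign continues
print(next_rank(1, True, campaign_size))   # 0: early end, the rank resets
print(next_rank(2, True, campaign_size))   # 0: full campaign, the rank resets
```

Calling next_rank(2, False, 3) raises, which corresponds to the solver being unable to keep a full campaign open.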

The __main__ block prints the expected vs. solver make-span for a parameter set, making it a small sanity check.
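
The expected value comes from counting campaigns: each product needs ceil(tasks / campaign_size) campaigns, every campaign is followed by one changeover, and the changeover after the very last campaign is not needed. The helper below restates the formula from the __main__ block. The function name is hypothetical, and the defaults (processing time 1, changeover time 2) are the constants set in run_model.

```python
from math import ceil


def expected_make_span(num_products, tasks_per_product, campaign_size,
                       processing_time=1, changeover_time=2):
    """Expected make-span used by the sanity check in the __main__ block."""
    campaigns_per_product = ceil(tasks_per_product / campaign_size)
    per_product = (tasks_per_product * processing_time
                   + campaigns_per_product * changeover_time)
    # The changeover after the final campaign is dropped.
    return per_product * num_products - changeover_time


print(expected_make_span(3, 4, 4))  # 16, the parameter set used in __main__
```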

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd
import string
from math import ceil

model = cp_model.CpModel()


def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate tasks of products (no more than 26 products) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product


def run_model(number_of_products, num_of_tasks_per_product, campaign_size, print_result=True):
    """
    Do a changeover if either of the following occurs:
    1. The product changes: [A] -> changeover -> [B]
    2. The previous campaign reaches the size limit: [A ... A] -> changeover -> any next campaign
    """

    # number_of_products = 2
    # num_of_tasks_per_product = 4
    # campaign_size = 3
    # print_result = True

    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*2
    processing_time = 1

    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print(f'\nInput data: {task_to_product}\n')

    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}

    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    # The first task of every product is fixed to rank 0
    for product_idx in range(number_of_products):
        model.add(var_task_cumul[product_idx*num_of_tasks_per_product] == 0)

    var_reach_campaign_end = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
    var_product_change = {task: model.new_bool_var(f"task_{task}_go_to_different_product") for task in tasks}

    # Heuristic: lock the sequence of the tasks (assume the deadlines follow the task order
    # and a task with a later deadline shall not start earlier than a task with an earlier deadline)

    print("Apply the tasks sequence heuristics")
    # Option 1: lock the rank order of the tasks within each product. This is slower (7.54s for 3, 4, 4)
    for product_idx, product in enumerate(range(number_of_products)):
        for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
            _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
            if task_id_in_product_group == 0:
                print(f"\nLocking {_index}", end=" ")
            else:
                print(f" <= {_index}", end=" ")
                model.add(var_task_cumul[_index-1] <= var_task_cumul[_index])

    # Option 2: Locking the sequence of all tasks! This is quicker (0.10s for 3, 4, 4)
    # print("Locking 0", end=" ")
    # for task in tasks:
    #     if task != 0:
    #         print(f"<= {task}", end=" ")
    #         model.add(var_task_starts[task-1] <= var_task_starts[task])

    print("\n")

    var_task_intervals = {
        t: model.new_interval_var(var_task_starts[t], processing_time, var_task_ends[t], f"task_{t}_interval")
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    # the bool variables to indicate if t1 -> t2
    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    # the technical variables to allow flexible campaigning (named differently from the literals)
    max_values = {(t1, t2): model.new_int_var(0, max_time, f"max {t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

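            # If t1 -> t2 is selected, var_product_change[t1] records whether the product changes
            model.add(var_product_change[t1] == product_change_indicator[t1, t2]).only_enforce_if(
                literals[t1, t2]
            )

            # A product change forces the campaign to end (does not depend on t2, so it is re-added harmlessly)
            model.add(var_reach_campaign_end[t1] >= var_product_change[t1])

            # [ task1 ] -> [ C/O ] -> [ task 2]
            model.add(
                var_task_ends[t1] + var_reach_campaign_end[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )

            # Allow flexible campaigning: the rank resets to 0 when the campaign ends, otherwise it increments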
            model.add_max_equality(
                max_values[t1, t2],
                [0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            )
            model.add(var_task_cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                if task%num_of_tasks_per_product == 0:
                    print('--------- product divider ---------\n')
                print(f'Task {task} {task_to_product[task]}',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      solver.value(var_reach_campaign_end[task]),
                      solver.value(var_product_change[task]),
                      )
                if solver.value(var_reach_campaign_end[task]):
                    print('-- campaign ends --\n')
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


if __name__ == '__main__':

    _number_of_products = 3
    _num_of_tasks_per_product = 4
    _campaign_size = 4
    n = ceil(_num_of_tasks_per_product/_campaign_size)
    _make_span = (_num_of_tasks_per_product + 2*n)*_number_of_products-2
    run_time = run_model(_number_of_products, _num_of_tasks_per_product, _campaign_size)
    print(f"Runtime: {round(run_time, 2)}s")

    print(f'\nExpected make-span if all right:'
          f'\n = (_num_of_tasks_per_product + changeover_time*ceil(_num_of_tasks_per_product/_campaign_size))*'
          f'_number_of_products - changeover_time '
          f'\n = {_make_span}')

Campaigning products x machines

Source: scheduling/example_28_campaigning_products_machines.py

What it does

Combines 27 (multi-product campaigning) with the multi-machine structure from 03.

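  • The rank variables and the reach_campaign_end and product_change booleans are indexed by (machine, task).
  • One add_circuit per machine, with a self-loop on the negated presence literal for every task that is not assigned to that machine.
  • Per-machine campaigning rules (continue vs. end campaign) inside each circuit's arc constraints.
  • The objective is still make_span; total_changeover_time is defined but is not part of the objective.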
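
The per-machine rules can be replayed outside the solver to check a printed schedule. The sketch below is not part of the source file: machine_finish_time and make_span are hypothetical helpers that walk one machine's product sequence without idle time, continue a campaign whenever the rules allow it, and insert a changeover when the product changes or the campaign is full. Processing time 1 and changeover time 2 match the constants in run_model. The cross-machine ordering heuristic is ignored.

```python
def machine_finish_time(products, campaign_size, processing_time=1, changeover_time=2):
    """Finish time of one machine that runs `products` in the given order."""
    clock = 0
    rank = 0  # position of the current task inside its campaign
    for position, product in enumerate(products):
        if position > 0:
            product_changed = product != products[position - 1]
            campaign_full = rank + 1 == campaign_size
            if product_changed or campaign_full:
                clock += changeover_time  # the campaign ends and the rank resets
                rank = 0
            else:
                rank += 1
        clock += processing_time
    return clock


def make_span(schedule, campaign_size):
    """Make-span of a {machine: product sequence} assignment."""
    return max(machine_finish_time(seq, campaign_size) for seq in schedule.values())


schedule = {0: "AAAAB", 1: "BBB"}
print(make_span(schedule, campaign_size=3))  # 9
```

Machine 0 runs three A tasks, pays a changeover because the campaign is full, runs the fourth A, pays a second changeover for the product change, and finishes B at time 9.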

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import string

model = cp_model.CpModel()


def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate the same number of tasks for multiple products (no more than 26 products please) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product


def run_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
    """
    Allocate tasks to multiple machines
    and do a changeover if either of the following occurs:
    1. Switch between different products: [A campaign] -> changeover -> [B campaign]
    2. The previous campaign reaches the size limit: [A FULL campaign] -> changeover -> any next campaign
    """

    # number_of_products = 2
    # num_of_tasks_per_product = 4
    # campaign_size = 2
    # number_of_machines = 2
    # print_result = True

    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*3
    processing_time = 1
    machines = {x for x in range(number_of_machines)}

    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print('Input data:\nTasks:', tasks, task_to_product, '\n')

    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}

    var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
    var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
    var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}

    var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}

    # No influence on the final result: no need to lock the starting rank of the first task of each product to 0
    # for product_idx, product in enumerate(range(number_of_products)):
    #     print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
    #     for m in machines:
    #         model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)

    var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
    var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}

    # This is optional
    model.add_decision_strategy(
        var_m_t_product_change.values(),
        cp_model.CHOOSE_FIRST,
        cp_model.SELECT_MIN_VALUE
    )

    # Heuristic: lock the sequence of the tasks (assume the deadlines follow the task order
    # and a task with a later deadline shall not start earlier than a task with an earlier deadline)
    print("\nApply the tasks sequence heuristics")
    # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
    for product_idx, product in enumerate(range(number_of_products)):
        for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
            _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
            if task_id_in_product_group == 0:
                print(f"\nLock {_index}", end=" ")
            else:
                print(f" <= {_index}", end=" ")
                model.add(var_task_ends[_index-1] <= var_task_starts[_index])
    print("\n")

    # These intervals are needed; otherwise the duration is not constrained
    var_machine_task_intervals = {
        (m, t): model.new_optional_interval_var(
            var_machine_task_starts[m, t],
            processing_time,
            var_machine_task_ends[m, t],
            var_machine_task_presences[m, t],
            f"task_{t}_interval_on_m_{m}")
        for t in tasks for m in machines
    }

    # each task is present on exactly one machine
    for task in tasks:
        task_candidate_machines = machines
        tmp = [
            var_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        model.add_exactly_one(tmp)

    # link task-level to machine-task level for start time & end time
    for task in tasks:
        task_candidate_machines = machines
        for m in task_candidate_machines:
            model.add(
                var_task_starts[task] == var_machine_task_starts[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

            model.add(
                var_task_ends[task] == var_machine_task_ends[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    # Despite its name, total_changeover_time counts the campaign ends over all (machine, task) pairs;
    # it is only recorded here and is not part of the objective
    total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m, t] for m in machines for t in tasks))
    model.minimize(make_span)

    # the bool variables to indicate if t1 -> t2 on machine m
    literals = {(m, t1, t2): model.new_bool_var(f"m{m}: {t1} -> {t2}")
                for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # the technical variables to allow flexible campaigning
    max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"m{m}: max {t1} -> {t2}")
                  for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # schedule the tasks
    for m in machines:
        arcs = []
        for t1 in tasks:
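            # dummy node -1 marks the start and the end of the sequence on this machine
            arcs.append([-1, t1, model.new_bool_var(f"m{m}_first_to_{t1}")])
            arcs.append([t1, -1, model.new_bool_var(f"m{m}_{t1}_to_last")])
            # self-loop: the task is skipped in this circuit when it is not present on the machine
            arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])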

            for t2 in tasks:
                if t1 == t2:
                    continue

                # if t1 > t2:
                #     model.add(literals[m, t1, t2] == 0)

                arcs.append([t1, t2, literals[m, t1, t2]])

                # If t1 -> t2 crosses a product boundary, var_m_t_product_change[m, t1] must be 1
                # (it stays free, so it can be 0, when t1 is the last task on the machine)
                model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
                    literals[m, t1, t2]
                )

                # If var_m_t_product_change=1 then the campaign must end
                model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])

                # if the campaign ends then there must be changeover time
                # [ task1 ] -> [ C/O ] -> [ task 2]
                model.add(
                    var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
                ).only_enforce_if(
                    literals[m, t1, t2]
                )

                # The model may also end the campaign before it reaches the size limit; the rank of t2 is then reset.
                # This takes two steps because add_max_equality is not compatible with only_enforce_if
                model.add_max_equality(
                    max_values[m, t1, t2],
                    [0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
                )
                model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])

        model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    # show the result if getting the optimal one
    if print_result:
        if status == cp_model.OPTIMAL:
            big_list = []
            for m in machines:
                for task in tasks:
                    if solver.value(var_machine_task_presences[m, task]):
                        tmp = [
                            f"machine {m}",
                            f"task {task}",
                            task_to_product[task],
                            solver.value(var_task_starts[task]),
                            solver.value(var_task_ends[task]),
                            solver.value(var_machine_task_rank[m, task]),
                            solver.value(var_m_t_product_change[m, task]),
                            solver.value(var_m_t_reach_campaign_end[m, task])
                        ]
                        big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['machine', 'task', 'prd', 'start', 'end', 'rank', 'prd_chg', 'c/o']
            df = df.sort_values(['machine', 'start'])
            for m in machines:
                print(f"\n======= Machine {m} =======")
                df_tmp = df[df['machine']==f"machine {m}"]
                print(df_tmp)
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


if __name__ == '__main__':

    # number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines
    args = 4, 4, 3, 3

    runtime = run_model(*args)
    print(f"Solving time: {round(runtime, 2)}s")

Campaigning with pregrouping (empty)

Source: scheduling/example_30_campaigning_with_pregrouping.py

The file is empty; the entry is kept as a placeholder so that the example numbering stays continuous. The intended topic was likely "pre-group tasks into candidate campaigns before handing them to the solver"; see Max number of continuous tasks for the closest working model.

Campaigning faster

Source: scheduling/example_31_campaigning_faster.py

What it does

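A tuned variant of 28 (multi-product, multi-machine campaigning) aiming for faster solves. The model is the same except for one restriction inside the arc loop: literals[m, t1, t2] is fixed to 0 whenever t1 > t2, so on every machine a task can only be followed by a task with a higher index. A commented-out alternative applies the restriction only to pairs of tasks of the same product. The source notes that the restriction speeds up the solve considerably.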

It is the natural next step after 28 when experimenting with scalability.
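
The source fixes literals[m, t1, t2] to 0 whenever t1 > t2. To see how much this prunes, the task-to-task arcs that stay free on one machine can be counted directly. This is a plain-Python illustration rather than code from the source: free_arcs is a hypothetical helper, and 16 tasks corresponds to the 4 products with 4 tasks each used in the __main__ block.

```python
from itertools import permutations


def free_arcs(num_tasks, forbid_backward):
    """Task-to-task arcs on one machine whose literal is not fixed to 0."""
    return [
        (t1, t2)
        for t1, t2 in permutations(range(num_tasks), 2)
        if not (forbid_backward and t1 > t2)
    ]


print(len(free_arcs(16, forbid_backward=False)))  # 240
print(len(free_arcs(16, forbid_backward=True)))   # 120
```

Half of the ordered pairs disappear per machine, which is one plausible reason for the speed-up. Whether the restriction can exclude better schedules on other data is not established here.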

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import string

model = cp_model.CpModel()


def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate the same number of tasks for multiple products (no more than 26 products please) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product


def run_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
    """
    Allocate tasks to multiple machines
    and do a changeover if either of the following occurs:
    1. Switch between different products: [A campaign] -> changeover -> [B campaign]
    2. The previous campaign reaches the size limit: [A FULL campaign] -> changeover -> any next campaign
    """

    # number_of_products = 2
    # num_of_tasks_per_product = 4
    # campaign_size = 2
    # number_of_machines = 2
    # print_result = True

    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*3
    processing_time = 1
    machines = {x for x in range(number_of_machines)}

    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print('Input data:\nTasks:', tasks, task_to_product)

    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}

    var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
    var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
    var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}

    var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}

    # No influence on the final result: no need to lock the starting rank of the first task of each product to 0
    # for product_idx, product in enumerate(range(number_of_products)):
    #     print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
    #     for m in machines:
    #         model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)

    var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
    var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}

    # This is optional
    model.add_decision_strategy(
        var_m_t_product_change.values(),
        cp_model.CHOOSE_FIRST,
        cp_model.SELECT_MIN_VALUE
    )

    # Heuristic: lock the sequence of the tasks (assume the deadlines follow the task order
    # and a task with a later deadline shall not start earlier than a task with an earlier deadline)
    print("Apply the tasks sequence heuristics")
    # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
    for product_idx, product in enumerate(range(number_of_products)):
        for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
            _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
            if task_id_in_product_group == 0:
                print(f"\nLock {_index}", end=" ")
            else:
                print(f" <= {_index}", end=" ")
                model.add(var_task_ends[_index-1] <= var_task_starts[_index])
    print("\n")

    # These intervals are needed; otherwise the duration is not constrained
    var_machine_task_intervals = {
        (m, t): model.new_optional_interval_var(
            var_machine_task_starts[m, t],
            processing_time,
            var_machine_task_ends[m, t],
            var_machine_task_presences[m, t],
            f"task_{t}_interval_on_m_{m}")
        for t in tasks for m in machines
    }

    # each task is present on exactly one machine
    for task in tasks:
        task_candidate_machines = machines
        tmp = [
            var_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        model.add_exactly_one(tmp)

    # link task-level to machine-task level for start time & end time
    for task in tasks:
        task_candidate_machines = machines
        for m in task_candidate_machines:
            model.add(
                var_task_starts[task] == var_machine_task_starts[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

            model.add(
                var_task_ends[task] == var_machine_task_ends[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    # Despite its name, total_changeover_time counts the campaign ends over all (machine, task) pairs;
    # it is only recorded here and is not part of the objective
    total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m, t] for m in machines for t in tasks))
    model.minimize(make_span)

    # the bool variables to indicate if t1 -> t2 on machine m
    literals = {(m, t1, t2): model.new_bool_var(f"m{m}: {t1} -> {t2}")
                for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # the technical variables to allow flexible campaigning
    max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"m{m}: max {t1} -> {t2}")
                  for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # schedule the tasks
    for m in machines:
        arcs = []
        for t1 in tasks:
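            # dummy node -1 marks the start and the end of the sequence on this machine
            arcs.append([-1, t1, model.new_bool_var(f"m{m}_first_to_{t1}")])
            arcs.append([t1, -1, model.new_bool_var(f"m{m}_{t1}_to_last")])
            # self-loop: the task is skipped in this circuit when it is not present on the machine
            arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])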

            for t2 in tasks:
                if t1 == t2:
                    continue

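                # This speeds up the solve considerably: forbid arcs from a higher-numbered task
                # to a lower-numbered one. A narrower alternative only does so within one product:
                # if t1 > t2 and task_to_product[t1]==task_to_product[t2]:
                #     model.add(literals[m, t1, t2] == 0)
                if t1 > t2:
                    model.add(literals[m, t1, t2] == 0)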

                arcs.append([t1, t2, literals[m, t1, t2]])

                # If t1 -> t2 crosses a product boundary, var_m_t_product_change[m, t1] must be 1
                # (it stays free, so it can be 0, when t1 is the last task on the machine)
                model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
                    literals[m, t1, t2]
                )

                # If var_m_t_product_change=1 then the campaign must end
                model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])

                # if the campaign ends then there must be changeover time
                # [ task1 ] -> [ C/O ] -> [ task 2]
                model.add(
                    var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
                ).only_enforce_if(
                    literals[m, t1, t2]
                )

                # The model may also end the campaign before it reaches the size limit; the rank of t2 is then reset.
                # This takes two steps because add_max_equality is not compatible with only_enforce_if
                model.add_max_equality(
                    max_values[m, t1, t2],
                    [0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
                )
                model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])

        model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    # show the result if getting the optimal one
    if print_result:
        if status == cp_model.OPTIMAL:
            big_list = []
            for m in machines:
                for task in tasks:
                    if solver.value(var_machine_task_presences[m, task]):
                        tmp = [
                            f"machine {m}",
                            f"task {task}",
                            task_to_product[task],
                            solver.value(var_task_starts[task]),
                            solver.value(var_task_ends[task]),
                            solver.value(var_machine_task_rank[m, task]),
                            solver.value(var_m_t_product_change[m, task]),
                            solver.value(var_m_t_reach_campaign_end[m, task])
                        ]
                        big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['machine', 'task', 'prd', 'start', 'end', 'rank', 'prd_chg', 'c/o']
            df = df.sort_values(['machine', 'start'])
            for m in machines:
                print(f"\n======= Machine {m} =======")
                df_tmp = df[df['machine']==f"machine {m}"]
                print(df_tmp)
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


if __name__ == '__main__':

    # number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines
    args = 4, 4, 3, 3

    runtime = run_model(*args)
    print(f"Solving time: {round(runtime, 2)}s")

Solving by phases

Source: scheduling/example_32_solving_by_phases.py

What it does

Demonstrates warm-starting CP-SAT across multiple solves of the same model.

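  • create_model(...) returns a campaigning-with-machines model similar to example 28.
  • A list of phases is defined, each with a larger max_time. Here max_time is the solver time limit in seconds (solver.parameters.max_time_in_seconds), not the scheduling horizon.
  • For each phase, the model is solved. The resulting solution is read back with get_solutions(model, solver) and fed as hints to the next phase using a custom add_hints(model, solution) helper.
  • model.clear_hints() removes the previous phase's hints before new ones are added.

This is useful when proving optimality takes a long time but a short run already yields a feasible starting point: each phase resumes from the best solution found so far, and the loop stops as soon as a phase proves optimality.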

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import numpy as np
import string


def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate the same number of tasks for multiple products (no more than 26 products please) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product


def create_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):

    model = cp_model.CpModel()

    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*5
    processing_time = 1
    machines = {x for x in range(number_of_machines)}

    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print('Input data:\nTasks:', tasks, task_to_product, '\n')

    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}

    var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
    var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
    var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}

    var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}

    # Not needed: locking the starting rank of each product's first task to 0 has no influence on the final result
    # for product_idx, product in enumerate(range(number_of_products)):
    #     print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
    #     for m in machines:
    #         model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)

    var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
    var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}

    # This is optional
    # model.add_decision_strategy(
    #     var_m_t_product_change.values(),
    #     cp_model.CHOOSE_FIRST,
    #     cp_model.SELECT_MIN_VALUE
    # )

    # Heuristic: lock the sequence of the tasks (assuming the deadlines follow the task order
    # AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
    # print("\nApply the tasks sequence heuristics")
    # # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
    # for product_idx, product in enumerate(range(number_of_products)):
    #     for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
    #         _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
    #         if task_id_in_product_group == 0:
    #             print(f"\nLock {_index}", end=" ")
    #         else:
    #             print(f" <= {_index}", end=" ")
    #             model.add(var_task_ends[_index-1] <= var_task_starts[_index])
    # print("\n")

    # These intervals are needed; otherwise the task durations are not constrained
    var_machine_task_intervals = {
        (m, t): model.new_optional_interval_var(
            var_machine_task_starts[m, t],
            processing_time,
            var_machine_task_ends[m, t],
            var_machine_task_presences[m, t],
            f"task_{t}_interval_on_m_{m}")
        for t in tasks for m in machines
    }

    # each task is present on exactly one machine
    for task in tasks:
        task_candidate_machines = machines
        tmp = [
            var_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        model.add_exactly_one(tmp)

    # link task-level to machine-task level for start time & end time
    for task in tasks:
        task_candidate_machines = machines
        for m in task_candidate_machines:
            model.add(
                var_task_starts[task] == var_machine_task_starts[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

            model.add(
                var_task_ends[task] == var_machine_task_ends[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
    model.minimize(make_span)

    # Boolean variables indicating whether t1 is directly followed by t2 on machine m
    literals = {(m, t1, t2): model.new_bool_var(f"{t1} -> {t2}")
                for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # the technical variables to allow flexible campaigning
    max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}")
                  for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # schedule the tasks
    for m in machines:
        arcs = []
        for t1 in tasks:
            arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
            arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
            arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])

            for t2 in tasks:
                if t1 == t2:
                    continue
                arcs.append([t1, t2, literals[m, t1, t2]])

                # If t1 -> t2 and the products differ, var_m_t_product_change[m, t1] must be 1
                # (it can stay 0 for the last task on a machine)
                model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
                    literals[m, t1, t2]
                )

                # If var_m_t_product_change=1 then the campaign must end
                model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])

                # if the campaign ends then there must be changeover time
                # [ task1 ] -> [ C/O ] -> [ task 2]
                model.add(
                    var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
                ).only_enforce_if(
                    literals[m, t1, t2]
                )

                # model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
                # has to do this in two steps since add_max_equality is not compatible with only_enforce_if
                model.add_max_equality(
                    max_values[m, t1, t2],
                    [0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
                )
                model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])

        model.add_circuit(arcs)

    return model




# This takes 16 seconds
model = create_model(5, 20, 3, 1)

# solver = cp_model.CpSolver()
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(status)
# print(solver.objective_value())
# #10
# print(total_time, status)

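# Alternative phase schedules tried during experiments. Each assignment overwrites
# the previous one, so only the last definition (built from n) takes effect.
phases = [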
    {'phase_id': 0, 'max_time': 0.5},
    {'phase_id': 1, 'max_time': 1},
    {'phase_id': 2, 'max_time': 2},
    {'phase_id': 3, 'max_time': 4},
    {'phase_id': 4, 'max_time': 8}
]

phases = [
    {'phase_id': 0, 'max_time': 10},
    {'phase_id': 1, 'max_time': 10},
    {'phase_id': 2, 'max_time': 10},
    {'phase_id': 3, 'max_time': 10},
    {'phase_id': 4, 'max_time': 10}
]

phases = [
    {'phase_id': 0, 'max_time': 5},
    {'phase_id': 1, 'max_time': 10},
    {'phase_id': 2, 'max_time': 15},
    {'phase_id': 3, 'max_time': 20},
    {'phase_id': 4, 'max_time': 25}
]

n = 6

phases = [
    {'phase_id': i, 'max_time': (i+1)*10} for i in range(n)
]


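def get_solutions(model, solver):
    """Return the last solution as {variable index: value}.

    Keyed by proto index rather than by name: several variables built in
    create_model share a name (literals and max_values both use "t1 -> t2"),
    so a name-keyed dict would mix up their values.
    """
    # proto and response_proto are properties in the snake_case API
    return dict(enumerate(solver.response_proto.solution))


def add_hints(model, solution):
    """Append every (variable index, value) pair of `solution` to the model's solution hint."""
    for index, value in solution.items():
        model.proto.solution_hint.vars.append(index)
        model.proto.solution_hint.values.append(value)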

# get_solutions(model, solver)

obj_list = []
solution = None  # best solution found so far, passed to the next phase as hints
solver = cp_model.CpSolver()

for phase in phases:
    phase_id, max_time = phase['phase_id'], phase['max_time']
    print('----------------------------')
    # Drop the hints of the previous phase before adding the new ones
    model.clear_hints()
    if solution is not None:
        print("Add hints")
        add_hints(model, solution)
    print('number of variables in solution hints:', len(model.proto.solution_hint.vars))
    # solver.parameters.keep_all_feasible_solutions_in_presolve = True
    solver.parameters.max_time_in_seconds = max_time
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    print('number of values in the solution:', len(solver.response_proto.solution))

    if status in (cp_model.MODEL_INVALID, cp_model.INFEASIBLE):
        print(f'error status : {solver.status_name(status)}')
        break
    if status == cp_model.UNKNOWN:
        print(f'Cannot find a feasible solution in the given time {max_time}. status:{status}')
        obj_list.append(np.nan)
        continue

    # objective_value is a property in the snake_case API, not a method
    obj_value = solver.objective_value
    obj_list.append(obj_value)
    solution = get_solutions(model, solver)
    print(f"phase_id: {phase_id}, max_time: {max_time}, status: {status}, obj: {obj_value}. total time: {round(total_time,1)}")

    if status == cp_model.OPTIMAL:
        print('======================================================')
        print('Optimal Solution Achieved ! No need to continue')
        break

print(obj_list)

# One x value per solved phase. The loop may stop early, so trim to len(obj_list)
times = [phase['max_time'] for phase in phases][:len(obj_list)]


ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(times, obj_list, marker='o')
plt.title('time vs obj')
plt.xlabel('phase time limit (seconds)')
plt.ylabel('obj')
plt.show()

# self.alg_data, self.model = solver.solve(
#     alg_data=self.alg_data,
#     previous_model=self.model,
#     model_parameters=self.model_parameters,
# )


#
#
# vars_sol = {}
# for i, var in enumerate(model.proto().variables):
#     value = solver.response_proto().solution[i]
#     vars_sol[var.name] = value
#
#
#
# solver = cp_model.CpSolver()
# solver.parameters.max_time_in_seconds = 0.5
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# #model.ExportToFile("C:/Temp/xxxx.pb.txt")
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# vars_sol = {}
# for i, var in enumerate(model.proto().variables):
#     print(var)
#     print(var.name)
#     print(var.domain)
#     print('-------------------')
#     vars_sol[var.name] = var
#
# model.proto().solution_hint
# for i, var in enumerate(model.proto().variables):
#     if var.name in vars_sol:
#         model.proto().solution_hint.vars.append(i)
#         model.proto().solution_hint.values.append(vars_sol[var.name])
# #
#
# print(model.ModelStats())


# @dataclass
# class PhaseParameters:
#     """  Gather different input optimisation parameters that can be set for each phase. """
#
#     obj_phase: str = None
#     max_time_in_seconds: int = None
#     solution_count_limit: int = None
#     restart: bool = False
#
#
# phase_1 = PhaseParameters('phase_1', 10)
# phase_2 = PhaseParameters('phase_2', 10)
#
#


# show the result if getting the optimal one