Introduction

This book collects notes and examples of scheduling models built with OR-Tools CP-SAT. The focus is on job-shop style problems: sequencing tasks on machines, handling changeovers, respecting breaks and shifts, modeling resources, and grouping tasks into campaigns.

The book is split into two parts.

  • Concepts walks through the core modeling ideas once. Read these first if CP-SAT or constraint programming is new to you.
  • Examples indexes the Python files in the scheduling/ folder and links each one back to the concepts it demonstrates. Examples are numbered and build on each other, so reading them in order is usually easiest.

All code lives in the scheduling/ directory of the repo. Open a file in your editor while reading the corresponding chapter to see the full model.

A minimal CP-SAT template

Almost every example follows the same five-step shape.

from ortools.sat.python import cp_model

# 1. Data
# ... sets, durations, changeover table, etc.

# 2. Decision variables
model = cp_model.CpModel()
# ... start/end/interval/bool vars

# 3. Objective
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, [ends[t] for t in tasks])
model.minimize(make_span)

# 4. Constraints
# ... precedence, resources, circuits, ...

# 5. Solve and post-process
solver = cp_model.CpSolver()
status = solver.solve(model)

The rest of the book explains what goes into step 4.

CP-SAT basics

CP-SAT is a constraint-programming solver with integer and boolean variables. Before looking at scheduling, it helps to be comfortable with a few primitives.

Variables

x = model.new_int_var(0, 100, 'x')      # integer in [0, 100]
b = model.new_bool_var('b')             # boolean (integer in {0, 1})

Linear constraints

model.add(x + y == 10)
model.add(x <= y)
model.add(sum(bs) == 1)               # exactly-one on a list of bools
model.add_exactly_one(bs)               # same, more idiomatic

Boolean combinations

model.add_bool_or([a, b])               # a or b
model.add_bool_and([a, b])              # a and b
model.add_bool_xor([a, b])              # exactly one of a, b

Reification with only_enforce_if

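A constraint can be conditioned on a boolean literal. The constraint is enforced when the literal is true; when the literal is false the constraint is simply ignored. This is a one-way implication, so an equivalence needs a second constraint for the negated literal, as in the pair below.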

model.add(x >= 5).only_enforce_if(b)
model.add(x < 5).only_enforce_if(~b)

Chaining two only_enforce_if calls gives an "and" of conditions:

model.add(y == 1).only_enforce_if(b1).only_enforce_if(b2)  # (b1 and b2) implies y == 1

For "or", you normally introduce intermediate booleans or use add_bool_or.

add_min_equality, add_max_equality, add_multiplication_equality

These express z = min(xs), z = max(xs), z = x * y (or the product of a list). add_max_equality is how makespan is usually encoded:

model.add_max_equality(make_span, [ends[t] for t in tasks])

Domains

For "x belongs to a non-contiguous set of values" use add_linear_expression_in_domain:

domain = cp_model.Domain.from_intervals([[0, 4], [11, 100]])
model.add_linear_expression_in_domain(x, domain)

See example_00_unit_tests.py for a collection of small snippets exercising each of these.

Solve and read back

solver = cp_model.CpSolver()
status = solver.solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(solver.value(x))

Interval variables

An interval variable bundles three integer variables - start, size, end - with the implicit constraint start + size == end. CP-SAT uses intervals to reason efficiently about scheduling: add_no_overlap and add_cumulative both expect intervals.

Three flavors

Regular interval

iv = model.new_interval_var(start, size, end, name="t1")

Replaces a manual model.add(end - start == size).

Optional interval

An interval that is only scheduled when a presence boolean is true. Essential when a task may or may not be assigned to a given machine.

iv = model.new_optional_interval_var(start, size, end, is_present, name="t1_on_m1")

If is_present is false, the interval disappears from add_no_overlap / add_cumulative reasoning.

Fixed-size interval

Convenient for breaks, shift boundaries, and anything with a known position.

br = model.new_fixed_size_interval_var(start=2, size=1, name="break")

Typical use

intervals = {
    (m, t): model.new_optional_interval_var(
        starts[m, t],
        processing_time[product_of(t)],
        ends[m, t],
        presence[m, t],
        f"t{t}_on_m{m}",
    )
    for t in tasks for m in machines
}

for m in machines:
    model.add_no_overlap([intervals[m, t] for t in tasks])

Examples that introduce intervals: example_05_seq_with_intervals.py (first use), example_03_seq_scale_Mathieu.py (dramatic speed-up vs. manual duration constraints).

Circuit and sequencing

To sequence tasks on a machine, you need both the order and the time constraints that follow from it. CP-SAT's add_circuit is the standard tool.

What add_circuit does

add_circuit(arcs) takes a list of triples [i, j, literal]. It asserts that the selected arcs (where literal == 1) form a single Hamiltonian circuit over the referenced nodes. Self-arcs [i, i, literal] mean node i is skipped when literal is true.

arcs = []
for t1 in tasks:
    arcs.append([0, t1, start_literal(t1)])   # dummy -> t1 (first)
    arcs.append([t1, 0, end_literal(t1)])     # t1 -> dummy (last)
    arcs.append([t1, t1, ~presence[t1]]) # skip t1 if absent
    for t2 in tasks:
        if t1 == t2:
            continue
        arcs.append([t1, t2, seq[t1, t2]])

model.add_circuit(arcs)

Node 0 (or -1) is typically a dummy "first/last" node.

Linking the circuit to time

add_circuit only picks the order. You also need: if t1 -> t2 is chosen, then end[t1] + gap <= start[t2]. This is a reified constraint:

model.add(end[t1] + gap <= start[t2]).only_enforce_if(seq[t1, t2])

gap is usually changeover_time (see Changeover) or 0.

Multi-machine

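With multiple machines, build one circuit per machine and gate absent tasks with self-loops. Each machine's circuit still needs its own dummy first/last node, as in the single-machine case; those arcs are omitted below for brevity: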

for m in machines:
    arcs = []
    for t in tasks:
        arcs.append([t, t, ~presence[m, t]])
        for t2 in tasks:
            if t != t2:
                arcs.append([t, t2, seq[m, t, t2]])
    model.add_circuit(arcs)

Adding model.add_exactly_one(presence[m, t] for m in machines) for every task t ensures each task ends up on exactly one machine.

Examples: example_01_simple_sequence.py (single machine), example_03_seq_multi_stations.py (multi-machine).

Changeover

A changeover is the time needed to switch a machine from producing one product to another. There are three common ways to model it.

1. In the objective

Charge a cost for each selected seq[t1, t2] whose products differ. The changeover does not appear in the schedule itself, only in the objective being minimised.

total_co = sum(seq[t1, t2] * changeover_cost[t1, t2] for ...)
model.minimize(make_span + total_co)

Simple, but time and cost are decoupled: the schedule may not leave physical room for the changeover.

Example: example_01_simple_sequence.py.

2. In the precedence constraint

Include the changeover in the gap between tasks:

gap = changeover_time if products_differ(t1, t2) else 0
model.add(end[t1] + gap <= start[t2]).only_enforce_if(seq[t1, t2])

Now time and cost agree: a changeover actually pushes the next task later.

Example: example_04_seq_with_changeover_in_constraint.py.

3. As a first-class event

Create an optional interval for every (t1, t2) that represents the changeover itself. When t1 -> t2 is chosen, the interval is present, sits between the two tasks, and has the right duration.

co_iv = model.new_optional_interval_var(co_start, co_duration, co_end, co_present, ...)
model.add(end[t1] <= co_start).only_enforce_if(seq[t1, t2])
model.add(co_end <= start[t2]).only_enforce_if(seq[t1, t2])
model.add(co_present == 1).only_enforce_if(seq[t1, t2])
model.add(co_present == 0).only_enforce_if(~seq[t1, t2])  # absent when the arc is not chosen

This lets you add the changeover interval to add_cumulative (it consumes operator time) or apply cleaning-resource constraints to it.

Example: example_08_changeover_as_event.py.

Starting product

A machine usually begins with some product already loaded. Model it with a dummy task 0 whose "product" is the starting product; the cost from dummy to the first real task is zero if they match, else the usual changeover.

Example: example_02_seq_lock_starting_product.py.

Breaks

A break is a time window during which a machine or operator is unavailable. Three techniques cover most cases.

1. Break as a fixed interval in add_cumulative

For each break, build a new_fixed_size_interval_var and add it alongside task intervals with the full demand. Tasks are pushed around the break.

break_intervals = [
    model.new_fixed_size_interval_var(start=s, size=e - s, name="break")
    for (s, e) in breaks
]
all_intervals = task_intervals + break_intervals
demands = [1] * len(task_intervals) + [1] * len(break_intervals)
model.add_cumulative(all_intervals, demands, capacity=1)

Example: example_07_break_without_changeover.py.

2. Task duration stretched by overlapping breaks

When a task may run through a break and the break simply extends its total time on the machine, use per-time-slot booleans that indicate whether the task uses slot i, then add is_break[i] for each covered slot.

uses[t, i] = starts_before_i AND ends_after_i
duration[t] = base + sum(is_break[i] * uses[t, i] for i)
interval[t] = new_interval_var(start, duration, end, ...)

Example: example_14_task_delaying_break.py.

3. Break-aware start domains

If the break pattern is periodic, restrict task starts to the valid slots with add_linear_expression_in_domain. Much faster than per-slot booleans.

domain_no_break = cp_model.Domain.from_values([...])
model.add_linear_expression_in_domain(start[t], domain_no_break)

Example: example_29_linear_domain_for_breaks.py, example_33_conditional_duration_linear_domain.py.

Automatic jobs

Some "automatic" tasks don't consume the operator while running (think: a machine runs itself after a short manual setup). Model only the setup portion inside the cumulative, using a 1-unit interval at the task's start.

Example: example_12_an_automatic_job.py, example_13_automatic_jobs.py.

Shifts

A shift is a working window. Tasks must fit inside one shift (or, depending on policy, be split / disallowed from crossing shifts).

Synthetic shift breaks

Insert a tiny "fake break" interval at each shift boundary and forbid any task from overlapping it with add_no_overlap. This prevents shift-crossing without enumerating shift assignments.

for (s, e) in synthetic_shift_breaks:
    br = model.new_fixed_size_interval_var(start=s, size=e - s, name="shift_edge")
    for t in tasks:
        model.add_no_overlap([task_interval[t], br])

Example: example_16_shift_crossing_fake_time_unit.py.

Explicit shift assignment

Alternatively, give every task a one-hot presence[shift, task] and enforce the shift window when present:

for t in tasks:
    model.add_exactly_one(presence[s, t] for s in shifts)
    for s in shifts:
        model.add(start[t] >= shift_start[s]).only_enforce_if(presence[s, t])
        model.add(end[t]   <= shift_end[s]  ).only_enforce_if(presence[s, t])

More variables, but the assignment is explicit and easy to extend (e.g. to per-shift capacity).

Example: example_17_shift_crossing_mathieu.py.

Resources and cumulative

add_no_overlap says "at most one interval at a time". add_cumulative is the generalisation: each interval consumes some amount of a shared resource, and the total consumption must not exceed a capacity.

No-overlap

model.add_no_overlap(intervals)

Used per machine (one task at a time) and per stage (one job at a time in a flow-shop style).

Cumulative

model.add_cumulative(intervals, demands, capacity)

demands[i] is the amount of resource taken by intervals[i] while it runs. Typical uses:

  • Shared operator across machines. If two machines need the same operator, cumulative over all their task intervals with demand 1 and capacity 1 forbids parallel runs. Example: example_06_seq_with_intervals_resource.py.
  • Breaks. Treat a break as an interval that fully occupies the resource. Example: example_07_break_without_changeover.py.
  • Automatic jobs. Only the setup portion consumes the operator, modeled as a size-1 interval at each task's start.

Resource modes

Some tasks can run in different modes with different durations and headcounts. Encode the choice with a one-hot bool per mode and derive the actual processing time from it:

for t in tasks:
    model.add_exactly_one([mode[t, k] for k in modes])
    model.add(
        proc_time[t] == sum(processing_time[product[t], k] * mode[t, k] for k in modes)
    )

Example: example_10_people_mode.py.

Headcount tracking

If the per-task resource depends on whether the task overlaps a break (or some other condition), plain add_cumulative may be insufficient. Build an explicit per-timestep resource variable and link it to task-start presence booleans. Three methods are compared in example_34_headcount_tracking.py.

Multi-stage jobs

A job can consist of several stages that must run in order. Each stage is a task with its own start/end; the job start is the earliest task start and the job end is the latest task end.

Job - stage - task structure

tasks = {(job, stage) for job in jobs for stage in stages}

for job in jobs:
    model.add_min_equality(job_start[job], [start[job, s] for s in stages])
    model.add_max_equality(job_end[job],   [end  [job, s] for s in stages])

    # stage precedence
    for s in sorted(stages)[:-1]:
        model.add(end[job, s] <= start[job, s + 1])

Example: example_21_stages_one_job.py.

Stage-level no-overlap

If each stage has a single shared machine, forbid two jobs from sitting on the same stage simultaneously:

for s in stages:
    model.add_no_overlap([intervals[job, s] for job in jobs])

Examples: example_22_stages_two_jobs.py, example_23_multistage_two_jobs_co.py.

Campaigning

A campaign is a run of same-product tasks on a machine between two changeovers. Typical rules:

  • tasks within a campaign are the same product and pay no changeover cost,
  • a campaign has a maximum size (e.g. at most N tasks),
  • switching products or hitting the cap triggers a changeover.

Approach 1: campaigns as entities

Create a set of potential campaigns, each with start/end/duration/presence, and variables linking tasks to campaigns. Sequence campaigns (not tasks) using add_circuit. The campaign-level changeover cost sits in the gap between campaigns.

Pros: close to the business view. Cons: more variables, scales worse.

Example: example_09_max_number_of_continuous_tasks.py.

Approach 2: cumulative rank per task

Keep tasks as the atomic unit and attach a rank variable cumul[t] in [0, campaign_size - 1]. On each t1 -> t2 arc:

  • if the campaign continues, cumul[t2] = cumul[t1] + 1,
  • if a changeover happens, cumul[t2] = 0 and end[t1] + changeover <= start[t2].

A reach_end[t] boolean fires when cumul[t] == campaign_size - 1, forcing a reset and changeover. add_max_equality(max_value, [0, cumul[t1] + 1 - reach_end[t1] * campaign_size]) is a useful trick to compute the next rank under an only_enforce_if.

Pros: fewer variables, scales better. Cons: trickier to explain.

Examples: example_24_campaigning_with_cumul.py (base), example_27_campaigning_products.py (multi-product), example_28_campaigning_products_machines.py (multi-machine).

Locking the task order

When tasks have deadlines that align with their index, locking start[t-1] <= start[t] (or the stricter end[t-1] <= start[t]) is a cheap heuristic that often gives a 10x+ solve-time improvement. See example_25_campaigning_with_locked_seq.py and the two example_26_campaigning_locked_seq_improved*.py variants.

Flexible campaign ends

If the model should be free to end a campaign early (not just at the cap), drop the "force reach_end when cumul hits max" implication and let the solver choose. This usually gives better objective values at a small solve-time cost. See the two example_26_*_improved*.py files for the comparison.

Solver techniques

Beyond modeling, CP-SAT exposes a few knobs that help on hard instances.

Decision strategies

Tell the solver which variables to branch on first, and which value to try first. Often needed when a symmetric model returns a "correct but ugly" schedule.

model.add_decision_strategy(
    starts.values(),
    cp_model.CHOOSE_FIRST,
    cp_model.SELECT_MIN_VALUE,
)

Example: example_07_break_without_changeover.py applies two strategies (one on starts, one on sequence literals) to get a canonical output.

Parallel workers

solver.parameters.num_search_workers = 8

Runs the search with N parallel workers (8 here). The example_03_seq_scale*.py examples benchmark the resulting speedup.

Warm-starting with hints

You can seed the search with values for any subset of variables:

model.proto().solution_hint.vars.append(var_index)
model.proto().solution_hint.values.append(value)

Or clear and re-set them between solves:

model.clear_hints()
add_hints(model, previous_solution)

This is the basis of phase solving: build the full model once, then run it repeatedly with an increasing max_time, feeding each phase's solution in as hints to the next. Example: example_32_solving_by_phases.py.

Reading back values

status = solver.solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(solver.value(var))
    print(solver.objective_value)  # a property in the snake_case API, not a method

MODEL_INVALID almost always means a constraint references a variable that was never bound to the right model instance, or an only_enforce_if was attached to something that is not a literal.

Examples overview

Each example in this section corresponds to one Python file under scheduling/. Chapters are kept short: a brief description, the concepts it demonstrates (linked back to the Concepts section), and the source file inlined at the bottom.

Examples are grouped by topic in the sidebar:

  • Basics - CP-SAT primitives and small modeling tricks.
  • Sequencing - ordering tasks on one or more machines.
  • Changeover and intervals - different ways to model switches between products and the move from manual durations to interval variables.
  • Breaks - unavailable time windows, including breaks that extend a task's duration and automatic jobs that only need an operator for setup.
  • Shifts - preventing tasks from crossing shift boundaries.
  • Multi-stage jobs - jobs with ordered stages and per-stage capacity.
  • Resources - flexible resource/headcount modes and time-varying demand tracking.
  • Campaigning - grouping same-product tasks between changeovers, with multiple modelling approaches.
  • Solver techniques - warm-starting CP-SAT across phases with hints.

A few files (example_11, example_18, example_19, example_30) are empty placeholders kept for numbering; their chapters only note what they would have covered.

Unit tests

Source: scheduling/example_00_unit_tests.py

Before modeling a scheduling problem you have to be fluent in the constraint primitives CP-SAT actually speaks. This file is a bench of tiny self-contained models, each exercising one feature: boolean combinators, reified equalities with only_enforce_if, combining conditions via add_multiplication_equality, and domain constraints for non-contiguous value sets.

It is the only chapter with no scheduling content. Everything later assumes you are comfortable with what lives here. Useful as a cheat sheet when you want to remember how to express, for example, "b = (5 <= x <= 10)". Several snippets are commented-out alternatives kept for comparison.

Concepts

Source

from ortools.sat.python import cp_model
model = cp_model.CpModel()
def get(x):
    return solver.value(x)

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_or(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_and(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add_bool_xor(x, y)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))


#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 2)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 1)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))

#
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x+y == 0)
model.minimize(x+y)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(get(x), get(y))


#
model = cp_model.CpModel()
x = model.new_bool_var('x')
y = model.new_bool_var('y')
model.add(x==1).only_enforce_if(~x)
#model.add(x==0).only_enforce_if(~x)
# model.add(x==1).only_enforce_if(x)
model.add(x==0).only_enforce_if(x)
#model.add(y==1).only_enforce_if(x)
model.minimize(x)
solver = cp_model.CpSolver()
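status = solver.solve(model=model)
# The two constraints above contradict each other, so no solution exists;
# only read values back when the solver actually found one.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print(get(x))
else:
    print(solver.status_name(status))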


#

model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
# Invalid: only_enforce_if() accepts Boolean literals, not comparisons such as `5 <= x`.
# model.add(x_is_between_5_and_10 == 1).only_enforce_if(5 <= x).only_enforce_if(x <= 10)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', get(x))
print('x_is_between_5_and_10', get(x_is_between_5_and_10))









model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
x_is_no_less_than_5 = model.new_bool_var('x_is_no_less_than_5')
x_is_no_more_than_10 = model.new_bool_var('x_is_no_more_than_10')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)

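# Invalid: Python parses `b == x >= 5` as a chained comparison, so it does not reify the bound.
# model.add(x_is_no_less_than_5 == x >= 5)
# Reify each bound with a pair of only_enforce_if constraints instead.
model.add(x >= 5).only_enforce_if(x_is_no_less_than_5)
model.add(x < 5).only_enforce_if(~x_is_no_less_than_5)
model.add(x <= 10).only_enforce_if(x_is_no_more_than_10)
model.add(x > 10).only_enforce_if(~x_is_no_more_than_10)

# Invalid: only_enforce_if() needs literals, not comparisons.
# model.add(x_is_between_5_and_10 == 1).only_enforce_if(5 <= x).only_enforce_if(x <= 10)
model.add(x_is_between_5_and_10 == 1).only_enforce_if(x_is_no_less_than_5).only_enforce_if(x_is_no_more_than_10)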
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', get(x))
print('x_is_between_5_and_10', get(x_is_between_5_and_10))





##########################################

from ortools.sat.python import cp_model
model = cp_model.CpModel()
x_is_greater_than_5 = model.new_bool_var('x_is_greater_than_5')
x = model.new_int_var(0, 100, 'x')
model.add(x == 7)
model.add(x >= 5).only_enforce_if(x_is_greater_than_5)
model.add(x < 5).only_enforce_if(~x_is_greater_than_5)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print('x', solver.value(x))
print('x_is_greater_than_5', solver.value(x_is_greater_than_5))



from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
#model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 10).only_enforce_if(~x_is_between_5_and_10)
#model.add(x >10).only_enforce_if(~x_is_greater_than_5)
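# x == 3 contradicts the forced literal (x >= 5), so this model is INFEASIBLE.
model.add(x == 3)
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
# Status 1 is MODEL_INVALID; compare against the named constants instead.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))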




from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
#model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)
model.add(x < 10).only_enforce_if(~x_is_between_5_and_10)
#model.add(x >10).only_enforce_if(~x_is_greater_than_5)
#model.add(x == 3)
model.add(x_is_between_5_and_10 == 1)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))




from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('x_is_between_5_and_10')
model.add(x >= 5).only_enforce_if(x_is_between_5_and_10)
model.add(x <= 10).only_enforce_if(x_is_between_5_and_10)

model.add(x < 5).only_enforce_if(~x_is_between_5_and_10)
model.add(x >10).only_enforce_if(~x_is_between_5_and_10)

model.add(x == 3)
# model.add(x_is_between_5_and_10 == 0)

solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))





from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x_greater_than_5 = model.new_bool_var('5<x')
x_less_than_10 = model.new_bool_var('x<10')


model.add(x > 5).only_enforce_if(x_greater_than_5)
model.add(x <= 5).only_enforce_if(~x_greater_than_5)

model.add(x < 10).only_enforce_if(x_less_than_10)
model.add(x >= 10).only_enforce_if(~x_less_than_10)

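# Invalid: two variables cannot be multiplied inside a linear expression.
# model.add(x_is_between_5_and_10==x_greater_than_5*x_less_than_10)
model.add_multiplication_equality(x_is_between_5_and_10, [x_greater_than_5, x_less_than_10])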

model.add(x == 3)
# model.add(x_is_between_5_and_10 == 0)

solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))









from ortools.sat.python import cp_model
model = cp_model.CpModel()
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x = model.new_int_var(0, 100, 'x')

model.add_linear_constraint(x, 5, 10).only_enforce_if(x_is_between_5_and_10)
model.add_linear_expression_in_domain(
    x,
    cp_model.Domain.from_intervals([[0, 4], [11, 100]])
).only_enforce_if(~x_is_between_5_and_10)

model.add(x == 3)
solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))





from ortools.sat.python import cp_model
model = cp_model.CpModel()
x = model.new_int_var(0, 100, 'x')
x_is_between_5_and_10 = model.new_bool_var('5<x<10')
x_greater_than_5 = model.new_bool_var('5<x')
x_less_than_10 = model.new_bool_var('x<10')

model.add(x > 5).only_enforce_if(x_greater_than_5)
model.add(x <= 5).only_enforce_if(~x_greater_than_5)

model.add(x < 10).only_enforce_if(x_less_than_10)
model.add(x >= 10).only_enforce_if(~x_less_than_10)

model.add_multiplication_equality(x_is_between_5_and_10, [x_greater_than_5, x_less_than_10])

model.add(x_is_between_5_and_10 == 1)

solver = cp_model.CpSolver()
status = solver.solve(model=model)
print(status)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('x', solver.value(x))
    print('x_is_between_5_and_10', solver.value(x_is_between_5_and_10))

Events overlapping

Source: scheduling/example_15_events_overlapping.py

Utility chapter. Given two intervals with fixed times, how do you express "do they overlap, and by how much"?

Two reified booleans - t1 starts before t2 AND t1 ends after t2 starts - combined with add_multiplication_equality give the overlap indicator. The overlap duration is then a conditional end[t1] - start[t2]:

model.add(duration[t1, t2] == end[t1] - start[t2]).only_enforce_if(overlap[t1, t2])
model.add(duration[t1, t2] == 0).only_enforce_if(~overlap[t1, t2])

The pattern recurs whenever you need to measure overlap rather than forbid it. The duration-stretching break model already used a close relative; more will show up in the headcount-tracking chapter.

Concepts

Source

from ortools.sat.python import cp_model

model = cp_model.CpModel()
max_time = 10
tasks = {1, 2}

# 1. Data

var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

# overlap
# model.add(var_task_starts[1] == 1)
# model.add(var_task_ends[1] == 5)
# model.add(var_task_starts[2] == 3)
# model.add(var_task_ends[2] == 7)

# No overlap
model.add(var_task_starts[1] == 1)
model.add(var_task_ends[1] == 3)
model.add(var_task_starts[2] == 5)
model.add(var_task_ends[2] == 7)



var_overlap = {
    (t1, t2): model.new_bool_var('')
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

var_overlap_duration = {
    (t1, t2): model.new_int_var(0, max_time, '')
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

var_start_earlier_than_start = {
    (t1, t2): model.new_bool_var('')
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

var_end_later_than_start = {
    (t1, t2): model.new_bool_var('')
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

for t1 in tasks:
    for t2 in tasks:
        if t1 == t2:
            continue
        model.add(var_task_starts[t1] <= var_task_starts[t2]).only_enforce_if(var_start_earlier_than_start[t1, t2])
        model.add(var_task_starts[t1] > var_task_starts[t2]).only_enforce_if(~var_start_earlier_than_start[t1, t2])

        model.add(var_task_ends[t1] > var_task_starts[t2]).only_enforce_if(var_end_later_than_start[t1, t2])
        model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(~var_end_later_than_start[t1, t2])

        model.add_multiplication_equality(
            var_overlap[t1, t2],
            [var_start_earlier_than_start[t1, t2], var_end_later_than_start[t1, t2]]
        )

        model.add(var_overlap_duration[t1, t2] == var_task_ends[t1] - var_task_starts[t2]).only_enforce_if(var_overlap[t1, t2])
        model.add(var_overlap_duration[t1, t2] == 0).only_enforce_if(~var_overlap[t1, t2])




# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    count = 0
    for t1 in tasks:
        for t2 in tasks:
            if t1 == t2:
                continue
            if solver.value(var_overlap[t1,t2]):
                count = count + 1
                print(f'Task{t1} starts earlier and overlap Task{t2} for a duration of '
                      f'{solver.value(var_overlap_duration[t1, t2])} units')
    if count == 0:
        print("No overlapped tasks at all")

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Simple sequence

Source: scheduling/example_01_simple_sequence.py

Three tasks, two products, one machine. Switching between products costs time. What is the cheapest order?

The smallest useful scheduling model. A dummy task 0 represents "machine idle" so the circuit has a start and an end; sequencing booleans seq[t1, t2] are stitched together with add_circuit; when an arc is chosen, end[t1] <= start[t2] is enforced. Changeover lives in the objective as a cost-weighted sum over the selected arcs, and task duration is the manual end - start == duration.

Everything after this chapter is a variation: more machines, different changeover semantics, extra constraints for breaks or shifts. Understanding where the circuit, the presence booleans, and the order-to-time link go is the hard part.

Concepts

Source

# circuit arcs
# tuples
# add arc : arcs.append([job1_index, job2_index, binary])                             |
# add link: Use only_enforce_if for  binary <--->  job1.end_time <= job2.start_time     | -> three things connected

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

M = 99999

# 1. Data
'''
task   product
1       A
2       B
3       A
'''
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}

m = {
    (t1, t2)
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

m_cost = {
    (t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or task_to_product[t1] == 'dummy' or task_to_product[t2] == 'dummy'
    else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}


# 2. Decision variables
'''
(1, 2): 0/1
(1, 3)
(2, 1)
(2, 3)
(3, 1)
(3, 2)
'''

max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

variables_sequence = {
    (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
    for (t1, t2) in m
}

# 3. Objectives

# Plain linear expression over the arc literals; no extra variable is needed.
total_changeover_time = sum(
    variables_sequence[(t1, t2)] * m_cost[(t1, t2)] for (t1, t2) in m
)

model.minimize(total_changeover_time)
#model.maximize(total_changeover_time)


# 4. Constraints

# Add duration

for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )

# add_circuits

arcs = list()
'''
arcs.append([0, 1, model.new_bool_var("dummy0" + "_to_1")])
arcs.append([0, 2, model.new_bool_var("dummy0" + "_to_2")])
arcs.append([0, 3, model.new_bool_var("dummy0" + "_to_3")])
arcs.append([1, 0, model.new_bool_var("1_to_" + "dummy0")])
arcs.append([2, 0, model.new_bool_var("2_to_" + "dummy0")])
arcs.append([3, 0, model.new_bool_var("3_to_" + "dummy0")])
'''
for (from_task, to_task) in m:
    arcs.append(
        [
            from_task,
            to_task,
            variables_sequence[(from_task, to_task)]
        ]
    )

    if from_task != 0 and to_task != 0:
        model.add(
            variables_task_ends[from_task] <= variables_task_starts[to_task]
        ).only_enforce_if(variables_sequence[(from_task, to_task)])

model.add_circuit(arcs)

#model.add(variables_task_starts[0] == 8)

# Solve

# Optional search hints; see https://github.com/d-krupke/cpsat-primer
#model.add_decision_strategy(list(variables_task_starts.values()), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
#model.add_decision_strategy(list(variables_task_starts.values()), cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)


solver = cp_model.CpSolver()
status = solver.solve(model=model)

# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    for (t1, t2) in m:
        print(f'{t1} --> {t2}:   {solver.value(variables_sequence[(t1, t2)])}')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Sequence with locked starting product

Source: scheduling/example_02_seq_lock_starting_product.py

A machine rarely starts empty. Usually some product is already loaded, and if the first task happens to be that same product no changeover is needed.

The model is identical to the previous chapter with one change: the changeover table depends on the machine's starting product. From the dummy task, going to a task of the starting product is free; going to any other product costs the regular changeover.

m_cost = {
    (t1, t2): 0 if task_to_product[t1] == task_to_product[t2]
                 or (task_to_product[t1] == 'dummy'
                     and task_to_product[t2] == starting_product)
              else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}

This small tweak is the template for every "initial state" extension later in the book.
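
To see how the starting product shifts the best first task, the same three-task instance can be enumerated for both starting products. This is a brute-force sketch in plain Python under the cost rule quoted above; build_cost and cheapest_first_tasks are hypothetical helper names, not functions from the example file.

```python
from itertools import permutations

task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}


def build_cost(starting_product):
    """Rebuild the arc-cost table for the product already on the machine."""
    tasks = list(task_to_product)
    cost = {}
    for t1 in tasks:
        for t2 in tasks:
            if t1 == t2:
                continue
            p1, p2 = task_to_product[t1], task_to_product[t2]
            free = p1 == p2 or (p1 == 'dummy' and p2 == starting_product)
            cost[t1, t2] = 0 if free else changeover_time[p2]
    return cost


def cheapest_first_tasks(starting_product):
    """Brute-force the best orders and report which tasks may go first."""
    cost = build_cost(starting_product)
    totals = {}
    for order in permutations([1, 2, 3]):
        nodes = [0, *order, 0]
        totals[order] = sum(cost[a, b] for a, b in zip(nodes, nodes[1:]))
    best = min(totals.values())
    return best, sorted({o[0] for o, c in totals.items() if c == best})


print(cheapest_first_tasks('A'))  # (1, [1, 3])
print(cheapest_first_tasks('B'))  # (1, [2])
```

The minimum cost is the same in both cases, but the set of optimal first tasks flips: with A loaded an A task goes first, with B loaded task 2 does.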

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

M = 99999

# 1. Data
'''
task   product
1       A
2       B
3       A
'''
tasks = [0, 1, 2, 3]
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
starting_product = 'A'

m = {
    (t1, t2)
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}

# A -> A, B --> B: 0
# dummy A -> A: 0
# dummy A -> B: 1
# A -> B, B -> A: 1

m_cost = {
    (t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == starting_product
    )
    else changeover_time[task_to_product[t2]]
    for (t1, t2) in m
}








# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

variables_sequence = {
    (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
    for (t1, t2) in m
}

# 3. Objectives

# Plain linear expression over the arc literals; no extra variable is needed.
total_changeover_time = sum(
    variables_sequence[(t1, t2)] * m_cost[(t1, t2)] for (t1, t2) in m
)

model.minimize(total_changeover_time)


# 4. Constraints

# Add duration

for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )

# add_circuits

arcs = list()

for (from_task, to_task) in m:
    arcs.append(
        [
            from_task,
            to_task,
            variables_sequence[(from_task, to_task)]
        ]
    )

    if from_task != 0 and to_task != 0:
        model.add(
            variables_task_ends[from_task] <= variables_task_starts[to_task]
        ).only_enforce_if(variables_sequence[(from_task, to_task)])

model.add_circuit(arcs)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)

# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    for (t1, t2) in m:
        print(f'{t1} --> {t2}:   {solver.value(variables_sequence[(t1, t2)])}')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Multi-station sequence

Source: scheduling/example_03_seq_multi_stations.py

Scale up to two machines in parallel. The solver now decides both which machine runs each task and in what order.

Variables split into two levels. presence[m, t] is the machine assignment, one-hot per task via add_exactly_one. Task-level start[t], end[t] mirror the chosen machine via only_enforce_if(presence[m, t]). Each machine gets its own add_circuit, with self-loop arcs [t, t, ~presence[m, t]] that let a task skip any machine it is not on. The objective is now makespan plus changeover, with add_max_equality computing the former.

The two-level structure (task-level + machine-task-level) recurs almost unchanged in every subsequent multi-machine chapter.
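
Once each machine has its own circuit, the selected arcs have to be walked from the dummy node to recover an ordered task list per machine. The helper below is a hypothetical post-processing sketch in plain Python (decode_sequences is not part of the example file); it assumes the selected arcs form a valid circuit per machine and that self-loops mark tasks assigned elsewhere.

```python
def decode_sequences(selected_arcs, machines):
    """Turn chosen arcs {(machine, t1, t2)} into one ordered task list per machine.

    Node 0 is the dummy task. Self-loops (t, t) mean "task not on this
    machine" and are skipped.
    """
    sequences = {}
    for m in machines:
        successor = {
            t1: t2 for (mm, t1, t2) in selected_arcs if mm == m and t1 != t2
        }
        order = []
        node = successor.get(0, 0)  # first real task after the dummy, if any
        while node != 0:
            order.append(node)
            node = successor[node]
        sequences[m] = order
    return sequences


# Made-up solver output: machine 0 runs 1 then 3, machine 1 runs 2 then 4.
arcs = {
    (0, 0, 1), (0, 1, 3), (0, 3, 0), (0, 2, 2), (0, 4, 4),
    (1, 0, 2), (1, 2, 4), (1, 4, 0), (1, 1, 1), (1, 3, 3),
}
print(decode_sequences(arcs, machines=[0, 1]))  # {0: [1, 3], 1: [2, 4]}
```

In the real model the input set would be built from the sequence and presence literals whose solver value is 1.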

Concepts

Source

"""
About changeover:
   - The changeover cost appears only in the objective function.
   - Task start and end times do not reflect changeover events.

The helper s() prints a dict indexed by tuples in sorted key order.

Task 0 is used only in the circuit arcs.

The duration constraint is end - start == duration.
Mathieu's approach has no such constraint; new_optional_interval_var enforces it for present intervals instead.

Task-level start and end variables make more complicated constraints easier to write.
"""


from ortools.sat.python import cp_model

# Initiate
M = 99999
model = cp_model.CpModel()


def s(x):
    sorted_keys = sorted(x)
    for key in sorted_keys:
        print(f"{key}, {x[key]}")

'''
task   product
1       A
2       B
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}

# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks
    for m in machines
}
variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks
    for m in machines
}

variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# 3. Objectives

# Plain linear expression over the arc literals; no extra variable is needed.
total_changeover_time = sum(
    variables_machine_task_sequence[(m, t1, t2)] * m_cost[(m, t1, t2)] for (m, t1, t2) in X
)

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span + total_changeover_time)


# 4. Constraints

for task in tasks:

    # For this task

    # get all allowed machines
    task_candidate_machines = machines

    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]

    # this task is only present in one machine
    model.add_exactly_one(tmp)

    # task level link to machine-task level
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        # The changeover consideration is done here in Mathieu's approach


# Could this be replaced by an interval variable?
for task in tasks:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )

# add_circuits

for machine in machines:

    arcs = list()

    tmp = [x for x in X if x[0] == machine]

    for (m, from_task, to_task) in tmp:
        arcs.append(
            [
                from_task,
                to_task,
                variables_machine_task_sequence[(m, from_task, to_task)]
            ]
        )

        if from_task != 0 and to_task != 0:
            model.add(
                variables_task_ends[from_task] <= variables_task_starts[to_task]
            ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])

    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(machine, task)]
        ])

    model.add_circuit(arcs)





# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)

# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print(solver.value(make_span))

    for (m, t1, t2) in sorted(X):
        value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
        if value == 1:
            print(f'Machine {m}: {t1} --> {t2}  ')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Multi-station scale benchmark

Source: scheduling/example_03_seq_scale.py

How does the previous chapter's model scale? Wrap it in a function over num_tasks, crank the size up to 12, measure wall time with eight search workers, plot with matplotlib.

Spoiler: with a manual end - start == duration constraint it gets slow fast. This chapter sets the baseline; the next one shows the fix. The objective is makespan only - the changeover term is commented out to keep the experiment clean.
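
The measurement loop itself is independent of the model. A minimal harness could look like the sketch below; benchmark and stand_in_solver are hypothetical names, the workload is a placeholder rather than a CP-SAT solve, and time.perf_counter is swapped in for the source's time() because it is monotonic.

```python
from time import perf_counter


def benchmark(solve, sizes):
    """Call solve(n) for each size and return the wall-clock seconds per call."""
    seconds = []
    for n in sizes:
        start = perf_counter()
        solve(n)
        seconds.append(perf_counter() - start)
    return seconds


def stand_in_solver(n):
    """Placeholder workload; swap in the function that builds and solves the model."""
    return sum(i * i for i in range(n * 1000))


sizes = [2, 4, 6]
timings = benchmark(stand_in_solver, sizes)
for n, s in zip(sizes, timings):
    print(f"{n} tasks: {s:.6f} s")
```

Note that the source times only the solver.solve call, whereas wrapping the whole function as above also counts model construction.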

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt


def generate_data(num_tasks):
    tasks = {i+1 for i in range(num_tasks)}
    tasks_0 = tasks.union({0})
    task_to_product = {0: 'dummy'}
    task_to_product.update({x+1: 'A' for x in range(int(num_tasks/2))})
    task_to_product.update({x+1: 'B' for x in range(int(num_tasks/2), int(num_tasks))})
    return tasks, tasks_0, task_to_product


def model(num_tasks):

    model = cp_model.CpModel()

    # 1. Data
    tasks, tasks_0, task_to_product = generate_data(num_tasks)
    max_time = num_tasks
    processing_time = {'dummy': 0, 'A': 1, 'B': 1}
    changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
    machines = {0, 1}
    machines_starting_products = {0: 'A', 1: 'A'}

    X = {
        (m, t1, t2)
        for t1 in tasks_0
        for t2 in tasks_0
        for m in machines
        if t1 != t2
    }

    m_cost = {
        (m, t1, t2): 0
        if task_to_product[t1] == task_to_product[t2] or (
                task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
        )
        else changeover_time[task_to_product[t2]]
        for (m, t1, t2) in X
    }

    # 2. Decision variables
    variables_task_ends = {
        task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
    }

    variables_task_starts = {
        task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
    }

    variables_machine_task_starts = {
        (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_ends = {
        (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_presences = {
        (m, t): model.new_bool_var(f"presence_{m}_{t}")
        for t in tasks
        for m in machines
    }

    variables_machine_task_sequence = {
        (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
        for (m, t1, t2) in X
    }

    # 3. Objectives

    # Plain linear expression, kept for reference; it is not part of the objective in this benchmark.
    total_changeover_time = sum(
        variables_machine_task_sequence[(m, t1, t2)] * m_cost[(m, t1, t2)] for (m, t1, t2) in X
    )

    make_span = model.new_int_var(0, max_time, "make_span")

    model.add_max_equality(
        make_span,
        [variables_task_ends[task] for task in tasks]
    )
    model.minimize(make_span)  # changeover term deliberately left out

    # 4. Constraints
    for task in tasks:
        task_candidate_machines = machines
        # find the subset in presence matrix related to this task
        tmp = [
            variables_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]

        # this task is only present in one machine
        model.add_exactly_one(tmp)

        # task level link to machine-task level
        for m in task_candidate_machines:
            model.add(
                variables_task_starts[task] == variables_machine_task_starts[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])

            model.add(
                variables_task_ends[task] == variables_machine_task_ends[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])

    for task in tasks:
        model.add(
            variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
        )

    # add_circuits
    for machine in machines:
        arcs = list()
        tmp = [x for x in X if x[0] == machine]
        for (m, from_task, to_task) in tmp:
            arcs.append(
                [
                    from_task,
                    to_task,
                    variables_machine_task_sequence[(m, from_task, to_task)]
                ]
            )
            if from_task != 0 and to_task != 0:
                model.add(
                    variables_task_ends[from_task] <= variables_task_starts[to_task]
                ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
        for task in tasks:
            arcs.append([
                task, task, ~variables_machine_task_presences[(machine, task)]
            ])
        model.add_circuit(arcs)

    # Solve
    solver = cp_model.CpSolver()
    solver.parameters.num_search_workers = 8
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    return total_time


if __name__ == '__main__':

    num_tasks = [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

    seconds = []

    for i in num_tasks:
        print(i)
        elapsed = model(i)
        seconds.append(elapsed)

    plt.plot(num_tasks, seconds)
    plt.show()

Multi-station scale (intervals)

Source: scheduling/example_03_seq_scale_Mathieu.py

Same benchmark as the previous chapter, but the duration constraint is replaced with new_optional_interval_var plus per-machine add_no_overlap. CP-SAT's scheduling engine propagates on intervals far more effectively than on generic integer constraints, and the speed-up is dramatic: the same hardware that topped out around 12 tasks now handles ~80.

The per-machine circuit is factored into add_circuit_constraints(...) so the main loop stays readable. That helper is the template for every later multi-machine model.
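
What per-machine no-overlap demands of a finished schedule can be stated as a plain check. The sketch below is an after-the-fact validator in ordinary Python, not the solver's propagation; machine_overlaps and the sample schedule are made up for illustration, and it assumes positive durations and half-open intervals [start, end).

```python
def machine_overlaps(intervals):
    """Return (machine, task_a, task_b) for present intervals that overlap.

    intervals maps (machine, task) -> (start, end, present). Absent intervals
    are ignored, mirroring how an optional interval drops out of no-overlap.
    """
    present = sorted(
        (m, start, end, t)
        for (m, t), (start, end, is_present) in intervals.items()
        if is_present
    )
    clashes = []
    for i, (m1, s1, e1, t1) in enumerate(present):
        for m2, s2, e2, t2 in present[i + 1:]:
            # Sorted by start, so s2 >= s1; touching intervals do not clash.
            if m1 == m2 and s2 < e1:
                clashes.append((m1, t1, t2))
    return clashes


schedule = {
    (0, 1): (0, 1, True),
    (0, 2): (1, 2, True),   # starts exactly when task 1 ends: allowed
    (0, 3): (0, 1, False),  # assigned elsewhere, so ignored on machine 0
    (1, 3): (0, 1, True),
    (1, 4): (0, 1, True),   # clashes with task 3 on machine 1
}
print(machine_overlaps(schedule))  # [(1, 3, 4)]
```

A check like this is useful in post-processing tests: any schedule returned by a model with add_no_overlap on each machine should produce an empty list.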

Concepts

Source

# USE INTERVAL !!!

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt


def generate_data(num_tasks):
    tasks = {i+1 for i in range(num_tasks)}
    tasks_0 = tasks.union({0})
    task_to_product = {0: 'dummy'}
    task_to_product.update({x+1: 'A' for x in range(int(num_tasks/2))})
    task_to_product.update({x+1: 'B' for x in range(int(num_tasks/2), int(num_tasks))})
    return tasks, tasks_0, task_to_product


def model(num_tasks):

    model = cp_model.CpModel()

    # 1. Data
    tasks, tasks_0, task_to_product = generate_data(num_tasks)
    max_time = num_tasks
    processing_time = {'dummy': 0, 'A': 1, 'B': 1}
    changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
    machines = {0, 1}
    machines_starting_products = {0: 'A', 1: 'A'}

    X = {
        (m, t1, t2)
        for t1 in tasks_0
        for t2 in tasks_0
        for m in machines
        if t1 != t2
    }

    m_cost = {
        (m, t1, t2): 0
        if task_to_product[t1] == task_to_product[t2] or (
                task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
        )
        else changeover_time[task_to_product[t2]]
        for (m, t1, t2) in X
    }

    # 2. Decision variables
    variables_task_ends = {
        task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
    }

    variables_task_starts = {
        task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
    }

    variables_machine_task_starts = {
        (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_ends = {
        (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_machine_task_presences = {
        (m, t): model.new_bool_var(f"presence_{m}_{t}")
        for t in tasks
        for m in machines
    }
    variables_intervals = {
        (m, t): model.new_optional_interval_var(
            start=variables_machine_task_starts[m, t],
            size=processing_time[task_to_product[t]],
            end=variables_machine_task_ends[m, t],
            is_present=variables_machine_task_presences[m, t],
            name=f"interval_{t}_{m}",
        )
        for t in tasks
        for m in machines
    }

    variables_machine_task_sequence = {
        (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
        for (m, t1, t2) in X
    }


    # 3. Objectives

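    # NOTE: these sequence literals are not the arc literals created inside
    # add_circuit_constraints, so nothing ties them to the chosen order and the
    # solver can set them all to 0. In effect the objective below is makespan only.
    total_changeover_time = sum(
        variables_machine_task_sequence[(m, t1, t2)] * m_cost[(m, t1, t2)] for (m, t1, t2) in X
    )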

    make_span = model.new_int_var(0, max_time, "make_span")

    model.add_max_equality(
        make_span,
        [variables_task_ends[task] for task in tasks]
    )
    model.minimize(make_span + total_changeover_time)

    # 4. Constraints
    for task in tasks:
        task_candidate_machines = machines
        # find the subset in presence matrix related to this task
        tmp = [
            variables_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]

        # this task is only present in one machine
        model.add_exactly_one(tmp)

        # task level link to machine-task level
        for m in task_candidate_machines:
            model.add(
                variables_task_starts[task] == variables_machine_task_starts[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])

            model.add(
                variables_task_ends[task] == variables_machine_task_ends[m, task]
            ).only_enforce_if(variables_machine_task_presences[m, task])

    for machine in machines:
        tmp = {(m, t) for (m, t) in variables_intervals if m == machine}
        intervals = [variables_intervals[x] for x in tmp]
        model.add_no_overlap(intervals)

    # Create circuit constraints
    add_circuit_constraints(
        model,
        machines,
        tasks,
        variables_task_starts,
        variables_task_ends,
        variables_machine_task_presences,
    )

    # Solve
    solver = cp_model.CpSolver()
    solver.parameters.num_search_workers = 8
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    return total_time


def add_circuit_constraints(
    model,
    machines,
    tasks,
    variables_task_starts,
    variables_task_ends,
    variables_machine_task_presences,
):
    for machine in machines:
        # All candidate arcs on this machine; each arc's boolean says whether it is part of the circuit.
        arcs = []
        machine_tasks = tasks

        for node_1, task_1 in enumerate(machine_tasks):
            mt_1 = str(task_1) + "_" + str(machine)
            # Initial arc from the dummy node (0) to a task.
            arcs.append(
                [0, node_1 + 1, model.new_bool_var("first" + "_" + mt_1)]
            )  # if mt_1 follows dummy node 0
            # Final arc from a task to the dummy node (0).
            arcs.append(
                [node_1 + 1, 0, model.new_bool_var("last" + "_" + mt_1)]
            )  # if dummy node 0 follows mt_1

            # The task may run on another machine instead.
            # Self-looping arc on this task's node, active when the task is absent here.
            arcs.append(
                [
                    node_1 + 1,
                    node_1 + 1,
                    ~variables_machine_task_presences[(machine, task_1)],
                ]
            )

            for node_2, task_2 in enumerate(machine_tasks):
                if node_1 == node_2:
                    continue
                mt_2 = str(task_2) + "_" + str(machine)
                # Add sequential boolean constraint: mt_2 follows mt_1
                mt2_after_mt1 = model.new_bool_var(f"{mt_2} follows {mt_1}")
                arcs.append([node_1 + 1, node_2 + 1, mt2_after_mt1])

                # Reified precedence: when the arc is selected, task_2 starts
                # no earlier than task_1 ends (plus min_distance).
                min_distance = 0
                model.add(
                    variables_task_starts[task_2] >= variables_task_ends[task_1] + min_distance
                ).only_enforce_if(mt2_after_mt1)
        model.add_circuit(arcs)



if __name__ == '__main__':

    num_tasks = [x+2 for x in range(80)]

    seconds = []

    for i in num_tasks:
        print(i)
        elapsed = model(i)
        seconds.append(elapsed)

    plt.plot(num_tasks, seconds)
    plt.show()

Changeover in constraint

Source: scheduling/example_04_seq_with_changeover_in_constraint.py

There is a subtle bug in the earlier models. We charged changeover cost in the objective, but the schedule itself was free to place the next task right after the previous one - with no physical gap for the changeover to happen. Cost and schedule disagreed.

The fix is to put the changeover into the precedence constraint:

model.add(end[t1] + distance <= start[t2]).only_enforce_if(seq[m, t1, t2])

Now a changeover actually pushes the next task later. The objective can go back to being just the makespan, and the resulting schedule is physically executable.
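
The effect of the gap is easy to see on a fixed sequence. The sketch below left-shifts one machine's tasks with and without the changeover distance; it is plain Python for illustration (earliest_times is a hypothetical helper, not part of the model) and uses the same data as the example: a machine that starts on product A and then runs the two B tasks.

```python
task_to_product = {1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'A': 1, 'B': 1}
changeover_time = {'A': 1, 'B': 1}


def earliest_times(sequence, starting_product, with_gap):
    """Left-shifted (start, end) per task for one machine's fixed sequence."""
    clock, previous, times = 0, starting_product, {}
    for task in sequence:
        product = task_to_product[task]
        if with_gap and product != previous:
            clock += changeover_time[product]  # idle gap for the changeover
        start = clock
        clock = start + processing_time[product]
        times[task] = (start, clock)
        previous = product
    return times


print(earliest_times([2, 4], 'A', with_gap=False))  # {2: (0, 1), 4: (1, 2)}
print(earliest_times([2, 4], 'A', with_gap=True))   # {2: (1, 2), 4: (2, 3)}
```

Without the gap the machine appears to finish at time 2; with it, the A-to-B changeover delays everything by one unit and the machine finishes at 3.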

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       B
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}

'''
M1  A -> A -> A   1+1
M2  A -> B -> B   1+1+1  --> 3
'''

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# This table now feeds the precedence constraints, not the objective function
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}


# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Constraints

# Duration. An interval variable could replace this constraint.
for task in tasks_0:
    model.add(
        variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
    )

# One task to one machine. Link across level.
for task in tasks:

    # For this task

    # get all allowed machines
    task_candidate_machines = machines

    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]

    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:

    arcs = list()

    for from_task in tasks_0:

        for to_task in tasks_0:

            # arcs
            if from_task != to_task:
                arcs.append([
                        from_task,
                        to_task,
                        variables_machine_task_sequence[(m, from_task, to_task)]
                ])

                distance = m_cost[m, from_task, to_task]

                # Task 0 cannot mark both the first and the last position in time, so arcs into task 0 get no time link
                if to_task != 0:

                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])

    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])

    model.add_circuit(arcs)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Sequence with intervals

Source: scheduling/example_05_seq_with_intervals.py

Cleanup chapter. Replace the hand-written model.add(end - start == duration) with a proper new_optional_interval_var. Behavior is identical; the model speaks CP-SAT's native scheduling vocabulary instead.

intervals[m, t] = model.new_optional_interval_var(
    start[m, t], processing_time[product_of(t)], end[m, t],
    presence[m, t], name=f"interval_{m}_{t}",
)

A small refactor with big downstream payoff: intervals plug directly into add_no_overlap, add_cumulative, and the break machinery you will see next.
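
The contract of an optional interval is small: when the presence literal is true, start + size == end must hold; when it is false, the interval imposes nothing. The sketch below restates that rule as a plain-Python check. OptionalInterval here is a made-up dataclass for illustration, not the CP-SAT class.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OptionalInterval:
    """Illustrative stand-in for the values of one optional interval."""
    start: int
    size: int
    end: int
    present: bool

    def is_consistent(self):
        """Absent intervals impose nothing; present ones need start + size == end."""
        return (not self.present) or (self.start + self.size == self.end)


on_machine = OptionalInterval(start=2, size=1, end=3, present=True)
broken = OptionalInterval(start=2, size=1, end=5, present=True)
elsewhere = OptionalInterval(start=2, size=1, end=5, present=False)

print(on_machine.is_consistent(), broken.is_consistent(), elsewhere.is_consistent())
```

The third case is why the task-level link is written with only_enforce_if(presence[m, t]): on machines where a task is absent, the machine-level start and end are unconstrained and must not leak into the task-level times.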

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       B
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'A'}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# m_cost is now used in the constraints, not in the objective function
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}


# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# intervals
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Constraints

# Duration: now enforced by the optional interval variables defined above, so the explicit constraint stays commented out
# for task in tasks_0:
#     model.add(
#         variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
#     )

# One task to one machine. Link across level.
for task in tasks:

    # For this task

    # get all allowed machines
    task_candidate_machines = machines

    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]

    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:

    arcs = list()

    for from_task in tasks_0:

        for to_task in tasks_0:

            # arcs
            if from_task != to_task:
                arcs.append([
                        from_task,
                        to_task,
                        variables_machine_task_sequence[(m, from_task, to_task)]
                ])

                distance = m_cost[m, from_task, to_task]

                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:

                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])

    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])

    model.add_circuit(arcs)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Sequence with shared resource

Source: scheduling/example_06_seq_with_intervals_resource.py

Two machines, one operator. Even though the machines themselves could run in parallel, the human can only be in one place at a time.

Now that tasks are intervals, one line expresses the constraint:

intervals = list(variables_machine_task_intervals.values())
model.add_cumulative(intervals, [1] * len(intervals), 1)

With capacity 1 across all machine intervals, CP-SAT can no longer run tasks in parallel on different machines. This is the first time add_cumulative does real work, and it is the pattern that all later resource and break constraints build on.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       B
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B', 3: 'A', 4: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}
machines = {0, 1}
machines_starting_products = {0: 'A', 1: 'B'}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# m_cost is now used in the constraints, not in the objective function
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}




# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

# This includes task 0
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# intervals
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Constraints

# Duration: now enforced by the optional interval variables defined above, so the explicit constraint stays commented out
# for task in tasks_0:
#     model.add(
#         variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
#     )

# One task to one machine. Link across level.
for task in tasks:

    # For this task

    # get all allowed machines
    task_candidate_machines = machines

    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]

    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for dummies
model.add(variables_task_starts[0] == 0)
# model.add(variables_task_ends[0] == 0)
# variables_machine_task_starts
# variables_machine_task_ends
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:

    arcs = list()

    for from_task in tasks_0:

        for to_task in tasks_0:

            # arcs
            if from_task != to_task:
                arcs.append([
                        from_task,
                        to_task,
                        variables_machine_task_sequence[(m, from_task, to_task)]
                ])

                distance = m_cost[m, from_task, to_task]

                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:

                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])

    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])

    model.add_circuit(arcs)


# Resource constraint: there is only one operator, so no tasks may run in parallel

intervals = list(variables_machine_task_intervals.values())

model.add_cumulative(intervals, [1]*len(intervals), 1)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Changeover as event

Source: scheduling/example_08_changeover_as_event.py

The two earlier changeover approaches - cost in objective, gap in precedence - both treat the changeover as a number. This chapter promotes it to a first-class scheduled event: every ordered pair (t1, t2) gets its own optional interval with presence, start, end, and duration.

co_iv[m, t1, t2] = model.new_optional_interval_var(
    co_start[m, t1, t2],
    changeover_time[product_of(t2)],
    co_end[m, t1, t2],
    co_present[m, t1, t2],
    ...
)

When seq[m, t1, t2] is chosen, the model forces the interval to be present, sit between the two tasks, and have the right size:

model.add(end[t1] <= co_start[t1, t2]).only_enforce_if(seq[m, t1, t2])
model.add(co_end[t1, t2] <= start[t2]).only_enforce_if(seq[m, t1, t2])
model.add(co_end[t1, t2] - co_start[t1, t2] == distance).only_enforce_if(seq[m, t1, t2])
model.add(co_present[m, t1, t2] == 1).only_enforce_if(seq[m, t1, t2])
model.add(co_present[m, t1, t2] == 0).only_enforce_if(~seq[m, t1, t2])

Why bother with all this? Because once the changeover is an interval, it can go into add_cumulative just like a task. Need a cleaner who can only do one changeover at a time across multiple machines? Put the changeover intervals under a shared resource. The simpler formulations cannot express this.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       B
'''

# 1. Data

tasks = {1, 2}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'B'}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 2, 'B': 2}
machines = {0}
machines_starting_products = {0: 'A'}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# Changeover duration for each arc; used as `distance` in the sequence constraints
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}


# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# intervals
# This replaces the end - start == duration constraint
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"t_interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}


# Add change over-related variables !!!

variables_co_starts = {
    (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_start") for t1 in tasks_0 for t2 in tasks_0 if t1 != t2
}
variables_co_ends = {
    (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_end") for t1 in tasks_0 for t2 in tasks_0 if t1 != t2
}
variables_machine_co_starts = {
    (m, t1, t2): model.new_int_var(0, max_time, f"m{m}_co_t{t1}_to_t{t2}_start")
    for t1 in tasks_0 for t2 in tasks_0 for m in machines if t1 != t2
}
variables_machine_co_ends = {
    (m, t1, t2): model.new_int_var(0, max_time, f"m{m}_co_t{t1}_to_t{t2}_end")
    for t1 in tasks_0 for t2 in tasks_0 for m in machines if t1 != t2
}
variables_machine_co_presences = {
    (m, t1, t2): model.new_bool_var(f"co_presence_m{m}_t{t1}_t{t2}")
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}


variables_machine_co_intervals = {
    (m, t1, t2): model.new_optional_interval_var(
        variables_machine_co_starts[m, t1, t2],
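        # Size must equal the `distance` enforced in the sequence constraints;
        # otherwise zero-cost arcs (same product) would be infeasible.
        m_cost[m, t1, t2],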
        variables_machine_co_ends[m, t1, t2],
        variables_machine_co_presences[m, t1, t2],
        name=f"co_interval_m{m}_t{t1}_t{t2}"
    )
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}







# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Constraints

# One task to one machine.
for task in tasks:
    task_candidate_machines = machines
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# co level link to machine-co level
for t1 in tasks_0:
    for t2 in tasks_0:
        if t1 != t2:

            for m in machines:
                model.add(
                    variables_co_starts[t1, t2] == variables_machine_co_starts[m, t1, t2]
                ).only_enforce_if(variables_machine_co_presences[m, t1, t2])

                model.add(
                    variables_co_ends[t1, t2] == variables_machine_co_ends[m, t1, t2]
                ).only_enforce_if(variables_machine_co_presences[m, t1, t2])


# for dummies: Force task 0 (dummy) starts at 0 and is present on all machines
model.add(variables_task_starts[0] == 0)
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:
    arcs = list()
    for t1 in tasks_0:
        for t2 in tasks_0:
            # arcs
            if t1 != t2:
                arcs.append([
                        t1,
                        t2,
                        variables_machine_task_sequence[(m, t1, t2)]
                ])
                distance = m_cost[m, t1, t2]
                # cannot require the time index of task 0 to represent the first and the last position
                if t2 != 0:
                    # to schedule tasks and c/o
                    model.add(
                        variables_task_ends[t1] <= variables_co_starts[t1, t2]
                    ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

                    model.add(
                        variables_co_ends[t1, t2] <= variables_task_starts[t2]
                    ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

                    model.add(
                        variables_co_ends[t1, t2] - variables_co_starts[t1, t2] == distance
                    ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

                    # ensure intervals are consistent so we can apply resource constraints later
                    model.add(
                        variables_machine_co_presences[m, t1, t2] == 1
                    ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

                    model.add(
                        variables_machine_co_presences[m, t1, t2] == 0
                    ).only_enforce_if(~variables_machine_task_sequence[(m, t1, t2)])


    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])
    model.add_circuit(arcs)


# Solve


solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                        print('variables_machine_task_sequence[t1, t2]', solver.value(variables_machine_task_sequence[m, t1, t2]))
                        print('variables_co_starts[t1, t2]', solver.value(variables_co_starts[t1, t2]))
                        print('variables_co_ends[t1, t2]', solver.value(variables_co_ends[t1, t2]))
                        print('variables_machine_co_presences[m, t1, t2]', solver.value(variables_machine_co_presences[m, t1, t2]))
                        print('variables_machine_co_starts[m, t1, t2]', solver.value(variables_machine_co_starts[m, t1, t2]))
                        print('variables_machine_co_ends[m, t1, t2]', solver.value(variables_machine_co_ends[m, t1, t2]))
                        #print('variables_machine_co_intervals[m, t1, t2]', variables_machine_co_intervals[m, t1, t2])

                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Break without changeover

Source: scheduling/example_07_break_without_changeover.py

Time to introduce breaks. Single machine, four identical tasks, and a fixed break window at (2, 3) during which the machine is unavailable.

The trick is to treat the break as just another interval. A new_fixed_size_interval_var is placed at the break window and passed to add_cumulative (unit demands, capacity 1) alongside the task intervals. No task can run during the break, so the tasks are scheduled around it.

Because all tasks share one product, no changeover logic is needed - a deliberate simplification to isolate the break mechanic. Two add_decision_strategy calls are added to steer the solver toward the canonical order 0 1 2 3 4 0; according to the comments in the source, without them it returns an equally valid but permuted sequence such as 0 1 4 2 3 0.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       A
3       A
4       A
'''

# 1. Data

tasks = {1, 2, 3, 4}
tasks_0 = tasks.union({0})
task_to_product = {0: 'dummy', 1: 'A', 2: 'A', 3: 'A', 4: 'A'}
processing_time = {'dummy': 0, 'A': 1}
changeover_time = {'dummy': 0, 'A': 1}
machines = {0}
machines_starting_products = {0: 'A'}
breaks = {(2, 3)}

X = {
    (m, t1, t2)
    for t1 in tasks_0
    for t2 in tasks_0
    for m in machines
    if t1 != t2
}

# m_cost is now used in the constraints, not in the objective function
m_cost = {
    (m, t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] or (
            task_to_product[t1] == 'dummy' and task_to_product[t2] == machines_starting_products[m]
    )
    else changeover_time[task_to_product[t2]]
    for (m, t1, t2) in X
}


# 2. Decision variables
max_time = 8

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks_0
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks_0
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks_0
    for m in machines
}

variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for (m, t1, t2) in X
}

# intervals
variables_machine_task_intervals = {
    (m, task): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        processing_time[task_to_product[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_presences[m, task],
        name=f"interval_{m}_{task}"
    )
    for task in tasks_0
    for m in machines
}


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Constraints

# One task to one machine.
for task in tasks:
    # For this task
    # get all allowed machines
    task_candidate_machines = machines
    # find the subset in presence matrix related to this task
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)


# task level link to machine-task level
for task in tasks_0:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for dummies: Force task 0 (dummy) starts at 0 and is present on all machines
model.add(variables_task_starts[0] == 0)
for m in machines:
    model.add(variables_machine_task_presences[m, 0] == 1)


# Sequence
for m in machines:
    arcs = list()
    for from_task in tasks_0:
        for to_task in tasks_0:
            # arcs
            if from_task != to_task:
                arcs.append([
                        from_task,
                        to_task,
                        variables_machine_task_sequence[(m, from_task, to_task)]
                ])
                distance = m_cost[m, from_task, to_task]
                # cannot require the time index of task 0 to represent the first and the last position
                if to_task != 0:
                    model.add(
                        variables_task_ends[from_task] + distance <= variables_task_starts[to_task]
                    ).only_enforce_if(variables_machine_task_sequence[(m, from_task, to_task)])
    for task in tasks:
        arcs.append([
            task, task, ~variables_machine_task_presences[(m, task)]
        ])
    model.add_circuit(arcs)



# Add break time
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end - start, name='a_break') for (start, end) in breaks
}

# Add resource control with break
intervals = list(variables_machine_task_intervals.values()) + list(variables_breaks.values())
model.add_cumulative(intervals, [1]*len(intervals), 1)



# Solve


# https://github.com/d-krupke/cpsat-primer

# default
# 0 1 4 2 3 0

#model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)
# 0 4 3 2 1 0

#model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# 0 1 4 2 3 0

# strange
#model.add_decision_strategy(variables_machine_task_sequence.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
# 0 1 4 2 3 0

# Need the following both to have the right sequence
model.add_decision_strategy(variables_task_starts.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
model.add_decision_strategy(variables_machine_task_sequence.values(), cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)


# 0 1 2 3 4 0


solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    print('Make-span:', solver.value(make_span))

    for m in machines:

        print(f'------------\nMachine {m}')
        print(f'Starting dummy product: {machines_starting_products[m]}')
        for t1 in tasks_0:
            for t2 in tasks_0:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

One automatic job

Source: scheduling/example_12_an_automatic_job.py

Machines differ in how they consume operator time. Some need the operator for the whole run; others need the operator only for setup, after which the machine runs by itself and the operator is free to take a break or handle other jobs.

The model uses two intervals per task. The full interval is what shows up on the Gantt chart. The setup interval - a size-1 stub at the task's start - is what goes into the cumulative alongside breaks:

model.add_cumulative(
    intervals=setup_intervals + break_intervals,
    demands=[1] * (len(tasks) + len(breaks)),
    capacity=1,
)

Now breaks can push task starts, but once an automatic task has begun it runs through the break uninterrupted.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task    product     type
1       A           TYPE_3
'''

# 1. Data

tasks = {1}
products = {'A', 'B'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_3'}
processing_time = {'A': 3, 'B': 1}
max_time = 10
breaks = {(0, 1), (2, 10)}


# 2. Decision Variables
var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task],
        processing_time[task_to_product[task]],
        var_task_ends[task],
        name=f"interval_t{task}"
    )
    for task in tasks
}

var_task_intervals_autojobs = {
    task: model.new_interval_var(
        var_task_starts[task],
        1,
        var_task_starts[task] + 1,
        name=f"interval_auto_t{task}"
    )
    for task in tasks
    if task_to_type[task] == 'TYPE_3'
}



# Add break time
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals_autojobs.values()) + list(variables_breaks.values())

# one unit of operator demand per interval (setup stubs and breaks)
demands = [1] * len(intervals)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Automatic jobs

Source: scheduling/example_13_automatic_jobs.py

Two automatic jobs need an order. The two-interval trick from the previous chapter carries over: each task has a full interval and a size-1 setup interval.

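A small add_circuit over seq[t1, t2] booleans, plus a dummy node 0 that marks the start and end of the sequence, picks the order; each selected arc enforces end[t1] <= start[t2]. The cumulative still contains only the setup intervals and the breaks, so a task can only start outside a break but, once running, carries on through any break it meets.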
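The instance in this chapter is small enough to enumerate by hand, which gives a sanity check for the model. The sketch below is plain Python, not CP-SAT, and the helper names free_setup_slots and best_makespan are made up for illustration. It applies the same two rules to the data from the source file: the first unit of each task must avoid every break, and the two full intervals must not overlap.

```python
from itertools import product

# Data copied from the example file
processing_time = {1: 4, 2: 3}   # task -> duration (products A and B)
max_time = 10
breaks = {(0, 1), (2, 3), (4, 6), (7, 10)}


def free_setup_slots(breaks, horizon):
    """Starts t whose size-1 setup interval [t, t+1) avoids every break."""
    return [
        t for t in range(horizon)
        if all(not (b_start <= t < b_end) for b_start, b_end in breaks)
    ]


def best_makespan(durations, breaks, horizon):
    """Enumerate start pairs for exactly two tasks and keep the best one."""
    slots = free_setup_slots(breaks, horizon)
    t1, t2 = sorted(durations)
    best = None
    for s1, s2 in product(slots, repeat=2):
        e1, e2 = s1 + durations[t1], s2 + durations[t2]
        if max(e1, e2) > horizon:
            continue
        # One task must finish before the other starts (the circuit arcs)
        if not (e1 <= s2 or e2 <= s1):
            continue
        if best is None or max(e1, e2) < best[0]:
            best = (max(e1, e2), {t1: (s1, e1), t2: (s2, e2)})
    return best


print(free_setup_slots(breaks, max_time))
print(best_makespan(processing_time, breaks, max_time))
```

For this data the free setup slots are 1, 3 and 6, and the best schedule runs task 1 on [1, 5) and task 2 on [6, 9), for a makespan of 9. The CP-SAT model should report the same makespan.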

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task    product     type
1       A           TYPE_3
2       B           TYPE_3
'''

# 1. Data

tasks = {1, 2}
products = {'A', 'B'}
task_to_product = {1: 'A', 2: 'B'}
task_to_type = {1: 'TYPE_3', 2: 'TYPE_3'}
processing_time = {'A': 4, 'B': 3}
max_time = 10
breaks = {(0, 1), (2, 3), (4, 6), (7, 10)}


# 2. Decision Variables

var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task],
        processing_time[task_to_product[task]],
        var_task_ends[task],
        name=f"interval_t{task}"
    )
    for task in tasks
}

var_task_intervals_auto = {
    task: model.new_interval_var(
        var_task_starts[task],
        1,
        var_task_starts[task] + 1,
        name=f"interval_auto_t{task}"
    )
    for task in tasks
    if task_to_type[task] == 'TYPE_3'
}

var_task_seq = {
    (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
}


arcs = []
for t1 in tasks:
    tmp_1 = model.new_bool_var(f'first_to_{t1}')
    arcs.append([0, t1, tmp_1])

    tmp_2 = model.new_bool_var(f'{t1}_to_last')
    arcs.append([t1, 0, tmp_2])

    for t2 in tasks:
        if t1 == t2:
            continue

        # arc t1 -> t2 is selected when t2 directly follows t1
        arcs.append([t1, t2, var_task_seq[t1, t2]])

        model.add(
            var_task_ends[t1] <= var_task_starts[t2]
        ).only_enforce_if(var_task_seq[t1, t2])

model.add_circuit(arcs)


# Add break time
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())

# task, resource reduction for breaks
demands = [1]*len(tasks) + [1]*len(breaks)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))
    print('=======================  ALLOCATION & SEQUENCE  =======================')
    for t1 in tasks:
        for t2 in tasks:
            if t1 != t2:
                value = solver.value(var_task_seq[(t1, t2)])
                print(f'{t1} --> {t2}  {value}')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Task delaying a break

Source: scheduling/example_14_task_delaying_break.py

A different way of mixing tasks and breaks. Here the task is not automatic; if it runs through a break, its effective machine time grows by the length of the break.

The encoding is per time slot. For every slot i, a boolean uses[t, i] is true iff task t covers slot i. It is the AND of two reified booleans, "started at or before i" (start[t] <= i) and "still running after i" (end[t] >= i + 1), computed as their product with add_multiplication_equality. Then

duration[t] = base + sum(is_break[i] * uses[t, i] for i)

adds back the length of every break the task straddles. The task interval uses this stretched duration directly.

Heavy on per-slot booleans (two reified comparisons and one product per task and slot), but it is a general-purpose pattern whenever a duration depends on a time-dependent condition.
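The duration equation is self-referential: the duration depends on which slots the task covers, and the covered slots depend on the duration. A plain-Python sketch makes the arithmetic concrete. The function name stretched_end is hypothetical, and the sketch ignores the separate rule that a task may not start inside a break. It iterates to the smallest end that satisfies end - start == base + (break slots in [start, end)), which is the value a makespan-minimizing solver would pick.

```python
# Data copied from the example file
max_time = 10
breaks = {(2, 4)}
is_break = {
    i: int(any(start <= i < end for start, end in breaks))
    for i in range(max_time)
}


def stretched_end(start, base, is_break, horizon):
    """Smallest end with end - start == base + break slots in [start, end)."""
    end = start + base
    while end <= horizon:
        covered_breaks = sum(is_break[i] for i in range(start, end))
        if end - start == base + covered_breaks:
            return end
        # Not consistent yet: extend by the breaks found so far and retry
        end = start + base + covered_breaks
    return None  # the stretched task does not fit inside the horizon


for start in (0, 1, 4):
    print(start, stretched_end(start, 3, is_break, max_time))
```

A task of base duration 3 that starts at 0 covers break slots 2 and 3 and therefore ends at 5; the same task started at 4 is past the break and ends at 7.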

Concepts

  • Breaks (approach 2: duration stretched)
  • CP-SAT basics (reification, add_multiplication_equality)

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task    product     type
1       A           TYPE_4
'''

# 1. Data

tasks = {1}
products = {'A'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_4'}
processing_time = {'A': 3}
max_time = 10
breaks = {(2, 4)}
# is_break[i] == 1 iff slot [i, i+1) lies inside a break
is_break = {
    i: int(any(start <= i < end for (start, end) in breaks))
    for i in range(max_time)
}

# 2. Decision Variables

var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_task_new_duration = {
    task: model.new_int_var(0, max_time, f"task_{task}_delay") for task in tasks
}

var_task_duration_timeslots = {
    (task, i): model.new_bool_var(f'task {task} uses interval {i}')
    for task in tasks
    for i in range(max_time)
}

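# AND pair: two reified booleans per (task, slot);
# their conjunction says that the task covers the slot
var_task_start_earlier_than_start = {
    (task, i): model.new_bool_var(f'task_{task}_starts_by_{i}')
    for task in tasks
    for i in range(max_time)
}
var_task_end_later_than_end = {
    (task, i): model.new_bool_var(f'task_{task}_ends_after_{i}')
    for task in tasks
    for i in range(max_time)
}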
# # OR pair
# var_task_start_later_than_end = {
#     (task, i): model.new_bool_var(f'')
#     for task in tasks
#     for i in range(max_time)
# }
# var_task_end_earlier_than_start = {
#     (task, i): model.new_bool_var(f'')
#     for task in tasks
#     for i in range(max_time)
# }


for task in tasks:
    for i in range(max_time):
        model.add(var_task_starts[task] <= i).only_enforce_if(var_task_start_earlier_than_start[task, i])
        model.add(var_task_starts[task] > i).only_enforce_if(~var_task_start_earlier_than_start[task, i])

        model.add(var_task_ends[task] >= i + 1).only_enforce_if(var_task_end_later_than_end[task, i])
        model.add(var_task_ends[task] < i + 1).only_enforce_if(~var_task_end_later_than_end[task, i])

        # model.add(var_task_starts[task] >= i+1).only_enforce_if(var_task_start_later_than_end[task, i])
        # model.add(var_task_starts[task] < i+1).only_enforce_if(~var_task_start_later_than_end[task, i])
        #
        # model.add(var_task_ends[task] <= i).only_enforce_if(var_task_end_earlier_than_start[task, i])
        # model.add(var_task_ends[task] > i).only_enforce_if(~var_task_end_earlier_than_start[task, i])

        # And
        model.add_multiplication_equality(
            var_task_duration_timeslots[task, i],
            [var_task_start_earlier_than_start[task, i], var_task_end_later_than_end[task, i]]
        )

        # model.add_min_equality(
        #     var_task_duration_timeslots[task, i],
        #     [var_task_start_later_than_end[task, i], var_task_end_earlier_than_start[task, i]]
        # )

# for task in tasks:
#     for i in range(max_time):
#         start_index = i
#         end_index = i + 1
#
#         # TRUE
#         model.add_linear_constraint(var_task_starts[task], 0, start_index).only_enforce_if(var_task_timeslots[task, i])
#         model.add_linear_constraint(var_task_ends[task], end_index, max_time).only_enforce_if(var_task_timeslots[task, i])
#
#         # FALSE
#         if i == 0:
#             # first time slot
#             model.add_linear_expression_in_domain(
#             var_task_starts[task],
#             cp_model.Domain.from_intervals([[1, max_time]])
#             ).only_enforce_if(~var_task_timeslots[task, i])
#
#         elif i == max_time - 1:
#             # last time slot
#             model.add_linear_expression_in_domain(
#             var_task_ends[task],
#             cp_model.Domain.from_intervals([[0, max_time-1]])
#             ).only_enforce_if(~var_task_timeslots[task, i])
#         else:
#             model.add_linear_expression_in_domain(
#             x,
#             cp_model.Domain.from_intervals([[1, start_index], [end_index+1, max_time]])
#             ).only_enforce_if(~var_task_timeslots[task, i])


#
# task_1_start  <  task_2_start <  task_1_end
# Variables:
# 1: Overlap
# 2: (task_2_start < task_1_end)
# 3: (task_1_start < task_2_start)
# Constraints:
# 1: (task_1_start < task_2_start) -> task_1_start
# 2:


for task in tasks:
    model.add(
            var_task_new_duration[task] == processing_time[task_to_product[task]] +
            sum(is_break[i]*var_task_duration_timeslots[task, i] for i in range(max_time))
    )


var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task],
        var_task_new_duration[task],
        #processing_time[task_to_product[task]],
        var_task_ends[task],
        name=f"interval_t{task}"
    )
    for task in tasks
}


var_task_intervals_auto = {
    task: model.new_interval_var(
        var_task_starts[task],
        1,
        var_task_starts[task] + 1,
        name=f"interval_auto_t{task}"
    )
    for task in tasks
    if task_to_type[task] == 'TYPE_4'
}

# Add break time
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())

# task, resource reduction for breaks
demands = [1]*len(tasks) + [1]*len(breaks)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)

# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)




# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    for task in tasks:
        print(f'task {task}')
        for i in range(max_time):
            print(f'{i} {solver.value(var_task_duration_timeslots[task, i])}')

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )
        print(solver.value(var_task_new_duration[task]))

    print('Make-span:', solver.value(make_span))
    # print('=======================  ALLOCATION & SEQUENCE  =======================')
    # if True:
    #     for t1 in tasks:
    #         for t2 in tasks:
    #             if t1 != t2:
    #                 value = solver.value(var_task_seq[(t1, t2)])
    #                 print(f'{t1} --> {t2}  {value}')
    #                 # if value == 1 and t2 != 0:
    #                 #     print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}')#  cost: {m_cost[m, t1, t2]}')
    #                 # if value == 1 and t2 == 0:
    #                 #     print(f'{t1} --> {t2}   Closing')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Linear domain for breaks

Source: scheduling/example_29_linear_domain_for_breaks.py

When breaks are periodic - a fixed pattern every shift, every day - modeling each break as an interval inflates the model. An alternative is to precompute the set of valid task start times (every start for which the task does not overlap a break) and restrict starts to that set, either with add_linear_expression_in_domain on an existing variable or, as the file does, by creating the start variable directly from the domain with new_int_var_from_domain.

The file defines two model variants and benchmarks them:

  • Method 1: breaks as fixed intervals in add_cumulative or add_no_overlap.
  • Method 2: breaks disappear; start domains are constrained.

For highly periodic schedules the domain method is expected to win: CP-SAT propagates directly on a small set of start values and never sees the break intervals. The trade-off is loss of generality: the valid starts depend on the task duration, and arbitrary break patterns don't translate to clean domains.
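The precomputation step can be sketched in plain Python. The helper valid_start_intervals is a hypothetical name, not part of the file, which hard-codes the pattern instead. It scans the horizon, keeps every start whose task window is clear of all breaks, and merges consecutive starts into closed [lo, hi] intervals.

```python
def valid_start_intervals(horizon, breaks, duration):
    """Closed intervals [lo, hi] of starts t with [t, t + duration) clear of breaks."""
    def is_valid(t):
        return all(
            t + duration <= b_start or t >= b_end
            for b_start, b_end in breaks
        )

    intervals = []
    for t in range(horizon - duration + 1):
        if not is_valid(t):
            continue
        if intervals and intervals[-1][1] == t - 1:
            intervals[-1][1] = t       # extend the current run of valid starts
        else:
            intervals.append([t, t])   # open a new run
    return intervals


# Same break pattern as the benchmark with number_of_tasks = 4,
# with the horizon cut at the end of the last break
number_of_tasks = 4
breaks = sorted((4 * i, 4 * i + 4) for i in range(number_of_tasks) if i % 2 != 0)
periods = valid_start_intervals(horizon=16, breaks=breaks, duration=2)
print(periods)
```

For this pattern the result is [[0, 2], [8, 10]], the same list the file builds as valid_periods. A list of [lo, hi] pairs is the shape that cp_model.Domain.from_intervals accepts.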

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from tqdm import tqdm
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator


# method 1: No overlapping
def run_model_1(number_of_tasks):
    #number_of_tasks = 200
    tasks = {x for x in range(number_of_tasks)}

    # 2 tasks --> 8 time units

    processing_time = 2
    max_time = number_of_tasks * 4 + 100

    breaks = {(4 * i, 4 * i + 4) for i in range(int(number_of_tasks)) if i % 2 != 0}

    valid_start_times = [[0 + i * 8, 1 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
    valid_start_times = [item for sublist in valid_start_times for item in sublist]

    model = cp_model.CpModel()

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            processing_time,
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] < var_task_starts[task])

    var_break_intervals = {
        break_id: model.new_fixed_size_interval_var(
            break_start, break_end-break_start, f"break_{break_id}"
        ) for break_id, (break_start, break_end) in enumerate(breaks)
    }

    all_intervals = [x for x in var_task_intervals.values()] + [x for x in var_break_intervals.values()]
    model.add_no_overlap(all_intervals)

    # obj
    make_span = model.new_int_var(0, max_time, "make_span")

    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )

    model.minimize(make_span)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    # if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    #     for task in tasks:
    #         print(f'Task {task} ',
    #               solver.value(var_task_starts[task]), solver.value(var_task_ends[task])
    #               )
    #
    #     print('Make-span:', solver.value(make_span))

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


# method 2: Use linear domain
def run_model_2(number_of_tasks):

    #number_of_tasks = 4
    tasks = {x for x in range(number_of_tasks)}

    # 2 tasks --> 8 time units

    processing_time = 2
    max_time = number_of_tasks * 4 + 100

    breaks = {(4 * i, 4 * i + 4) for i in range(int(number_of_tasks)) if i % 2 != 0}

    valid_start_times = [[0 + i * 8, 1 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]
    valid_periods = [[0 + i * 8, 2 + i * 8] for i in range(int(number_of_tasks / 2))]

    valid_start_times = [item for sublist in valid_start_times for item in sublist]

    model = cp_model.CpModel()

    #model.new_int_var_from_domain(cp_model.Domain.from_intervals([[1, 2], [4, 6]]), 'x')

    # Start variables only take values inside the precomputed valid periods
    var_task_starts = {task: model.new_int_var_from_domain(
        cp_model.Domain.from_intervals(valid_periods),
        f"task_{task}_start")
        for task in tasks}

    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            processing_time,
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] < var_task_starts[task])

    all_intervals = [x for x in var_task_intervals.values()]
    model.add_no_overlap(all_intervals)

    # obj
    make_span = model.new_int_var(0, max_time, "make_span")

    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )

    model.minimize(make_span)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    # if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    #     for task in tasks:
    #         print(f'Task {task} ',
    #               solver.value(var_task_starts[task]), solver.value(var_task_ends[task])
    #               )
    #
    #     print('Make-span:', solver.value(make_span))

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


if __name__ == '__main__':

    N = 200

    sizes_1 = list(range(2, N, 10))
    sizes_2 = list(range(2, N, 10))

    model_1_times = []
    model_2_times = []

    print("\nNoOverlap\n")
    for size in tqdm(sizes_1):
        model_1_times.append(run_model_1(size))

    print("\nLinear Domain\n")
    for size in tqdm(sizes_2):
        model_2_times.append(run_model_2(size))

    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(sizes_1, model_1_times, marker='o', label='NoOverLapping')
    plt.plot(sizes_2, model_2_times, '-.', marker='o', label='Linear Domain')
    plt.legend()
    plt.title('Performance benchmarking')
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()

Conditional duration via linear domain

Source: scheduling/example_33_conditional_duration_linear_domain.py

Combines the ideas of two earlier chapters. Like Task delaying a break, a task that hits a break takes longer. Like Linear domain for breaks, the break pattern is periodic enough to encode as start-time domains.

Two start domains are precomputed: domain_break (starts that would overlap a break) and domain_no_break (safe starts). A reified bool overlap_break[t] toggles which domain the start belongs to, and the duration switches by one time unit accordingly:

model.add(duration[t] == processing_time + 1).only_enforce_if(overlap_break[t])
model.add(duration[t] == processing_time    ).only_enforce_if(~overlap_break[t])

Because the task interval is built from start/duration/end, everything downstream - no-overlap, cumulative, makespan - sees the correct effective duration with no extra bookkeeping.
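The two start domains can be derived rather than written out by hand. The sketch below is plain Python under stated assumptions: unit-length breaks that repeat every period slots, and the hypothetical helpers effective_duration and split_starts, neither of which is in the file. It walks each candidate start forward until the required number of working slots is covered and classifies the start by whether the walk crossed a break.

```python
def effective_duration(start, processing_time, break_offset, period):
    """Slots needed from start to cover processing_time non-break slots."""
    t, worked = start, 0
    while worked < processing_time:
        if t % period != break_offset:   # slot t is not a break
            worked += 1
        t += 1
    return t - start


def split_starts(break_offset, period, processing_time, horizon):
    """Return (starts_no_break, starts_break) for starts in [0, horizon)."""
    starts_no_break, starts_break = [], []
    for start in range(horizon):
        duration = effective_duration(start, processing_time, break_offset, period)
        if duration == processing_time:
            starts_no_break.append(start)
        else:
            starts_break.append(start)
    return starts_no_break, starts_break


no_break, with_break = split_starts(break_offset=1, period=3, processing_time=2, horizon=9)
durations = {effective_duration(s, 2, 1, 3) for s in range(9)}
print(no_break, with_break, durations)
```

With break_offset = 1 the safe starts are 2, 5 and 8, and every other start stretches the task by exactly one unit, which is why a single boolean and a "+ 1" are enough in this chapter.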

Concepts

Source

from ortools.sat.python import cp_model
from time import time
import pandas as pd


if __name__ == '__main__':
    """
    Offset = 0:
        | x |   |   | x |   |   | x |   |   |
    Offset = 1:
        |   | x |   |   | x |   |   | x |   |    
    Offset = 2:
        |   |   | x |   |   | x |   |   | x |    
        
    where x represent a unit duration break period
    
    """
    break_offset = 1

    num_of_tasks = 3
    max_time = num_of_tasks*3
    processing_time = 2

    if break_offset == 0:
        help_text = "| x |   |   | x |   |   | x |   |   |"
    elif break_offset == 1:
        help_text = "|   | x |   |   | x |   |   | x |   |"
    elif break_offset == 2:
        help_text = "|   |   | x |   |   | x |   |   | x |"
    else:
        raise ValueError("break_offset must be 0, 1 or 2")

    # Breaks are one unit long and repeat every 3 time units
    breaks = [(i*3 + break_offset, i*3 + break_offset + 1) for i in range(num_of_tasks)]
    tasks = {x for x in range(num_of_tasks)}
    starts_no_break = [x*3+break_offset+1 for x in range(-1, num_of_tasks) if x*3+break_offset+1>= 0]
    starts_break = sorted(set(range(max_time)).difference(starts_no_break))
    domain_no_break = cp_model.Domain.from_intervals([[x] for x in starts_no_break])
    domain_break = cp_model.Domain.from_intervals([[x] for x in starts_break])

    model = cp_model.CpModel()

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_durations = {
        task: model.new_int_var(processing_time, processing_time + 1, f"task_{task}_duration")
        for task in tasks
    }
    var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}

    # print("Heuristic 1: Apply the tasks sequence heuristics")
    for task in tasks:
        if task == 0:
            continue
        model.add(var_task_ends[task-1] <= var_task_starts[task])

    for task in tasks:

        model.add_linear_expression_in_domain(var_task_starts[task], domain_break).only_enforce_if(
            var_task_overlap_break[task]
        )

        model.add_linear_expression_in_domain(var_task_starts[task], domain_no_break).only_enforce_if(
            ~var_task_overlap_break[task]
        )

        model.add(var_task_durations[task] == processing_time+1).only_enforce_if(
            var_task_overlap_break[task]
        )

        model.add(var_task_durations[task] == processing_time).only_enforce_if(
            ~var_task_overlap_break[task]
        )

    # intervals
    var_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            var_task_durations[task],
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }

    model.add_no_overlap(var_intervals.values())

    # Objectives
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )

    model.minimize(make_span + sum(var_task_durations.values()))
    # model.minimize(sum(var_task_durations))

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    print_result = True
    # show the result if getting the optimal one
    if print_result:
        print("-----------------------------------------")
        print(help_text)
        print("breaks periods:", breaks)
        print("break starts:", starts_break)
        print("no break starts:", starts_no_break)

        if status == cp_model.OPTIMAL:
            big_list = []
            for task in tasks:
                tmp = [
                    f"task {task}",
                    solver.value(var_task_starts[task]),
                    solver.value(var_task_ends[task]),
                    solver.value(var_task_overlap_break[task]),
                    solver.value(var_task_durations[task]),
                ]
                big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
            df = df.sort_values(['start'])
            print(df)
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

Shift crossing (fake time unit)

Source: scheduling/example_16_shift_crossing_fake_time_unit.py

The working day is split into shifts. A task must run entirely inside one shift; it may not straddle a shift boundary.

The first approach borrows the break mechanic. Place a one-unit "fake break" interval at each shift boundary, then forbid every task from overlapping it:

for s, e in synthetic_shift_breaks:
    br = model.new_fixed_size_interval_var(start=s, size=e - s, name="shift_edge")
    for t in tasks:
        model.add_no_overlap([task_interval[t], br])

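Concise: one fixed interval per boundary and one two-interval add_no_overlap per task and boundary. The cost is that each fake unit occupies a slot on the timeline in which no task can run. Real breaks continue to use add_cumulative. Compare with the explicit shift-assignment alternative in the next chapter.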
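For the single-task data in this chapter the effect of the fake units can be checked without a solver. The sketch below is plain Python with hypothetical helper names (overlaps, feasible_starts). It treats real breaks and shift edges alike as blocked windows, which is equivalent here because there is only one task and the cumulative has capacity 1.

```python
# Data copied from the example file
synthetic_shift_breaks = {(4, 5), (9, 10)}
breaks = {(0, 2)}
processing_time = 3
max_time = 10


def overlaps(start, end, blocked):
    """True if [start, end) intersects any [b_start, b_end) in blocked."""
    return any(start < b_end and b_start < end for b_start, b_end in blocked)


def feasible_starts(duration, blocked, horizon):
    """All starts whose task window fits the horizon and avoids blocked windows."""
    return [
        s for s in range(horizon - duration + 1)
        if not overlaps(s, s + duration, blocked)
    ]


starts = feasible_starts(processing_time, breaks | synthetic_shift_breaks, max_time)
earliest_end = min(starts) + processing_time
print(starts, earliest_end)
```

Only starts 5 and 6 survive: the break blocks starts 0 and 1, the edge at (4, 5) blocks starts 2 to 4, and the edge at (9, 10) blocks start 7. The earliest end is therefore 8.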

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

# 1. Data

synthetic_shift_breaks = {(4, 5), (9, 10)}
breaks = {(0, 2)}
tasks = {1}
processing_time = {1: 3}
max_time = 10

var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task], processing_time[task], var_task_ends[task], name=f"interval_t{task}"
    ) for task in tasks
}

# Add break time
var_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals.values()) + list(var_break_intervals.values())

demands = [1]*len(tasks) + [1]*len(breaks)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)


# THE CONSTRAINT for synthetic shift break time unit


var_shift_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='shift_edge')
    for (start, end) in synthetic_shift_breaks
}

# Pairwise no-overlap: each task against each shift edge
for start, end in synthetic_shift_breaks:
    for task in tasks:
        model.add_no_overlap([var_task_intervals[task], var_shift_break_intervals[start, end]])



# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Shift crossing (Mathieu)

Source: scheduling/example_17_shift_crossing_mathieu.py

Second take on the same shift problem. Instead of a synthetic break at every boundary, assign each task explicitly to one shift via a one-hot presence[shift, task]. When that presence is true, the task must stay inside the shift window.

for t in tasks:
    model.add_exactly_one(presence[s, t] for s in shifts)
    for s in shifts:
        model.add(start[t] >= shift_start[s]).only_enforce_if(presence[s, t])
        model.add(end[t]   <= shift_end[s]  ).only_enforce_if(presence[s, t])

More variables than the synthetic-break approach, but the assignment is visible in the solution and easy to extend - per-shift headcount caps, operator preferences, shift-specific processing times all drop in naturally.

Concepts

  • Shifts (explicit shift assignment)

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

tasks = {1}
processing_times = {1: 3}
max_time = 10
breaks = {(0, 2)}
shifts = {1, 2}
shift_starts = {1: 0, 2: 4}
shift_ends = {1: 4, 2: 8}


var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

var_shift_task_presence = {
    (shift, task): model.new_bool_var(f'task_{task}_is_in_shift_{shift}')
    for shift in shifts for task in tasks
}

for task in tasks:
    # one-hot: every task is assigned to exactly one shift
    model.add_exactly_one([var_shift_task_presence[shift, task] for shift in shifts])

    for shift in shifts:
        model.add(var_task_starts[task] >= shift_starts[shift]).only_enforce_if(
            var_shift_task_presence[shift, task]
        )
        model.add(var_task_ends[task] <= shift_ends[shift]).only_enforce_if(
            var_shift_task_presence[shift, task]
        )


var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task], processing_times[task], var_task_ends[task], name=f"interval_t{task}"
    ) for task in tasks
}

# Add break time
var_break_intervals = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}


intervals = list(var_task_intervals.values()) + list(var_break_intervals.values())

demands = [1]*len(tasks) + [1]*len(breaks)

model.add_cumulative(intervals=intervals, demands=demands, capacity=1)


# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Stages, one job

Source: scheduling/example_21_stages_one_job.py

A job can consist of ordered stages. Think of a bakery batch: prepare, bake, pack. Each stage uses its own equipment and must follow the previous one.

Model: tasks are indexed (job, stage). The job's start and end are the min and max over its stage starts and ends:

model.add_min_equality(job_start[job], [start[job, s] for s in stages])
model.add_max_equality(job_end[job],   [end  [job, s] for s in stages])

Stage precedence is a chain of end[job, s] <= start[job, s + 1]. The file creates the interval variables but does not yet put them into an add_no_overlap: with only one job a per-stage no-overlap would be a no-op. The scaffolding is ready for the next chapter.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

jobs = {1}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}

processing_time = 3
max_time = 10

# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}

var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}

# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}

var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}


for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])

    # Stages are numbered 1..len(stages); the last stage has no successor
    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])


var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}


# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for job in jobs:
        print(f"job_{job}  start: {solver.value(var_job_starts[job])}   end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Stages, two jobs

Source: scheduling/example_22_stages_two_jobs.py

Add a second job and the previous chapter's scaffolding earns its keep. Each stage has only one instance of its equipment, so two jobs cannot sit in the same stage at the same time:

for s in stages:
    model.add_no_overlap([intervals[j, s] for j in jobs])

This is the classic flow-shop pattern. The solver now has to interleave the two jobs across stages such that no stage is double-booked and the makespan is minimised.
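For an instance this small the expected makespan can be worked out without a solver. The sketch below is plain Python, not part of the example file. It assumes jobs visit the stages in a fixed job order and uses the usual flow-shop recurrence: a task starts once its own previous stage is done and the stage is free. The function name flow_shop_makespan is hypothetical.

```python
def flow_shop_makespan(num_jobs, num_stages, processing_time):
    """Makespan when jobs pass through the stages in a fixed job order."""
    # completion[s] holds the completion time of the most recent job at stage s
    completion = [0] * num_stages
    for _ in range(num_jobs):
        previous_stage_end = 0
        for s in range(num_stages):
            # wait for the job's previous stage and for the stage to be free
            start = max(previous_stage_end, completion[s])
            completion[s] = start + processing_time
            previous_stage_end = completion[s]
    return completion[-1]


print(flow_shop_makespan(num_jobs=2, num_stages=3, processing_time=3))
```

With identical processing times the job order does not matter, so this value is a useful reference point when reading the solver output.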

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

jobs = {1, 2}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}

processing_time = 3
max_time = 20

# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}

var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}

# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}

var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}


for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])

    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])


var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}

for stage in stages:
    model.add_no_overlap(var_task_intervals[job, stage] for job in jobs)

# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for job in jobs:
        print(f"job_{job}  start: {solver.value(var_job_starts[job])}   end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Multistage, two jobs

Source: scheduling/example_23_multistage_two_jobs_co.py

Same flow-shop model scaled to 6 jobs over 3 stages (18 tasks). No new mechanics - stage precedence, per-stage no-overlap, makespan objective - just more of everything. It exists to check how CP-SAT copes with a larger flow-shop instance.

Despite the two_jobs and co parts of the filename, the model has six jobs and no explicit changeover; treat it as a scalability exercise.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

jobs = {1, 2, 3, 4, 5, 6}
stages = {1, 2, 3}
tasks = {(job, stage) for job in jobs for stage in stages}

processing_time = 3
max_time = 100

# 1. Jobs
var_job_starts = {
    job: model.new_int_var(0, max_time, f"job_{job}_start") for job in jobs
}

var_job_ends = {
    job: model.new_int_var(0, max_time, f"job_{job}_end") for job in jobs
}

# 2. Tasks
var_task_starts = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_start") for (job, stage) in tasks
}

var_task_ends = {
    (job, stage): model.new_int_var(0, max_time, f"job_{job}_stage_{stage}_end") for (job, stage) in tasks
}


for job in jobs:
    model.add_min_equality(var_job_starts[job], [var_task_starts[job, stage] for stage in stages])
    model.add_max_equality(var_job_ends[job], [var_task_ends[job, stage] for stage in stages])

    for stage in stages:
        if stage == len(stages):
            continue
        model.add(var_task_ends[job, stage] <= var_task_starts[job, stage + 1])


var_task_intervals = {
    (job, stage): model.new_interval_var(
        var_task_starts[job, stage],
        processing_time,
        var_task_ends[job, stage],
        name=f"interval_job_{job}_stage_{stage}"
    )
    for job in jobs
    for stage in stages
}

for stage in stages:
    model.add_no_overlap(var_task_intervals[job, stage] for job in jobs)

# 3. Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
    make_span,
    [var_task_ends[task] for task in tasks]
)
model.minimize(make_span)


# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    print('===========================  TASKS SUMMARY  ===========================')
    for job in jobs:
        print(f"job_{job}  start: {solver.value(var_job_starts[job])}   end:{solver.value(var_job_ends[job])}")
        for stage in stages:
            print(f'Stage {stage} ',
                  solver.value(var_task_starts[job, stage]), solver.value(var_task_ends[job, stage]),
                  )

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

People mode

Source: scheduling/example_10_people_mode.py

Not every task has a fixed resource profile. A common pattern: you can run a task with 2 people for 3 time units, or with 3 people for 2. Different modes, different (duration, headcount) pairs, same outcome.
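The arithmetic behind that trade-off is small enough to enumerate. The sketch below is plain Python, not CP-SAT. It uses the (duration, headcount) pairs and the capacity of 7 from this chapter's data, and it assumes all tasks run side by side from time 0, one per machine. The function name best_parallel_plan is hypothetical.

```python
from itertools import product

# mode -> (duration, headcount), taken from the chapter's data for product A
modes = {1: (3, 2), 2: (2, 3)}
capacity = 7  # people available at any one time


def best_parallel_plan(num_tasks):
    """Try every mode combination for tasks that all start together at time 0."""
    best = None
    for combo in product(modes, repeat=num_tasks):
        headcount = sum(modes[k][1] for k in combo)
        if headcount > capacity:
            continue  # too many people needed at once
        makespan = max(modes[k][0] for k in combo)
        if best is None or makespan < best[0]:
            best = (makespan, combo)
    return best


for k, (duration, people) in modes.items():
    print(f"mode {k}: {duration} time units x {people} people = {duration * people} person-units")
print(best_parallel_plan(num_tasks=2))
```

Both modes cost the same number of person-units; what changes is how the work is spread over time, which is why the choice interacts with the headcount capacity.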

The solver needs to pick the mode. A one-hot mode[t, k] per task drives the derivation:

model.add(
    proc_time[t] == sum(
        processing_time[product[t], k] * mode[t, k] for k in modes
    )
)

With proc_time[t] now a variable rather than a constant, the rest of the sequencing machinery (machine presence, per-machine add_circuit, task-level links) carries over unchanged.

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       A
'''

### 1. Data

tasks = {1, 2}
products = {'A'}
task_to_product = {1: 'A', 2: 'A'}
machines = {1,2}
resource_modes = {1, 2}

# product -> people mode -> time duration
processing_time = {
    ('A', 1): 3,
    ('A', 2): 2
}
resource_requirement = {
    ('A', 1): 2,
    ('A', 2): 3
}


max_time = 20

# (task, people_mode)

# 2. Decision variables

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks
    for m in machines
}

variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks
    for m in machines
}

variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks
    for m in machines
}

# One task to one machine.
for task in tasks:
    task_candidate_machines = machines
    tmp = [
        variables_machine_task_presences[m, task]
        for m in task_candidate_machines
    ]
    # this task is only present in one machine
    model.add_exactly_one(tmp)

# task level link to machine-task level
for task in tasks:
    task_candidate_machines = machines
    for m in task_candidate_machines:
        model.add(
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])

        model.add(
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        ).only_enforce_if(variables_machine_task_presences[m, task])


# for sequence control
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for t1 in tasks
    for t2 in tasks
    for m in machines
    if t1 != t2
}

# Sequence
for m in machines:
    arcs = list()
    for node1, t1 in enumerate(tasks):
        tmp_1 = model.new_bool_var(f'm_{m}_first_to_{t1}')
        arcs.append([0, t1, tmp_1])

        tmp_2 = model.new_bool_var(f'm_{m}_{t1}_to_last')
        arcs.append([t1, 0, tmp_2])

        arcs.append([t1, t1, ~variables_machine_task_presences[m, t1]])

        for node_2, t2 in enumerate(tasks):
            # arcs
            if t1 == t2:
                continue

            arcs.append([
                t1,
                t2,
                variables_machine_task_sequence[(m, t1, t2)]
            ])

            distance = 0

            # cannot require the time index of task 0 to represent the first and the last position
            model.add(
                variables_task_ends[t1] + distance <= variables_task_starts[t2]
            ).only_enforce_if(variables_machine_task_sequence[(m, t1, t2)])

    model.add_circuit(arcs)

#
variables_task_resource_mode = {
    (task, resource_mode): model.new_bool_var(f"task {task} using resource mode {resource_mode}")
    for task in tasks for resource_mode in resource_modes
}

# variables_task_resource_modes = {
#     task: model.new_int_var(0, len(resource_modes), f"the resource mode of {task}")
#     for task in tasks
# }

# one task has to pick one resource mode
for task in tasks:
    tmp = sum(variables_task_resource_mode[task, resource_mode] for resource_mode in resource_modes)
    model.add(tmp == 1)

# link resource id with decision matrix
# for task in tasks:
#     for resource_mode in resource_modes:
#         model.add(
#             variables_task_resource_modes[task] == resource_mode
#         ).only_enforce_if(
#             variables_task_resource_mode_matrix[task, resource_mode]
#         )

variables_task_processing_time = {
    task: model.new_int_var(0, 99, f"processing time for {task}")
    for task in tasks
}

for task in tasks:
    model.add(
        variables_task_processing_time[task] == sum(
            processing_time[task_to_product[task], resource_mode]*variables_task_resource_mode[task, resource_mode]
            for resource_mode in resource_modes
        )
    )

# model.new_optional_interval_var(1,1,2,1,'')
# model.new_optional_interval_var(
#     model.new_int_var(0,1,''),
#     model.new_int_var(0, 1, ''),
#     model.new_int_var(0,1,''),
#     model.new_int_var(0, 1, ''),
#     '')

# variables_machine_task_intervals = {
#     (m, task): model.new_optional_interval_var(
#         variables_machine_task_starts[m, task],
#         #2,
#         variables_task_processing_time[task],
#         #processing_time[task_to_product[task], variables_task_resource_modes[task]],
#         variables_machine_task_ends[m, task],
#         variables_machine_task_presences[m, task],
#         name=f"t_interval_{m}_{task}"
#     )
#     for task in tasks
#     for m in machines
# }

variables_machine_task_resource_mode_presence = {
    (m, task, resource_mode): model.new_bool_var(f'm{m}_task_{task}_resource_mode_{resource_mode}_presence')
    for m in machines
    for task in tasks
    for resource_mode in resource_modes
}

for task in tasks:
    for resource_mode in resource_modes:
        model.add(
            sum(variables_machine_task_resource_mode_presence[m, task, resource_mode] for m in machines) == 1
        ).only_enforce_if(
            variables_task_resource_mode[task, resource_mode]
        )

for task in tasks:
    for m in machines:
        model.add(
            sum(variables_machine_task_resource_mode_presence[m, task, resource_mode] for resource_mode in resource_modes) == 1
        ).only_enforce_if(
            variables_machine_task_presences[m, task]
        )

for task in tasks:
    model.add(
        sum(variables_machine_task_resource_mode_presence[m, task, resource_mode]
            for resource_mode in resource_modes
            for m in machines
            ) == 1
    )


variables_machine_task_mode_intervals = {
    (m, task, resource_mode): model.new_optional_interval_var(
        variables_machine_task_starts[m, task],
        #2,
        variables_task_processing_time[task],
        #processing_time[task_to_product[task], variables_task_resource_modes[task]],
        variables_machine_task_ends[m, task],
        variables_machine_task_resource_mode_presence[m, task, resource_mode],
        #variables_machine_task_presences[m, task]*variables_task_resource_mode[task, resource_mode],
        name=f"int_m{m}_t{task}_mode{resource_mode}"
    )
    for task in tasks
    for m in machines
    for resource_mode in resource_modes
}




intervals = [x for x in variables_machine_task_mode_intervals.values()]
demands = [
    resource_requirement[task_to_product[task], resource_mode]
    for machine, task, resource_mode in variables_machine_task_mode_intervals.keys()
]
model.add_cumulative(intervals=intervals, demands=demands, capacity=7)

# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# for m in machines:
#     for task in tasks:
#         for resource_mode in resource_modes:
#             print(f'{m} {task} {resource_mode}')
#             print(solver.value(variables_machine_task_mode_intervals[m, task, resource_mode]))

# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:

    for m in machines:
        for task in tasks:
            for resource_mode in resource_modes:
                print(f'M {m} T {task} R {resource_mode}' ,end = ' ')
                print(f'  {solver.value(variables_machine_task_resource_mode_presence[m, task, resource_mode])}')


    print('===========================  TASKS SUMMARY  ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task]), end='   '
              )
        for resource_mode in resource_modes:
            value = solver.value(variables_task_resource_mode[task, resource_mode])
            if value == 1:
                print(f'Using resource mode {resource_mode}')
    print('Make-span:', solver.value(make_span))
    print('=======================  ALLOCATION & SEQUENCE  =======================')
    for m in machines:

        print(f'------------\nMachine {m}')
        for t1 in tasks:
            value = solver.value(variables_machine_task_presences[(m, t1)])
            if value == 1:
                print(f'task_{t1}_on machine_{m}')
            for t2 in tasks:
                if t1 != t2:
                    value = solver.value(variables_machine_task_sequence[(m, t1, t2)])
                    if value == 1 and t2 != 0:
                        print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}')#  cost: {m_cost[m, t1, t2]}')
                    if value == 1 and t2 == 0:
                        print(f'{t1} --> {t2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")

Resource mode (empty)

Source: scheduling/example_11_resource_mode.py

The file is empty. It was likely intended as a twin of the previous chapter, modeling a non-human resource (machine speed, tool type) with the same one-hot mode pattern. See People mode for the fully worked version.

Headcount tracking

Source: scheduling/example_34_headcount_tracking.py

The big-picture chapter on resources. add_cumulative is the natural fit when all you need is a capacity limit over intervals whose demands are known up front. But if the demand depends on state - the task's mode, whether it overlaps a break, which operator is assigned - or you want the headcount in each time slot as a variable of its own, you need something more flexible.

Three approaches are benchmarked:

  • Method 0 - native add_cumulative. Fastest when it applies, but it only enforces a capacity limit and gives no per-slot headcount variable.
  • Method 1 - per-slot start-presence booleans. var_task_starts_presence[t, i] fires when task t starts at slot i. A "did a task starting within the last d slots cover this slot?" indicator (built with add_max_equality) lets the duration vary per task. Per-time resource variables are the sum.
  • Method 2 - per-slot overlap booleans. The same information encoded via start <= t < end booleans for every task-slot pair.

Methods 1 and 2 trade model size for flexibility; the chapter also ships a small Variables dataclass and extract_solution(...) helper that keeps the boilerplate manageable.
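The two per-slot encodings describe the same headcount profile, which is easy to confirm outside the solver. The sketch below is plain Python on fixed start times, not the CP-SAT model. The three tasks and both function names are hypothetical and chosen only to illustrate the indicators.

```python
def profile_from_overlap(tasks, horizon):
    """Method 2 idea: slot t is used by a task when start <= t < end."""
    return [
        sum(need for start, duration, need in tasks if start <= t < start + duration)
        for t in range(horizon)
    ]


def profile_from_starts(tasks, horizon):
    """Method 1 idea: slot t is used when the task started within the last `duration` slots."""
    usage = [0] * horizon
    for start, duration, need in tasks:
        starts_presence = [int(t == start) for t in range(horizon)]  # one-hot start
        for t in range(horizon):
            window = starts_presence[max(0, t - duration + 1): t + 1]
            usage[t] += need * max(window)  # max plays the role of add_max_equality
    return usage


# (start, duration, headcount) for three hypothetical tasks
tasks = [(0, 2, 1), (1, 3, 2), (4, 2, 1)]
print(profile_from_overlap(tasks, horizon=6))
print(profile_from_starts(tasks, horizon=6))
```

In the model the start times are variables, so each comparison or window maximum becomes a boolean plus constraints per task-slot pair. That is where the extra model size of Methods 1 and 2 comes from.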

Concepts

Source

from ortools.sat.python import cp_model
from time import time
import pandas as pd
from dataclasses import dataclass
from typing import Any, Dict
import copy
import logging
logger = logging.getLogger(__name__)


@dataclass
class Variables:
    """
    All variables of the optimization problem, used for decisions and result values
    Type `Any` shorthand for a solver variable object, or a result float/int
    """
    var_task_starts: Dict[str, Any] = None
    var_task_ends: Dict[str, Any] = None
    var_task_durations: Dict[str, Any] = None
    var_task_overlap_break: Dict[str, Any] = None
    var_intervals: Dict[str, Any] = None
    total_duration: Dict[str, Any] = None
    make_span: Dict[int, Any] = None


def extract_solution(solver: cp_model.CpSolver, variables):
    """Extracts the solution from the variables
    Three kinds of variables have been considered
    Integer, Boolean and Interval Variable
    In case of other types of variables,
    this function would require modification
    """

    # Initializing solution class
    solution = copy.deepcopy(variables)
    for varname, vardict in variables.__dict__.items():
        setattr(solution, varname, None)

    # Assigning variable values
    for varname, vardict in variables.__dict__.items():
        if vardict is not None:
            setattr(
                solution,
                varname,
                {
                    k: solver.value(v) if type(v) not in [cp_model.IntervalVar] else v
                    for k, v in vardict.items()
                },
            )
        else:
            logger.warning(f"Variable '{varname}' is defined but not used in model")
    return solution



if __name__ == '__main__':
    """
    Offset = 0:
        | x |   |   | x |   |   | x |   |   |
    Offset = 1:
        |   | x |   |   | x |   |   | x |   |    
    Offset = 2:
        |   |   | x |   |   | x |   |   | x |    

    where x represent a unit duration break period

    """
    variables = Variables()
    break_offset = 0
    num_of_tasks = 1
    max_time = num_of_tasks * 3
    processing_time = 2

    if break_offset == 0:
        help_text = "| x |   |   | x |   |   | x |   |   |"
    elif break_offset == 1:
        help_text = "|   | x |   |   | x |   |   | x |   |"
    elif break_offset == 2:
        help_text = "|   |   | x |   |   | x |   |   | x |"
    else:
        print("offset wrong")
        exit()

    # one unit-length break every 3 slots, shifted by break_offset
    breaks = [(i * 3 + break_offset, i * 3 + break_offset + 1) for i in range(num_of_tasks)]
    tasks = {x for x in range(num_of_tasks)}
    starts_no_break = [x * 3 + break_offset + 1 for x in range(-1, num_of_tasks) if x * 3 + break_offset + 1 >= 0]
    starts_break = list(set(range(max_time)).difference(starts_no_break))
    domain_no_break = cp_model.Domain.from_values([x for x in starts_no_break])
    domain_break = cp_model.Domain.from_values([x for x in starts_break])

    model = cp_model.CpModel()

    variables.var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    variables.var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    variables.var_task_durations = {task: model.new_int_var(2, 3, f"task_{task}_duration") for task in tasks}
    variables.var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}

    for task in tasks:
        if task == 0:
            continue
        model.add(variables.var_task_ends[task - 1] <= variables.var_task_starts[task])

    for task in tasks:
        model.add_linear_expression_in_domain(variables.var_task_starts[task], domain_break).only_enforce_if(
            variables.var_task_overlap_break[task]
        )

        model.add_linear_expression_in_domain(variables.var_task_starts[task], domain_no_break).only_enforce_if(
            ~variables.var_task_overlap_break[task]
        )
        # model.add(variables.var_task_durations[task] == processing_time + variables.var_task_overlap_break[task] * 1)

        model.add(variables.var_task_durations[task] == processing_time + 1).only_enforce_if(
            variables.var_task_overlap_break[task]
        )

        model.add(variables.var_task_durations[task] == processing_time).only_enforce_if(
            ~variables.var_task_overlap_break[task]
        )

    # intervals
    variables.var_intervals = {
        task: model.new_interval_var(
            variables.var_task_starts[task],
            variables.var_task_durations[task],
            variables.var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }


    tracking_method = 1

    if tracking_method == 0:
        # native  cumulative

        variables.resource_needs = {'all': model.new_int_var(0, 10, 'resource_needs')}
        resource_needs = {task: 1 for task in tasks}

        intervals, headcounts = [], []
        # 2.1 Add tasks requirements
        # 2.1.1 Add demand-based intervals
        for task in tasks:
            intervals.append(variables.var_intervals[task])
            headcounts.append(resource_needs[task])
        # 2.2 Create cumulative constraints
        model.add_cumulative(intervals, headcounts, variables.resource_needs['all'])
    elif tracking_method == 1:
        # cumulative_with_start_time

        max_r = 10
        lb = [variables.var_task_starts[i].proto().domain[0] for i in tasks]
        ub = [variables.var_task_starts[i].proto().domain[1] for i in tasks]

        times_min = min(lb)
        times_max = max(ub)
        time_range = range(times_min, times_max + 1)
        variables.resource_needs = {t: model.new_int_var(0, max_r, f'resource_{t}')
                                    for t in time_range
                                    }
        variables.task_resource_needs = {(task, t): model.new_int_var(0, max_r, f'resource_{task}_{t}')
                                         for task in tasks
                                         for t in time_range
                                         }
        variables.duration_task_resource_needs = {(task, t, duration): model.new_int_var(0, max_r, f'resource_{task}_{t}')
                                                  for task in tasks
                                                  for t in time_range
                                                  for duration in [2, 3]
                                                  }
        variables.var_task_starts_presence = {
            (task, t): model.new_bool_var(f'start_{task}_{t}')
            for task in tasks
            for t in range(times_min, times_max + 1)
        }
        for task in tasks:
            model.add_exactly_one([variables.var_task_starts_presence[task, t] for t in time_range])
            # Define min & max duration
            min_duration = variables.var_task_durations[task].proto().domain[0]
            max_duration = variables.var_task_durations[task].proto().domain[1]
            for t in time_range:
                # Capture start time
                model.add(variables.var_task_starts[task] == t).only_enforce_if(
                    variables.var_task_starts_presence[task, t])
                # Capture based on duration
                for duration in [min_duration, max_duration]:
                    model.add_max_equality(variables.duration_task_resource_needs[task, t, duration],
                                         [variables.var_task_starts_presence[task, t_start]
                                          for t_start in range(t - duration + 1, t + 1)
                                          if (task, t_start) in variables.var_task_starts_presence
                                          ]
                                         )
                # Capture the actual task duration and the needs
                model.add(variables.task_resource_needs[task, t]
                          == variables.duration_task_resource_needs[task, t, 2]
                          ).only_enforce_if(~variables.var_task_overlap_break[task])
                model.add(variables.task_resource_needs[task, t]
                          == variables.duration_task_resource_needs[task, t, 3]
                          ).only_enforce_if(variables.var_task_overlap_break[task])

        # Capture total resource needs
        for t in range(times_min, times_max + 1):
            model.add(variables.resource_needs[t] == sum([variables.task_resource_needs[task, t] * 1
                                                          for task in tasks
                                                          ]
                                                         ))
    elif tracking_method == 2:
        """
        Cumulative with overlap
        """

        max_r = 10
        lb = [variables.var_task_starts[i].proto().domain[0] for i in tasks]
        ub = [variables.var_task_starts[i].proto().domain[1] for i in tasks]

        times_min = min(lb)
        times_max = max(ub)
        resource_needs = {task: 1 for task in tasks}
        variables.resource_needs = {t: model.new_int_var(0, max_r, f'resource_{t}')
                                    for t in range(times_min, times_max + 1)
                                    }
        variables.task_resource_needs = {(task, t): model.new_int_var(0, max_r, f'resource_{task}_{t}')
                                         for task in tasks
                                         for t in range(times_min, times_max + 1)
                                         }
        variables.duration_task_resource_needs = {(task, t, duration): model.new_int_var(0, max_r, f'resource_{task}_{t}')
                                                  for task in tasks
                                                  for t in range(times_min, times_max + 1)
                                                  for duration in [2, 3]
                                                  }
        variables.var_task_starts_presence = {
            (task, t): model.new_bool_var(f'start_{task}_{t}')
            for task in tasks
            for t in range(times_min, times_max + 1)
        }
        for task in tasks:
            for t in range(times_min, times_max + 1):
                # s[i] <= t
                b1 = model.new_bool_var("")
                model.add(variables.var_task_starts[task] <= t).only_enforce_if(b1)
                model.add(variables.var_task_starts[task] > t).only_enforce_if(~b1)

                # t < e[i]
                b2 = model.new_bool_var("")
                model.add(t < variables.var_task_ends[task]).only_enforce_if(b2)
                model.add(t >= variables.var_task_ends[task]).only_enforce_if(~b2)

                # b1 and b2 (b1 * b2)
                b3 = model.new_bool_var("")
                model.add_bool_and([b1, b2]).only_enforce_if(b3)
                model.add_bool_or([~b1, ~b2]).only_enforce_if(~b3)

                # b1 * b2 * r[i]
                model.add_multiplication_equality(variables.task_resource_needs[task, t], [b3, resource_needs[task]])

        # Capture total resource needs
        for t in range(times_min, times_max + 1):
            model.add(variables.resource_needs[t] == sum([variables.task_resource_needs[task, t] * 1
                                                          for task in tasks
                                                          ]
                                                         ))


    # Objectives
    variables.make_span = {0: model.new_int_var(0, max_time, "make_span")}
    model.add_max_equality(
        variables.make_span[0],
        [variables.var_task_ends[task] for task in tasks]
    )
    variables.total_duration = {0: model.new_int_var(0, max_time, "total_duration")}
    model.add(variables.total_duration[0] == sum(variables.var_task_durations.values()))

    model.minimize(variables.make_span[0] + variables.total_duration[0])

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    print("total time:", total_time)

    print_result = True
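    # only read values when the solver actually found a solution
    solution: Variables = (
        extract_solution(solver, variables)
        if status in (cp_model.OPTIMAL, cp_model.FEASIBLE)
        else None
    )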
    # show the result if getting the optimal one
    if print_result:
        print("-----------------------------------------")
        print(help_text)
        print("breaks periods:", breaks)
        print("break starts:", starts_break)
        print("no break starts:", starts_no_break)

        if status == cp_model.OPTIMAL:
            big_list = []
            for task in tasks:
                tmp = [
                    f"task {task}",
                    solution.var_task_starts[task],
                    solution.var_task_ends[task],
                    solution.var_task_overlap_break[task],
                    solution.var_task_durations[task],
                ]
                big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
            df = df.sort_values(['start'])
            print(df)
            if tracking_method == 0:
                print('Using method: cumulative_with_built_in_feature')
                print('resource_needs', solution.resource_needs)

            elif tracking_method == 1:
                print('Using method: cumulative_with_start_time')
                print('resource_needs', solution.resource_needs)
                print('task_resource_needs', solution.task_resource_needs)
                print('duration_task_resource_needs', solution.duration_task_resource_needs)
                print('var_task_starts_presence', solution.var_task_starts_presence)

            elif tracking_method == 2:
                print('Using method: cumulative_with_overlap')
                print('resource_needs', solution.resource_needs)
                print('task_resource_needs', solution.task_resource_needs)
                print('duration_task_resource_needs', solution.duration_task_resource_needs)
                print('var_task_starts_presence', solution.var_task_starts_presence)

            print('Make-span:', solution.make_span[0])
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

Max number of continuous tasks

Source: scheduling/example_09_max_number_of_continuous_tasks.py

First campaigning chapter. In many process industries, tasks of the same product can run back-to-back with no changeover, but only up to N in a row - the equipment needs a cleaning cycle after that, even between two batches of the same product.

This model takes the most direct approach: campaigns are first-class entities. For every product, enumerate as many candidate campaigns as there are tasks of that product. Each campaign has start, end, duration, presence; a task is assigned to exactly one campaign via var_task_campaign_presences[t, c]; a campaign is present iff at least one task is assigned (add_max_equality). The size cap is expressed as a bound on the campaign duration, which equals the number of assigned tasks here because every processing time is 1.

Campaigns (not tasks) are then sequenced with add_circuit, with a changeover distance between consecutive campaigns. Expected pattern for A A A B is A A -> CO -> A -> CO -> B.

This is the "business view" of campaigning. A more compact alternative follows in Campaigning with cumul.
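
The expected pattern can be reproduced for a fixed product order with a few lines of plain Python. This is only a sanity check and not part of the model: expected_pattern is a hypothetical helper, it assumes the product order is already given, and it assumes a changeover (CO) follows every campaign, including between two campaigns of the same product.

```python
from itertools import groupby


def expected_pattern(products, max_conti_task_num):
    """Split a fixed product sequence into campaigns of at most max_conti_task_num tasks."""
    campaigns = []
    for _, run in groupby(products):
        run = list(run)
        # Cut each same-product run into chunks no longer than the cap.
        for i in range(0, len(run), max_conti_task_num):
            campaigns.append(run[i:i + max_conti_task_num])
    return " -> CO -> ".join(" ".join(c) for c in campaigns)


print(expected_pattern(["A", "A", "A", "B"], 2))  # A A -> CO -> A -> CO -> B
print(expected_pattern(["A", "B"], 2))            # A -> CO -> B
```

Comparing this string with the campaign sequence printed by the solver is a quick way to spot a missing changeover.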

Concepts

Source

from ortools.sat.python import cp_model

# Initiate
model = cp_model.CpModel()

'''
task   product
1       A
2       A
3       A
4       B
'''

# 1. Data

tasks = {1, 2, 3, 4}
products = {'A', 'B'}
task_to_product = {1: 'A', 2: 'A', 3: 'A', 4: 'B'}
processing_time = {'A': 1, 'B': 1}
changeover_time = 2
max_conti_task_num = 2
max_time = 100

m_cost = {
    (t1, t2): 0
    if task_to_product[t1] == task_to_product[t2] else changeover_time
    for t1 in tasks for t2 in tasks
    if t1 != t2
}



# Campaign related data pre-processing

max_product_campaigns = {
    product: len([task for task in tasks if task_to_product[task]==product]) for product in products
}

product_campaigns = {
    (product, f"{product}_{campaign}")
    for product in max_product_campaigns
    for campaign in list(range(0, max_product_campaigns[product]))
    #if max_product_campaigns[product] > 0
}

campaign_to_product = {
    campaign: product for product, campaign in product_campaigns
}

campaigns = {campaign for product, campaign in product_campaigns}

product_to_campaigns = {
    product: [c for c in campaigns if campaign_to_product[c] == product] for product in products
}

task_to_campaigns = {
    task: [
        campaign for campaign in campaigns if campaign_to_product[campaign] == task_to_product[task]
    ]
    for task in tasks
}

campaign_size = {campaign: max_conti_task_num for campaign in campaigns}

campaign_duration = {campaign: max_conti_task_num for campaign in campaigns}

campaign_to_tasks = {
    campaign:
        [
            task for task in tasks if campaign in task_to_campaigns[task]
        ]
    for campaign in campaigns
}

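# A cleaning cycle separates every pair of consecutive campaigns, including two
# campaigns of the same product; with a zero cost there the size cap would be free.
m_cost_campaign = {
    (c1, c2): changeover_time
    for c1 in campaigns for c2 in campaigns
    if c1 != c2
}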




# Need changeover if we switch from a product to a different product
# We can continue to do tasks of the same product type but there is
# a max number of continuous production of tasks with the same product type
# For A A A, we expect A -> A -> Changeover -> A
# For A A A A A, we expect A -> A -> Changeover -> A -> A -> Changeover -> A
# For A B, we expect A -> Changeover -> B
# For A A B, we expect A -> A -> Changeover -> B


# 2. Decision variables

variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

# variables_task_sequence = {
#     (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
#     for t1 in tasks
#     for t2 in tasks
#     if t1 != t2
# }
#
# variables_co_starts = {
#     (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_start")
#     for t1 in tasks
#     for t2 in tasks
#     if t1 != t2
# }
#
# variables_co_ends = {
#     (t1, t2): model.new_int_var(0, max_time, f"co_t{t1}_to_t{t2}_end")
#     for t1 in tasks
#     for t2 in tasks
#     if t1 != t2
# }

# 3. Objectives

make_span = model.new_int_var(0, max_time, "make_span")

model.add_max_equality(
    make_span,
    [variables_task_ends[task] for task in tasks]
)

model.minimize(make_span)


# 4. Constraints

# Task Duration
# for task in tasks:
#     model.add(
#         variables_task_ends[task] - variables_task_starts[task] == processing_time[task_to_product[task]]
#     )

var_task_intervals = {
    t: model.new_interval_var(
        variables_task_starts[t],
        processing_time[task_to_product[t]],  # duration looked up per product
        variables_task_ends[t],
        f"task_{t}_interval"
    )
    for t in tasks
}



# Sequence
# arcs = list()
# for t1 in tasks:
#     for t2 in tasks:
#         # arcs
#         if t1 != t2:
#             arcs.append([
#                 t1,
#                 t2,
#                 variables_task_sequence[(t1, t2)]
#             ])
#             distance = m_cost[t1, t2]
#             # cannot require the time index of task 0 to represent the first and the last position
#             if t2 != 0:
#                 # to schedule tasks and c/o
#                 model.add(
#                     variables_task_ends[t1] <= variables_co_starts[t1, t2]
#                 ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
#                 model.add(
#                     variables_co_ends[t1, t2] <= variables_task_starts[t2]
#                 ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
#                 model.add(
#                     variables_co_ends[t1, t2] - variables_co_starts[t1, t2] == distance
#                 ).only_enforce_if(variables_task_sequence[(t1, t2)])
#
# model.add_circuit(arcs)


# Campaigning related

var_campaign_starts = {
    c: model.new_int_var(0, max_time, f"start_{c}") for c in campaigns
}

var_campaign_ends = {
    c: model.new_int_var(0, max_time, f"c_end_{c}") for c in campaigns
}

var_campaign_durations = {
    c: model.new_int_var(0, max_time, f"c_duration_{c}") for c in campaigns
}

var_campaign_presences = {
    c: model.new_bool_var(f"c_presence_{c}") for c in campaigns
}

# Task Duration
# for c in campaigns:
#     model.add(
#         var_campaign_ends[c] - var_campaign_starts[c] == var_campaign_durations[c]
#     )


# var_campaign_intervals = {
#     c: model.new_optional_interval_var(
#         var_campaign_starts[c],  # campaign start
#         var_campaign_durations[c],  # campaign duration
#         var_campaign_ends[c],  # campaign end
#         var_campaign_presences[c],  # campaign presence
#         f"c_interval_{c}",
#     )
#     for c in campaigns
# }

# If a task is allocated to a campaign
var_task_campaign_presences = {
    (t, c): model.new_bool_var(f"task_{t}_presence_in_campaign_{c}") for t in tasks for c in task_to_campaigns[t]
}

# add_campaign_definition_constraints
# 1. Campaign definition: Start & duration based on tasks that belong to the campaign

for c in campaigns:
    #  1. Duration definition
    model.add(
        var_campaign_durations[c] == sum(
            processing_time[task_to_product[t]] * var_task_campaign_presences[t, c]
            for t in campaign_to_tasks[c]
        )
    )
    # 2. Start-end definition
    # TODO: MinEquality ?
    for t in campaign_to_tasks[c]:
        # var_campaign_starts
        # var_campaign_ends

        model.add(var_campaign_starts[c] <= variables_task_starts[t]).only_enforce_if(
            var_task_campaign_presences[t, c]
        )
        model.add(variables_task_ends[t] <= var_campaign_ends[c]).only_enforce_if(
            var_task_campaign_presences[t, c]
        )
    # 3. Link c & tc presence: If 1 task is scheduled on a campaign -> presence[t, c] = 1 ==> presence[c] == 1
    # as long as there is one task in a campaign, this campaign must be present
    model.add_max_equality(
        var_campaign_presences[c], [var_task_campaign_presences[t, c] for t in campaign_to_tasks[c]]
    )

# 2. Definition of the bool var: if a task belongs to a campaign
for task in tasks:
    # Each task belongs to exactly one campaign of its own product
    model.add(
        sum(var_task_campaign_presences[task, campaign]
            for campaign in task_to_campaigns[task]
            ) == 1
    )
# 3. No campaign overlap
# Campaigns won't overlap anyway. But we need this for tasks within campaigns
model.add_no_overlap([x for x in var_task_intervals.values()])

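# 4. Cap the campaign size. Every processing time is 1 in this data set, so
# bounding the duration by campaign_duration[c] (= max_conti_task_num) bounds the task count.
for c in campaigns:
    model.add(
        var_campaign_durations[c] <= campaign_duration[c]
    )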


# Add campaign circuit

arcs = []

var_campaign_sequence = {}

for node_1, campaign_1 in enumerate(campaigns):

    tmp1 = model.new_bool_var(f'first_to_{campaign_1}')
    arcs.append([
        0, node_1 + 1, tmp1
    ])

    tmp2 = model.new_bool_var(f'{campaign_1}_to_last')
    arcs.append([
        node_1 + 1, 0, tmp2
    ])

    arcs.append([node_1 + 1, node_1 + 1, ~var_campaign_presences[campaign_1]])

    # for outputting
    var_campaign_sequence.update({(0, campaign_1): tmp1})
    var_campaign_sequence.update({(campaign_1, 0): tmp2})

    for node_2, campaign_2 in enumerate(campaigns):

        if node_1 == node_2:
            continue

        indicator_node_1_to_node_2 = model.new_bool_var(f'{campaign_1}_to_{campaign_2}')

        var_campaign_sequence.update({(campaign_1, campaign_2): indicator_node_1_to_node_2})

        distance = m_cost_campaign[campaign_1, campaign_2]

        arcs.append([node_1 + 1, node_2 + 1, indicator_node_1_to_node_2])

        model.add(
            var_campaign_ends[campaign_1] + distance <= var_campaign_starts[campaign_2]
        ).only_enforce_if(
            indicator_node_1_to_node_2
        ).only_enforce_if(
            var_campaign_presences[campaign_1]
        ).only_enforce_if(
            var_campaign_presences[campaign_2]
        )

model.add_circuit(arcs)


# Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# Post-process

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )
    print('-------------------------------------------------')
    print('Make-span:', solver.value(make_span))
    # for t1 in tasks:
    #     for t2 in tasks:
    #         if t1 != t2:
    #             value = solver.value(var[t1, t2])
    #             if value == 1 and t2 != 0:
    #                 print(f'{t1} --> {t2}   {task_to_product[t1]} >> {task_to_product[t2]}  cost: {m_cost[t1, t2]}')
    #                 print('variables_task_sequence[t1, t2]',
    #                       solver.value(variables_task_sequence[t1, t2]))
    #                 print('variables_co_starts[t1, t2]', solver.value(variables_co_starts[t1, t2]))
    #                 print('variables_co_ends[t1, t2]', solver.value(variables_co_ends[t1, t2]))
    #
    #             if value == 1 and t2 == 0:
    #                 print(f'{t1} --> {t2}   Closing')
    for c1 in list(campaigns) + [0]:
        for c2 in list(campaigns) + [0]:
            if c1 == c2:
                continue
            value = solver.value(var_campaign_sequence[c1, c2])

            if value == 1 and c2 != 0 and c1 != 0:
                c1_tasks = []
                c2_tasks = []
                for task in campaign_to_tasks[c1]:
                    if solver.value(var_task_campaign_presences[task, c1]) == 1:
                        c1_tasks.append(task)

                for task in campaign_to_tasks[c2]:
                    if solver.value(var_task_campaign_presences[task, c2]) == 1:
                        c2_tasks.append(task)

                print(f'{c1} --> {c2}   {campaign_to_product[c1]} {c1_tasks}  >>  {campaign_to_product[c2]} {c2_tasks} cost: {m_cost_campaign[c1, c2]}')

            if value == 1 and c1 == 0:
                print(f'{c1} --> {c2}   Starting')

            if value == 1 and c2 == 0:
                print(f'{c1} --> {c2}   Closing')


elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    print(status)

Campaigning time constraint (empty)

Source: scheduling/example_18_campaigning_time_constraint.py

Empty file. Likely intended to cap campaigns by total duration (rather than task count). For the closest working idea, see Campaigning with cumul, which caps by rank; replacing the rank limit with a duration sum is the natural variant.

Cleaning holding time (empty)

Source: scheduling/example_19_cleaning_holding_time.py

Empty file. Likely intended to model "cleaning events with a minimum or maximum holding time" between tasks - a changeover interval that must sit inside a specific window. See Changeover as event; bounding the changeover duration with min_hold <= co_duration <= max_hold is the natural starting point.

Campaigning with cumul

Source: scheduling/example_24_campaigning_with_cumul.py

The campaigns-as-entities model works, but it creates a lot of variables. This chapter introduces the compact alternative: keep tasks as the atomic unit and attach a rank variable cumul[t] in [0, campaign_size - 1].

On each t1 -> t2 arc the model branches on reach_max[t1]:

  • Campaign continues (reach_max[t1] false): end[t1] <= start[t2] and cumul[t2] == cumul[t1] + 1.
  • Campaign ends (reach_max[t1] true): a changeover is inserted and cumul[t2] = 0 resets the rank.

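The circuit uses -1 as the first/last dummy node. The __main__ block benchmarks solve time at campaign sizes 2 and 3 for 2 to 8 tasks. The entity model is not part of this benchmark, so the comparison with it is qualitative: this model adds only a rank variable and a boolean per task instead of a set of candidate campaigns per product.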
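
For the single-product benchmark the optimum can be written down by hand, which gives a quick check on the solver output. A minimal sketch, assuming every campaign is filled to the cap before a changeover and using the listing's defaults (processing time 1, changeover time 2); expected_make_span and expected_ranks are hypothetical helper names.

```python
import math


def expected_make_span(num_tasks, campaign_size, processing_time=1, changeover_time=2):
    """All tasks back to back, plus one changeover between consecutive campaigns."""
    num_campaigns = math.ceil(num_tasks / campaign_size)
    return num_tasks * processing_time + (num_campaigns - 1) * changeover_time


def expected_ranks(num_tasks, campaign_size):
    """Rank (cumul) of each task in sequence order: 0, 1, ..., campaign_size - 1, 0, ..."""
    return [position % campaign_size for position in range(num_tasks)]


print(expected_make_span(8, 2))  # 14
print(expected_make_span(8, 3))  # 12
print(expected_ranks(5, 2))      # [0, 1, 0, 1, 0]
```

Both make-span values fit inside max_time = num_tasks * 2 = 16 used in the listing.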

Concepts

Source

# Inspired by https://stackoverflow.com/questions/75554536

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator


def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product


def run_model(num_tasks, campaign_size, print_result=True):

    # Build a fresh model per call. A module-level model would keep accumulating
    # variables and constraints across benchmark runs and distort the timings.
    model = cp_model.CpModel()

    # if campaign size is 2, then we need cumul indicator to be 0, 1

    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1

    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}

    for task in tasks:
        model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
        model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])

    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # if from task has not reached MAX, continue the campaign
            model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(~var_task_reach_max[t1])
            model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(~var_task_reach_max[t1])

            # if from task has reached MAX, apply changeover and reset its cumulative indicator
            model.add(var_task_cumul[t2] == 0).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(var_task_reach_max[t1])
            model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(var_task_reach_max[t1])

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    return total_time


def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1, marker='o', label = 'Campaign size: 2')
    plt.plot(x, y2, marker='o', label = 'Campaign size: 3')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()


if __name__ == '__main__':

    N = 8
    sizes = range(2, N+1)
    model_times_campaign_2 = []
    model_times_campaign_3 = []

    for num_task in sizes:
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_3.append(run_model(num_task, campaign_size=3, print_result=False))

    print_unit_test_result(sizes, model_times_campaign_2, model_times_campaign_3,
                           'Scalability of Campaigning with Cumulative Indicator')

Campaigning, locked sequence

Source: scheduling/example_25_campaigning_with_locked_seq.py

Same cumulative-rank campaigning, with a single heuristic bolted on: lock the task order.

for task in tasks:
    if task != 0:
        model.add(var_task_ends[task - 1] <= var_task_starts[task])

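In practice, task indices often already reflect priority or deadline order. Telling the solver so upfront prunes a lot of symmetric branches. In this single-product benchmark the tasks are interchangeable, so the lock does not change the optimum; with distinct products or deadlines it is a heuristic that can exclude better orders. The speed-up can be an order of magnitude: the benchmark in this file goes up to 25 tasks, against 8 in the previous chapter.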

Concepts

Source

# Inspired by https://stackoverflow.com/questions/75554536

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator


def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product


def run_model(num_tasks, campaign_size, print_result=True):

    # Build a fresh model per call. A module-level model would keep accumulating
    # variables and constraints across benchmark runs and distort the timings.
    model = cp_model.CpModel()

    # if campaign size is 2, then we need cumul indicator to be 0, 1

    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1

    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    model.add(var_task_cumul[0]==0)
    var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}

    # Lock the sequence of the tasks (assume the deadlines are in this sequence !)
    for task in tasks:
        if task != 0:
            model.add(var_task_ends[task-1] <= var_task_starts[task])


    for task in tasks:
        model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
        model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])

    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # if from task has not reached MAX, continue the campaign
            model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(~var_task_reach_max[t1])
            model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(~var_task_reach_max[t1])

            # if from task has reached MAX, apply changeover and reset its cumulative indicator
            model.add(var_task_cumul[t2] == 0).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(var_task_reach_max[t1])
            model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
                literals[t1, t2]
            ).only_enforce_if(var_task_reach_max[t1])

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    return total_time


def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1, marker='o', label = 'Campaign size: 2')
    plt.plot(x, y2, marker='o', label = 'Campaign size: 3')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()


if __name__ == '__main__':

    N = 25
    sizes = range(2, N+1)
    model_times_campaign_2 = []
    model_times_campaign_3 = []

    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_3.append(run_model(num_task, campaign_size=3, print_result=False))

    print_unit_test_result(sizes, model_times_campaign_2, model_times_campaign_3,
                           'Scalability of Campaigning with Cumulative Indicator')

Campaigning, locked sequence improved

Source: scheduling/example_26_campaigning_locked_seq_improved.py

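The previous chapter ends a campaign only when the rank hits the cap. Sometimes ending earlier gives a better schedule - for example when a near-deadline task of a different product is waiting. This chapter and its flexible twin move toward letting the solver decide.

In this listing the order lock loosens to start[t-1] <= start[t] (from the stricter end[t-1] <= start[t]), and the two conditional branches on each arc collapse into one pair of linear constraints that use reach_max[t1] as a 0/1 multiplier. The link between reach_max and cumul == campaign_size - 1 is still present here; it is commented out in the flexible twin, where campaigns may end early.

The chapter also introduces a recurring trick: since add_max_equality cannot be wrapped in only_enforce_if (the commented-out attempt in the listing makes the model invalid), compute the next rank outside the literal and then assign it conditionally. The max with 0 matters once campaigns can end early: with reach_end[t1] true and cumul[t1] below the cap, cumul[t1] + 1 - campaign_size is negative.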

model.add_max_equality(
    max_values[t1, t2],
    [0, cumul[t1] + 1 - reach_end[t1] * campaign_size],
)
model.add(cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])

Concepts

Source

# Inspired by https://stackoverflow.com/questions/75554536

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd


def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product


def run_model(num_tasks, campaign_size, print_result=True):

    # Build a fresh model per call. A module-level model would keep accumulating
    # variables and constraints across benchmark runs and distort the timings.
    model = cp_model.CpModel()

    # if campaign size is 2, then we need cumul indicator to be 0, 1

    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1

    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    model.add(var_task_cumul[0]==0)
    var_task_reach_max = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}

    # Lock the sequence of the tasks (assume the deadlines are in this sequence !)
    # A relative later task shall not start earlier than a relative earlier task
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] <= var_task_starts[task])

    for task in tasks:
        model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_task_reach_max[task])
        model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_task_reach_max[task])

    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # # if from task has not reached MAX, continue the campaign
            # model.add(var_task_ends[t1] <= var_task_starts[t2]).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(~var_task_reach_max[t1])
            # model.add(var_task_cumul[t2] == var_task_cumul[t1] + 1).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(~var_task_reach_max[t1])
            #
            # # if from task has reached MAX, apply changeover and reset its cumulative indicator
            # model.add(var_task_cumul[t2] == 0).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(var_task_reach_max[t1])
            # model.add(var_task_ends[t1] + changeover_time <= var_task_starts[t2]).only_enforce_if(
            #     literals[t1, t2]
            # ).only_enforce_if(var_task_reach_max[t1])

            model.add(
                var_task_ends[t1] + var_task_reach_max[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )

            model.add(
                var_task_cumul[t2] == var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
            ).only_enforce_if(
                literals[t1, t2]
            )

            # The following makes the model invalid
            # model.add_max_equality(
            #     var_task_cumul[t2],
            #     [
            #         0,
            #         var_task_cumul[t1] + 1 - var_task_reach_max[t1]*campaign_size
            #     ]
            # ).only_enforce_if(
            #     literals[t1, t2]
            # )

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1,  label='Campaign size: 2')
    plt.plot(x, y2,  label='Campaign size: 5')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()


if __name__ == '__main__':

    N = 60
    sizes = range(2, N+1, 4)
    model_times_campaign_2 = []
    model_times_campaign_5 = []

    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_5.append(run_model(num_task, campaign_size=5, print_result=False))

    df = pd.DataFrame({
        'num_tasks': sizes,
        'c2': model_times_campaign_2,
        'c5': model_times_campaign_5
    })
    df.to_csv("example_26_result.csv")

    print_unit_test_result(sizes,
                           model_times_campaign_2,
                           model_times_campaign_5,
                           'Scalability of Campaigning with Cumulative Indicator')

Campaigning, locked sequence improved (flexible)

Source: scheduling/example_26_campaigning_locked_seq_improved_flexible.py

Twin of the previous chapter. The two files exist side by side to document an important design choice explicitly: don't restrict campaign ends to the moment the rank hits the cap. Let the solver decide when to start a new campaign.

The model differs from its sibling in a few places: the constraints tying the boolean to cumul == campaign_size - 1 are commented out, the boolean is renamed var_reach_campaign_end, and a max_values helper variable is added per arc. Read both to see the "before / after" of the refinement.

Concepts

Source

# Inspired by https://stackoverflow.com/questions/75554536

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd

model = cp_model.CpModel()


def generate_one_product_data(num_tasks):
    """ Generate N tasks of product A """
    tasks = {i for i in range(num_tasks)}
    task_to_product = ({i: 'A' for i in range(int(num_tasks))})
    return tasks, task_to_product


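def run_model(num_tasks, campaign_size, print_result=True):

    # Build a fresh model per call. The module-level model would otherwise keep
    # accumulating variables and constraints across benchmark runs.
    model = cp_model.CpModel()

    # if campaign size is 2, then we need cumul indicator to be 0, 1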

    changeover_time = 2
    max_time = num_tasks*2
    processing_time = 1

    tasks, task_to_product = generate_one_product_data(num_tasks)
    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    model.add(var_task_cumul[0]==0)
    var_reach_campaign_end = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}

    # Lock the sequence of the tasks (assume the deadlines follow this sequence).
    # A later task must not start before an earlier task.
    for task in tasks:
        if task != 0:
            model.add(var_task_starts[task-1] <= var_task_starts[task])

    # Deliberately removed: do not tie the campaign-end flag to the rank reaching the cap.
    # The decision to start a new campaign is left to the solver.
    # for task in tasks:
    #     model.add(var_task_cumul[task] == campaign_size-1).only_enforce_if(var_reach_campaign_end[task])
    #     model.add(var_task_cumul[task] < campaign_size-1).only_enforce_if(~var_reach_campaign_end[task])

    var_task_intervals = {
        t: model.new_interval_var(
            var_task_starts[t],
            processing_time,
            var_task_ends[t],
            f"task_{t}_interval"
        )
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    max_values = {(t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # [ task1 ] -> [ C/O ] -> [ task 2]
            model.add(
                var_task_ends[t1] + var_reach_campaign_end[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )

            # Fixed campaign size (full campaign or nothing); kept for reference.
            # model.add(
            #    var_task_cumul[t2] == var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size
            # ).only_enforce_if(literals[t1, t2])

            # The creator has confirmed that add_max_equality is not compatible with only_enforce_if,
            # so the following one-step version does not work:
            # model.add_max_equality(
            #     var_task_cumul[t2],
            #     [0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            # ).only_enforce_if(literals[t1, t2])

            # Flexible campaigning, done in two steps.
            # NOTE: var_reach_campaign_end are now free boolean decision variables.
            #
            # If the cap is reached (var_task_cumul[t1] + 1 == campaign_size), the solver must set
            #   var_reach_campaign_end[t1] = 1, which gives max(0, 0) = 0: t2 starts a new campaign after a C/O.
            #   With the flag at 0, the rank of t2 would be campaign_size, which is outside its domain.
            #
            # If the cap is not reached (var_task_cumul[t1] + 1 < campaign_size), the solver may still set
            #   var_reach_campaign_end[t1] = 1, which gives max(0, negative) = 0: an early C/O,
            #   chosen only if it brings a better objective.
            model.add_max_equality(
                max_values[t1, t2],
                [0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            )
            model.add(var_task_cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                print(f'Task {task} ',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      solver.value(var_reach_campaign_end[task])
                      )
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


def print_unit_test_result(x, y1, y2, title=''):
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    plt.plot(x, y1,  label='Campaign size: 2')
    plt.plot(x, y2,  label='Campaign size: 5')
    plt.legend()
    plt.title(title)
    plt.xlabel('The number of tasks')
    plt.ylabel('Seconds')
    plt.show()


if __name__ == '__main__':

    # print('Point check for run_model(40, 10)\nExpect make-span = 40*1 + (40/10 - 1)*2 = 40 + 6 = 46')
    # run_model(40, 10)

    N = 60
    sizes = range(2, N+1, 4)
    model_times_campaign_2 = []
    model_times_campaign_5 = []

    for num_task in sizes:
        print(num_task)
        model_times_campaign_2.append(run_model(num_task, campaign_size=2, print_result=False))
        model_times_campaign_5.append(run_model(num_task, campaign_size=5, print_result=False))

    df = pd.DataFrame({
        'num_tasks': sizes,
        'c2': model_times_campaign_2,
        'c5': model_times_campaign_5
    })
    print(df)

    print_unit_test_result(sizes,
                           model_times_campaign_2,
                           model_times_campaign_5,
                           'Scalability of Campaigning with Cumulative Indicator')

Campaigning across products

Source: scheduling/example_27_campaigning_products.py

So far every campaigning example has used one product. Real plants run many. A campaign must now end both when it hits the size cap and whenever the product changes.

One extra boolean per task, var_product_change, plugs the product-change rule into the cumulative-rank model through two constraints:

model.add(var_product_change[t1] == product_change_indicator[t1, t2]).only_enforce_if(literals[t1, t2])
model.add(var_reach_campaign_end[t1] >= var_product_change[t1])

The first line captures whether the outgoing arc crosses a product boundary; the second escalates any such crossing into a campaign end, which in turn forces a changeover via the familiar rank-reset machinery.
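
To make the two triggers concrete, the sketch below replays a fixed product sequence on one machine and charges a changeover whenever the product changes or the cap is reached. It is a plain-Python illustration, not the CP-SAT model: the function name sequence_makespan is hypothetical, the defaults processing_time=1 and changeover_time=2 are taken from the example's constants, and early (voluntary) campaign ends are not considered.

```python
def sequence_makespan(products, campaign_size, processing_time=1, changeover_time=2):
    """Makespan of a fixed single-machine sequence, with forced changeovers only."""
    time = 0
    rank = 0  # position of the current task inside its campaign
    for i, product in enumerate(products):
        time += processing_time
        if i == len(products) - 1:
            break  # no changeover after the last task
        product_change = products[i + 1] != product
        cap_reached = rank + 1 == campaign_size
        if product_change or cap_reached:
            time += changeover_time
            rank = 0  # the next task opens a new campaign
        else:
            rank += 1
    return time


print(sequence_makespan("AAAABBBBCCCC", campaign_size=4))  # 16
print(sequence_makespan("AAAABBBB", campaign_size=3))      # 14
```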

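An optional per-product lock on the rank (cumul[t-1] <= cumul[t]) is switched on by default. According to the comments in the source it is the slower of two options (7.54s for 3 products, 4 tasks each, campaign size 4); locking the start order of all tasks (0.10s on the same instance) is left commented out. The __main__ block prints a closed-form expected makespan next to the solver's result - a handy sanity test.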
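
The closed-form value from __main__ can be written as a small function. This sketch assumes, as the example does, the same number of tasks for every product and that each product's tasks run back to back; the function name expected_makespan and the processing_time parameter (fixed to 1 in the source formula) are illustrative assumptions.

```python
from math import ceil


def expected_makespan(number_of_products, tasks_per_product, campaign_size,
                      processing_time=1, changeover_time=2):
    """Expected makespan when every product is run as consecutive campaigns."""
    campaigns_per_product = ceil(tasks_per_product / campaign_size)
    # Each campaign is followed by one changeover ...
    per_product = tasks_per_product * processing_time + changeover_time * campaigns_per_product
    # ... except the very last campaign of the schedule.
    return per_product * number_of_products - changeover_time


print(expected_makespan(3, 4, 4))  # 16
```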

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
import pandas as pd
import string
from math import ceil

model = cp_model.CpModel()


def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate tasks of products (no more than 26 products) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product


def run_model(number_of_products, num_of_tasks_per_product, campaign_size, print_result=True):
    """
    Do changeovers if either of the following occurs:
    1. Changeover between different products: [A] -> changeover -> [B]
    2. Previous campaign reaching  size limit: [A ... A]  -> changeover -> next any campaign
    """

    # number_of_products = 2
    # num_of_tasks_per_product = 4
    # campaign_size = 3
    # print_result = True

    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*2
    processing_time = 1

    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print(f'\nInput data: {task_to_product}\n')

    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}

    var_task_cumul = {task: model.new_int_var(0, campaign_size-1, f"task_{task}_cumul") for task in tasks}
    for product_idx, product in enumerate(range(number_of_products)):
        model.add(var_task_cumul[product_idx*num_of_tasks_per_product] == 0)

    var_reach_campaign_end = {task: model.new_bool_var(f"task_{task}_reach_max") for task in tasks}
    var_product_change = {task: model.new_bool_var(f"task_{task}_go_to_different_product") for task in tasks}

    # Heuristic: lock the sequence of the tasks (assume the deadlines follow the task order
    # and a task with a later deadline must not start before a task with an earlier deadline)

    print("Apply the tasks sequence heuristics")
    # Option 1: lock the rank order of the tasks per product. This is slower (7.54s for 3, 4, 4)
    for product_idx, product in enumerate(range(number_of_products)):
        for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
            _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
            if task_id_in_product_group == 0:
                print(f"\nLocking {_index}", end=" ")
            else:
                print(f" <= {_index}", end=" ")
                model.add(var_task_cumul[_index-1] <= var_task_cumul[_index])

    # Option 2: Locking the sequence of all tasks! This is quicker (0.10s for 3, 4, 4)
    # print("Locking 0", end=" ")
    # for task in tasks:
    #     if task != 0:
    #         print(f"<= {task}", end=" ")
    #         model.add(var_task_starts[task-1] <= var_task_starts[task])

    print("\n")

    var_task_intervals = {
        t: model.new_interval_var(var_task_starts[t], processing_time, var_task_ends[t], f"task_{t}_interval")
        for t in tasks
    }
    model.add_no_overlap(var_task_intervals.values())

    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.minimize(make_span)

    # Boolean variables indicating whether t1 is directly followed by t2
    literals = {(t1, t2): model.new_bool_var(f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    # Auxiliary variables that allow flexible campaigning
    max_values = {(t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}") for t1 in tasks for t2 in tasks if t1 != t2}

    arcs = []
    for t1 in tasks:
        arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
        arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
        for t2 in tasks:
            if t1 == t2:
                continue
            arcs.append([t1, t2, literals[t1, t2]])

            # [ task1 ] -> [ C/O ] -> [ task 2]
            model.add(var_product_change[t1] == product_change_indicator[t1, t2]).only_enforce_if(
                literals[t1, t2]
            )

            model.add(var_reach_campaign_end[t1] >= var_product_change[t1])

            model.add(
                var_task_ends[t1] + var_reach_campaign_end[t1]*changeover_time <= var_task_starts[t2]
            ).only_enforce_if(
                literals[t1, t2]
            )

            # allow flexible campaigning
            model.add_max_equality(
                max_values[t1, t2],
                [0, var_task_cumul[t1] + 1 - var_reach_campaign_end[t1]*campaign_size]
            )
            model.add(var_task_cumul[t2] == max_values[t1, t2]).only_enforce_if(literals[t1, t2])

    model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    if print_result:
        if status == cp_model.OPTIMAL:
            for task in tasks:
                if task%num_of_tasks_per_product == 0:
                    print('--------- product divider ---------\n')
                print(f'Task {task} {task_to_product[task]}',
                      solver.value(var_task_starts[task]),
                      solver.value(var_task_ends[task]),
                      solver.value(var_task_cumul[task]),
                      solver.value(var_reach_campaign_end[task]),
                      solver.value(var_product_change[task]),
                      )
                if solver.value(var_reach_campaign_end[task]):
                    print('-- campaign ends --\n')
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


if __name__ == '__main__':

    _number_of_products = 3
    _num_of_tasks_per_product = 4
    _campaign_size = 4
    n = ceil(_num_of_tasks_per_product/_campaign_size)
    _make_span = (_num_of_tasks_per_product + 2*n)*_number_of_products-2
    run_time = run_model(_number_of_products, _num_of_tasks_per_product, _campaign_size)
    print(f"Runtime: {round(run_time, 2)}s")

    print(f'\nExpected make-span if all right:'
          f'\n = (_num_of_tasks_per_product + changeover_time*ceil(_num_of_tasks_per_product/_campaign_size))*'
          f'_number_of_products - changeover_time '
          f'\n = {_make_span}')

Campaigning products x machines

Source: scheduling/example_28_campaigning_products_machines.py

The last two layers of complexity combine here: multi-product campaigning runs on multiple machines in parallel. The rank integers and the reach_campaign_end and product_change booleans are all indexed by (machine, task). Each machine gets its own add_circuit, with a self-loop on every task that is selected when the task is absent from that machine, and the campaigning rules are enforced per machine on each arc.

This is the most structurally involved CP-SAT model in the book and a good reference for assembling the earlier pieces - presence booleans, per-machine circuits, cumulative-rank campaigning, product-change reification - into one coherent model.
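
When reading a solution of a per-machine circuit, the selected arcs can be turned back into a task sequence by walking from the dummy node. The plain-Python sketch below assumes the selected arcs have already been extracted from the solver as (tail, head) pairs, with -1 as the dummy node as in the source. The function name decode_machine_sequence and the sample data are hypothetical.

```python
def decode_machine_sequence(selected_arcs, dummy=-1):
    """Return the ordered tasks on one machine from its selected circuit arcs.

    selected_arcs: iterable of (tail, head) pairs whose literal is true.
    Self-loops (t, t) mark tasks that are absent from this machine.
    """
    successor = {tail: head for tail, head in selected_arcs if tail != head}
    sequence = []
    node = successor.get(dummy)
    while node is not None and node != dummy:
        sequence.append(node)
        node = successor.get(node)
    return sequence


# Machine 0 runs tasks 0 -> 2; task 1 is absent (self-loop) because it runs elsewhere.
arcs_m0 = [(-1, 0), (0, 2), (2, -1), (1, 1)]
# Machine 1 runs only task 1; tasks 0 and 2 are absent.
arcs_m1 = [(-1, 1), (1, -1), (0, 0), (2, 2)]

print(decode_machine_sequence(arcs_m0))  # [0, 2]
print(decode_machine_sequence(arcs_m1))  # [1]
```

In the source the first/last arc booleans are created inline and not kept, so using a helper like this would require storing them (for example in a dict keyed by machine and task) in order to query their values after the solve.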

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import string

model = cp_model.CpModel()


def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate the same number of tasks for multiple products (no more than 26 products please) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product


def run_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
    """
    Allocate tasks to multiple machines
    And do changeover if either of the following occurs:
    1. Switch between different products: [A campaign] -> changeover -> [B campaign]
    2. Previous campaign reaching the size limit: [A FULL campaign]  -> changeover -> next any campaign
    """

    # number_of_products = 2
    # num_of_tasks_per_product = 4
    # campaign_size = 2
    # number_of_machines = 2
    # print_result = True

    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*3
    processing_time = 1
    machines = {x for x in range(number_of_machines)}

    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print('Input data:\nTasks:', tasks, task_to_product, '\n')

    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}

    var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
    var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
    var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}

    var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}

    # No influence on the final result: there is no need to lock the starting rank of the first task per product to 0
    # for product_idx, product in enumerate(range(number_of_products)):
    #     print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
    #     for m in machines:
    #         model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)

    var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
    var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}

    # This is optional
    model.add_decision_strategy(
        var_m_t_product_change.values(),
        cp_model.CHOOSE_FIRST,
        cp_model.SELECT_MIN_VALUE
    )

    # Heuristic: lock the sequence of the tasks (assume the deadlines follow the task order
    # and a task with a later deadline must not start before a task with an earlier deadline)
    print("\nApply the tasks sequence heuristics")
    # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
    for product_idx, product in enumerate(range(number_of_products)):
        for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
            _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
            if task_id_in_product_group == 0:
                print(f"\nLock {_index}", end=" ")
            else:
                print(f" <= {_index}", end=" ")
                model.add(var_task_ends[_index-1] <= var_task_starts[_index])
    print("\n")

    # These intervals are needed; otherwise the duration is not constrained
    var_machine_task_intervals = {
        (m, t): model.new_optional_interval_var(
            var_machine_task_starts[m, t],
            processing_time,
            var_machine_task_ends[m, t],
            var_machine_task_presences[m, t],
            f"task_{t}_interval_on_m_{m}")
        for t in tasks for m in machines
    }

    # each task is only present in one machine
    for task in tasks:
        task_candidate_machines = machines
        tmp = [
            var_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        model.add_exactly_one(tmp)

    # link task-level to machine-task level for start time & end time
    for task in tasks:
        task_candidate_machines = machines
        for m in task_candidate_machines:
            model.add(
                var_task_starts[task] == var_machine_task_starts[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

            model.add(
                var_task_ends[task] == var_machine_task_ends[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
    model.minimize(make_span)

    # Boolean variables indicating whether t1 is directly followed by t2 on machine m
    literals = {(m, t1, t2): model.new_bool_var(f"{t1} -> {t2}")
                for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # the technical variables to allow flexible campaigning
    max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}")
                  for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # schedule the tasks
    for m in machines:
        arcs = []
        for t1 in tasks:
            arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
            arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
            arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])

            for t2 in tasks:
                if t1 == t2:
                    continue

                # if t1 > t2:
                #     model.add(literals[m, t1, t2] == 0)

                arcs.append([t1, t2, literals[m, t1, t2]])

                # If the arc t1 -> t2 crosses a product boundary, var_m_t_product_change[m, t1] must be 1
                # (it may stay 0 for the last task on a machine)
                model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
                    literals[m, t1, t2]
                )

                # If var_m_t_product_change=1 then the campaign must end
                model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])

                # if the campaign ends then there must be changeover time
                # [ task1 ] -> [ C/O ] -> [ task 2]
                model.add(
                    var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
                ).only_enforce_if(
                    literals[m, t1, t2]
                )

                # model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
                # has to do this in two steps since add_max_equality is not compatible with only_enforce_if
                model.add_max_equality(
                    max_values[m, t1, t2],
                    [0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
                )
                model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])

        model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    # show the result if getting the optimal one
    if print_result:
        if status == cp_model.OPTIMAL:
            big_list = []
            for m in machines:
                for task in tasks:
                    if solver.value(var_machine_task_presences[m, task]):
                        tmp = [
                            f"machine {m}",
                            f"task {task}",
                            task_to_product[task],
                            solver.value(var_task_starts[task]),
                            solver.value(var_task_ends[task]),
                            solver.value(var_machine_task_rank[m, task]),
                            solver.value(var_m_t_product_change[m, task]),
                            solver.value(var_m_t_reach_campaign_end[m, task])
                        ]
                        big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['machine', 'task', 'prd', 'start', 'end', 'rank', 'prd_chg', 'c/o']
            df = df.sort_values(['machine', 'start'])
            for m in machines:
                print(f"\n======= Machine {m} =======")
                df_tmp = df[df['machine']==f"machine {m}"]
                print(df_tmp)
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


if __name__ == '__main__':

    # number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines
    args = 4, 4, 3, 3

    runtime = run_model(*args)
    print(f"Solving time: {round(runtime, 2)}s")

Campaigning with pregrouping (empty)

Source: scheduling/example_30_campaigning_with_pregrouping.py

Empty file. Likely intended to pre-group tasks into candidate campaigns outside the solver, then let CP-SAT only sequence the pre-built groups. The closest working realisation is Max number of continuous tasks; fixing which tasks go into which campaign before solving gives you the pregrouping variant.
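
Since the file is empty, the following is only a guess at what pregrouping could look like: tasks are chunked per product into groups of at most campaign_size before any model is built, so that a solver would only need to sequence whole groups. The function name pregroup_tasks and the chunk-in-index-order policy are assumptions made for illustration, not the author's method.

```python
from collections import defaultdict


def pregroup_tasks(task_to_product, campaign_size):
    """Split each product's tasks, in index order, into groups of at most campaign_size."""
    by_product = defaultdict(list)
    for task in sorted(task_to_product):
        by_product[task_to_product[task]].append(task)

    groups = []
    for product in sorted(by_product):
        product_tasks = by_product[product]
        for i in range(0, len(product_tasks), campaign_size):
            groups.append((product, product_tasks[i:i + campaign_size]))
    return groups


task_to_product = {0: 'A', 1: 'A', 2: 'A', 3: 'A', 4: 'B', 5: 'B', 6: 'B', 7: 'B'}
for product, members in pregroup_tasks(task_to_product, campaign_size=3):
    print(product, members)
# A [0, 1, 2]
# A [3]
# B [4, 5, 6]
# B [7]
```

Under this assumption each group could become a single interval whose duration is the number of members times the processing time, with a changeover charged between any two consecutive groups.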

Campaigning faster

Source: scheduling/example_31_campaigning_faster.py

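Tuned companion to the multi-product, multi-machine chapter. Same problem, same model skeleton; what changes is which heuristics are on, how the constraints are ordered, and a few implementation shortcuts. The change visible in the source is arc pruning: literals[m, t1, t2] is fixed to 0 whenever t1 > t2, so each machine visits its tasks in increasing index order.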

Treat this as the "after tuning" version: when the earlier model starts to struggle on larger instances, walk through the differences here to see which levers were pulled.
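
The source of this chapter fixes literals[m, t1, t2] to 0 whenever t1 > t2. A rough way to see how much this shrinks the search is to count the task-to-task arcs that remain open per machine. The sketch below is plain Python and independent of the solver; the function name candidate_arcs and its forward_only flag are hypothetical.

```python
def candidate_arcs(tasks, forward_only=False):
    """List the (t1, t2) task-to-task arcs left open for one machine's circuit."""
    return [
        (t1, t2)
        for t1 in tasks
        for t2 in tasks
        if t1 != t2 and not (forward_only and t1 > t2)
    ]


tasks = range(16)  # 4 products x 4 tasks, as in the products x machines example
all_arcs = candidate_arcs(tasks)
forward = candidate_arcs(tasks, forward_only=True)
print(len(all_arcs), len(forward))  # 240 120
```

Half of the arcs disappear on every machine. Because tasks are numbered product by product, this also fixes the order in which products appear on a machine, so it should be read as a heuristic rather than a neutral speed-up.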

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import string

model = cp_model.CpModel()


def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate the same number of tasks for multiple products (no more than 26 products please) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product


def run_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):
    """
    Allocate tasks to multiple machines
    And do changeover if either of the following occurs:
    1. Switch between different products: [A campaign] -> changeover -> [B campaign]
    2. Previous campaign reaching the size limit: [A FULL campaign]  -> changeover -> next any campaign
    """

    # number_of_products = 2
    # num_of_tasks_per_product = 4
    # campaign_size = 2
    # number_of_machines = 2
    # print_result = True

    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*3
    processing_time = 1
    machines = {x for x in range(number_of_machines)}

    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print('Input data:\nTasks:', tasks, task_to_product)

    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}

    var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
    var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
    var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}

    var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"t_{t}_cu") for t in tasks for m in machines}

    # No influence on the final result: there is no need to lock the starting rank of the first task per product to 0
    # for product_idx, product in enumerate(range(number_of_products)):
    #     print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
    #     for m in machines:
    #         model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)

    var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
    var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}

    # This is optional
    model.add_decision_strategy(
        var_m_t_product_change.values(),
        cp_model.CHOOSE_FIRST,
        cp_model.SELECT_MIN_VALUE
    )

    # Heuristic: lock the sequence of the tasks (assume the deadlines follow the task order
    # and a task with a later deadline must not start before a task with an earlier deadline)
    print("Apply the tasks sequence heuristics")
    # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
    for product_idx, product in enumerate(range(number_of_products)):
        for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
            _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
            if task_id_in_product_group == 0:
                print(f"\nLock {_index}", end=" ")
            else:
                print(f" <= {_index}", end=" ")
                model.add(var_task_ends[_index-1] <= var_task_starts[_index])
    print("\n")

    # These intervals are needed; otherwise the duration is not constrained
    var_machine_task_intervals = {
        (m, t): model.new_optional_interval_var(
            var_machine_task_starts[m, t],
            processing_time,
            var_machine_task_ends[m, t],
            var_machine_task_presences[m, t],
            f"task_{t}_interval_on_m_{m}")
        for t in tasks for m in machines
    }

    # each task is only present in one machine
    for task in tasks:
        task_candidate_machines = machines
        tmp = [
            var_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        model.add_exactly_one(tmp)

    # link task-level to machine-task level for start time & end time
    for task in tasks:
        task_candidate_machines = machines
        for m in task_candidate_machines:
            model.add(
                var_task_starts[task] == var_machine_task_starts[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

            model.add(
                var_task_ends[task] == var_machine_task_ends[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
    model.minimize(make_span)

    # Boolean variables indicating whether t1 is directly followed by t2 on machine m
    literals = {(m, t1, t2): model.new_bool_var(f"{t1} -> {t2}")
                for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # the technical variables to allow flexible campaigning
    max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"{t1} -> {t2}")
                  for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # schedule the tasks
    for m in machines:
        arcs = []
        for t1 in tasks:
            arcs.append([-1, t1, model.new_bool_var(f"first_to_{t1}")])
            arcs.append([t1, -1, model.new_bool_var(f"{t1}_to_last")])
            arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])

            for t2 in tasks:
                if t1 == t2:
                    continue

                # this restriction speeds up the solve considerably
                # if t1 > t2 and task_to_product[t1]==task_to_product[t2]:
                #     model.add(literals[m, t1, t2] == 0)
                if t1 > t2:
                    model.add(literals[m, t1, t2] == 0)

                arcs.append([t1, t2, literals[m, t1, t2]])

                # If A -> B then var_m_t_product_change>=1  (can be 0 if the last task in a machine)
                model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
                    literals[m, t1, t2]
                )

                # If var_m_t_product_change=1 then the campaign must end
                model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])

                # if the campaign ends then there must be changeover time
                # [ task1 ] -> [ C/O ] -> [ task 2]
                model.add(
                    var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
                ).only_enforce_if(
                    literals[m, t1, t2]
                )

                # model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
                # has to do this in two steps since add_max_equality is not compatible with only_enforce_if
                model.add_max_equality(
                    max_values[m, t1, t2],
                    [0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
                )
                model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])

        model.add_circuit(arcs)

    solver = cp_model.CpSolver()
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start

    # show the result if getting the optimal one
    if print_result:
        if status == cp_model.OPTIMAL:
            big_list = []
            for m in machines:
                for task in tasks:
                    if solver.value(var_machine_task_presences[m, task]):
                        tmp = [
                            f"machine {m}",
                            f"task {task}",
                            task_to_product[task],
                            solver.value(var_task_starts[task]),
                            solver.value(var_task_ends[task]),
                            solver.value(var_machine_task_rank[m, task]),
                            solver.value(var_m_t_product_change[m, task]),
                            solver.value(var_m_t_reach_campaign_end[m, task])
                        ]
                        big_list.append(tmp)
            df = pd.DataFrame(big_list)
            df.columns = ['machine', 'task', 'prd', 'start', 'end', 'rank', 'prd_chg', 'c/o']
            df = df.sort_values(['machine', 'start'])
            for m in machines:
                print(f"\n======= Machine {m} =======")
                df_tmp = df[df['machine']==f"machine {m}"]
                print(df_tmp)
            print('-------------------------------------------------')
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            print(status)

    if status == cp_model.OPTIMAL:
        return total_time
    else:
        return -999


if __name__ == '__main__':

    # number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines
    args = 4, 4, 3, 3

    runtime = run_model(*args)
    print(f"Solving time: {round(runtime, 2)}s")
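
The rank bookkeeping in the listing above can be checked by hand without a solver. The following is a minimal sketch in plain Python that mirrors the expression max(0, rank + 1 - reach_campaign_end * campaign_size) along one machine sequence. The function names are hypothetical, and the sketch assumes the first task on the machine starts at rank 0, which the model itself does not enforce.

```python
def next_rank(rank, campaign_ends, campaign_size):
    """Rank of the successor task: max(0, rank + 1 - campaign_ends * campaign_size)."""
    return max(0, rank + 1 - int(campaign_ends) * campaign_size)


def ranks_along_sequence(campaign_end_flags, campaign_size):
    """Return the rank of every task in one machine sequence.

    campaign_end_flags[i] says whether task i closes its campaign.
    Assumption for this sketch: the first task starts at rank 0.
    """
    ranks = [0]
    for ends in campaign_end_flags[:-1]:
        rank = next_rank(ranks[-1], ends, campaign_size)
        if rank > campaign_size - 1:
            # The rank domain [0, campaign_size - 1] would be violated
            raise ValueError("campaign exceeds its size limit")
        ranks.append(rank)
    return ranks


# A full campaign of 3 tasks, a changeover, then a new campaign
print(ranks_along_sequence([False, False, True, False, True], campaign_size=3))
# A campaign closed early after 2 tasks
print(ranks_along_sequence([False, True, False], campaign_size=3))
```

The ValueError branch corresponds to what the rank domain does in the model: once a task sits at rank campaign_size - 1, the only way to keep the successor's rank inside the domain is to end the campaign, which in turn forces the changeover.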

Solving by phases

Source: scheduling/example_32_solving_by_phases.py

Large campaigning models can be slow to find even their first feasible solution. A pragmatic trick: build the full model once, solve it repeatedly with a growing time limit (max_time in the code, applied through solver.parameters.max_time_in_seconds), and carry each phase's solution forward as a hint for the next.
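
The control flow of the phases can be sketched independently of CP-SAT. In the sketch below, solve_once is a hypothetical callable that takes a time limit and an optional hint and returns a (status, objective, solution) triple; toy_solver is an invented stand-in so the loop can run without a solver, and the status strings only mimic the CP-SAT status names.

```python
def solve_by_phases(solve_once, time_limits):
    """Run solve_once with growing time limits, feeding each solution forward as a hint."""
    hint = None
    history = []  # (time_limit, objective or None) per phase that ran
    for limit in time_limits:
        status, objective, solution = solve_once(limit, hint)
        if status in ("INFEASIBLE", "MODEL_INVALID"):
            break  # more time will not help
        if status == "UNKNOWN":
            history.append((limit, None))  # nothing found; keep the old hint
            continue
        history.append((limit, objective))
        hint = solution  # warm start for the next phase
        if status == "OPTIMAL":
            break  # no need to run the remaining phases
    return history, hint


def toy_solver(limit, hint):
    """Invented stand-in: the objective improves with more time and with a hint."""
    if limit < 2 and hint is None:
        return "UNKNOWN", None, None
    best = max(10, 40 - 5 * limit - (5 if hint else 0))
    status = "OPTIMAL" if best == 10 else "FEASIBLE"
    return status, best, {"make_span": best}


history, last_solution = solve_by_phases(toy_solver, [1, 2, 4, 8])
print(history)
print(last_solution)
```

The two early exits are the design point: a proven status (infeasible, invalid, or optimal) ends the loop, while an UNKNOWN phase is recorded and the previous hint is kept for the next, longer phase.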

The chapter ships a small pair of helpers around the model's underlying proto:

def get_solutions(model, solver):
    return {v.name: solver.response_proto.solution[i]
            for i, v in enumerate(model.proto.variables)}

def add_hints(model, solution):
    for i, v in enumerate(model.proto.variables):
        if v.name in solution:
            model.proto.solution_hint.vars.append(i)
            model.proto.solution_hint.values.append(solution[v.name])

Calling model.clear_hints() before each run removes the previous phase's hints, so every phase is warm-started only from the most recent solution. For hard instances this can be the difference between "no feasible answer in 10 minutes" and a steady ramp from a short time limit to the full one.
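
One caveat when matching variables by name: CP-SAT does not require names to be unique, and two variables that share a name collapse into a single dictionary entry, so one of them would receive the other's value as its hint. The sketch below illustrates this with plain lists standing in for the proto's variable names and solution values. All function names are hypothetical, and keying by position is shown only as one possible alternative, not as the chapter's method.

```python
from collections import Counter


def solution_by_name(names, values):
    """Name-keyed solution, as in the helpers above; later duplicates overwrite earlier ones."""
    return dict(zip(names, values))


def duplicated_names(names):
    """Names that occur more than once and would therefore collide."""
    return sorted(name for name, count in Counter(names).items() if count > 1)


def solution_by_index(values):
    """Position-keyed solution; safe as long as the model is not rebuilt between phases."""
    return dict(enumerate(values))


# Stand-in data: a bool arc literal and an integer helper that share a name
names = ["task_0_start", "0 -> 1", "0 -> 1"]
values = [3, 1, 4]

print(solution_by_name(names, values))   # the literal's value 1 is lost
print(duplicated_names(names))
print(solution_by_index(values))
```

A quick duplicated_names check on the variable names before the first phase is a cheap way to confirm that name-based hints are safe for a given model.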

Concepts

Source

from ortools.sat.python import cp_model
from time import time
from matplotlib import pyplot as plt
from matplotlib.ticker import MaxNLocator
from ortools.sat import cp_model_pb2
import pandas as pd
import numpy as np
import string


def generate_task_data(num_of_products, num_of_tasks_per_product):
    """ Generate the same number of tasks for multiple products (no more than 26 products please) """
    products = string.ascii_uppercase[0:num_of_products]
    total_num_of_tasks = num_of_tasks_per_product*num_of_products
    tasks = {x for x in range(total_num_of_tasks)}
    task_to_product = {}
    for product_idx, product in enumerate(products):
        task_to_product.update({
            product_idx*num_of_tasks_per_product+task_idx: product for task_idx in range(num_of_tasks_per_product)
        })
    return tasks, task_to_product


def create_model(number_of_products, num_of_tasks_per_product, campaign_size, number_of_machines, print_result=True):

    model = cp_model.CpModel()

    changeover_time = 2
    max_time = num_of_tasks_per_product*number_of_products*5
    processing_time = 1
    machines = {x for x in range(number_of_machines)}

    tasks, task_to_product = generate_task_data(number_of_products, num_of_tasks_per_product)
    print('Input data:\nTasks:', tasks, task_to_product, '\n')

    product_change_indicator = {
        (t1, t2): 0 if task_to_product[t1] == task_to_product[t2] else 1 for t1 in tasks for t2 in tasks if t1 != t2
    }

    var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
    var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}

    var_machine_task_starts = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_start") for t in tasks for m in machines}
    var_machine_task_ends = {(m, t): model.new_int_var(0, max_time, f"m{m}_t{t}_end") for t in tasks for m in machines}
    var_machine_task_presences = {(m, t): model.new_bool_var(f"pre_{m}_{t}") for t in tasks for m in machines}

    var_machine_task_rank = {(m, t): model.new_int_var(0, campaign_size-1, f"m{m}_t_{t}_cu") for t in tasks for m in machines}

    # No influence on the final result. No need to lock the starting rank values of the first tasks per product to be 0
    # for product_idx, product in enumerate(range(number_of_products)):
    #     print(f"Lock the rank of task {product_idx*num_of_tasks_per_product} to zero on all machines")
    #     for m in machines:
    #         model.add(var_machine_task_rank[m, product_idx*num_of_tasks_per_product] == 0)

    var_m_t_reach_campaign_end = {(m, t): model.new_bool_var(f"t{t}_reach_max_on_m{m}") for t in tasks for m in machines}
    var_m_t_product_change = {(m, t): model.new_bool_var(f"task_{t}_change_product_on_m{m}") for t in tasks for m in machines}

    # This is optional
    # model.add_decision_strategy(
    #     var_m_t_product_change.values(),
    #     cp_model.CHOOSE_FIRST,
    #     cp_model.SELECT_MIN_VALUE
    # )

    # Heuristic: Lock the sequence of the tasks (assume the deadlines follow the task order
    # AND a task with a later deadline shall not start earlier than a task with an earlier deadline)
    # print("\nApply the tasks sequence heuristics")
    # # Option 1: Locking the sequence of tasks per product! This is slower (7.54s for 3, 4, 4)
    # for product_idx, product in enumerate(range(number_of_products)):
    #     for task_id_in_product_group, task in enumerate(range(num_of_tasks_per_product)):
    #         _index = product_idx * num_of_tasks_per_product + task_id_in_product_group
    #         if task_id_in_product_group == 0:
    #             print(f"\nLock {_index}", end=" ")
    #         else:
    #             print(f" <= {_index}", end=" ")
    #             model.add(var_task_ends[_index-1] <= var_task_starts[_index])
    # print("\n")

    # These intervals are needed; otherwise the duration is not constrained
    var_machine_task_intervals = {
        (m, t): model.new_optional_interval_var(
            var_machine_task_starts[m, t],
            processing_time,
            var_machine_task_ends[m, t],
            var_machine_task_presences[m, t],
            f"task_{t}_interval_on_m_{m}")
        for t in tasks for m in machines
    }

    # each task is only present in one machine
    for task in tasks:
        task_candidate_machines = machines
        tmp = [
            var_machine_task_presences[m, task]
            for m in task_candidate_machines
        ]
        model.add_exactly_one(tmp)

    # link task-level to machine-task level for start time & end time
    for task in tasks:
        task_candidate_machines = machines
        for m in task_candidate_machines:
            model.add(
                var_task_starts[task] == var_machine_task_starts[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

            model.add(
                var_task_ends[task] == var_machine_task_ends[m, task]
            ).only_enforce_if(var_machine_task_presences[m, task])

    # Set objective to minimize make-span
    make_span = model.new_int_var(0, max_time, "make_span")
    total_changeover_time = model.new_int_var(0, max_time, "total_changeover_time")
    model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
    model.add(total_changeover_time == sum(var_m_t_reach_campaign_end[m,t] for m in machines for t in tasks))
    model.minimize(make_span)

    # the bool variables that indicate whether t1 -> t2 on machine m
    # (names must be unique because the hint helpers below match variables by name)
    literals = {(m, t1, t2): model.new_bool_var(f"m{m}_{t1} -> {t2}")
                for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # the technical variables to allow flexible campaigning
    max_values = {(m, t1, t2): model.new_int_var(0, max_time, f"m{m}_max_{t1} -> {t2}")
                  for m in machines for t1 in tasks for t2 in tasks if t1 != t2}

    # schedule the tasks
    for m in machines:
        arcs = []
        for t1 in tasks:
            arcs.append([-1, t1, model.new_bool_var(f"m{m}_first_to_{t1}")])
            arcs.append([t1, -1, model.new_bool_var(f"m{m}_{t1}_to_last")])
            arcs.append([t1, t1, ~var_machine_task_presences[(m, t1)]])

            for t2 in tasks:
                if t1 == t2:
                    continue
                arcs.append([t1, t2, literals[m, t1, t2]])

                # If A -> B then var_m_t_product_change>=1  (can be 0 if the last task in a machine)
                model.add(var_m_t_product_change[m, t1] >= product_change_indicator[t1, t2]).only_enforce_if(
                    literals[m, t1, t2]
                )

                # If var_m_t_product_change=1 then the campaign must end
                model.add(var_m_t_reach_campaign_end[m, t1] >= var_m_t_product_change[m, t1])

                # if the campaign ends then there must be changeover time
                # [ task1 ] -> [ C/O ] -> [ task 2]
                model.add(
                    var_task_ends[t1] + var_m_t_reach_campaign_end[m, t1]*changeover_time <= var_task_starts[t2]
                ).only_enforce_if(
                    literals[m, t1, t2]
                )

                # model could also decide to end the campaign before it reaches size limit, then reset the rank for t2
                # has to do this in two steps since add_max_equality is not compatible with only_enforce_if
                model.add_max_equality(
                    max_values[m, t1, t2],
                    [0, var_machine_task_rank[m, t1] + 1 - var_m_t_reach_campaign_end[m, t1]*campaign_size]
                )
                model.add(var_machine_task_rank[m, t2] == max_values[m, t1, t2]).only_enforce_if(literals[m, t1, t2])

        model.add_circuit(arcs)

    return model




# This takes 16 seconds
model = create_model(5, 20, 3, 1)

# solver = cp_model.CpSolver()
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(status)
# print(solver.objective_value())
# #10
# print(total_time, status)

phases = [
    {'phase_id': 0, 'max_time': 0.5},
    {'phase_id': 1, 'max_time': 1},
    {'phase_id': 2, 'max_time': 2},
    {'phase_id': 3, 'max_time': 4},
    {'phase_id': 4, 'max_time': 8}
]

phases = [
    {'phase_id': 0, 'max_time': 10},
    {'phase_id': 1, 'max_time': 10},
    {'phase_id': 2, 'max_time': 10},
    {'phase_id': 3, 'max_time': 10},
    {'phase_id': 4, 'max_time': 10}
]

phases = [
    {'phase_id': 0, 'max_time': 5},
    {'phase_id': 1, 'max_time': 10},
    {'phase_id': 2, 'max_time': 15},
    {'phase_id': 3, 'max_time': 20},
    {'phase_id': 4, 'max_time': 25}
]

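# Only the last assignment to `phases` takes effect; the lists above are alternative schedules kept for reference
n = 6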

phases = [
    {'phase_id': i, 'max_time': (i+1)*10} for i in range(n)
]


def get_solutions(model, solver):
    """Map each variable name to its value in the last solve (names must be unique)."""
    vars_sol = {}
    for i, var in enumerate(model.proto.variables):
        value = solver.response_proto.solution[i]
        vars_sol[var.name] = value
    return vars_sol


def add_hints(model, solution):
    """Add the values of a previous solution as hints, matching variables by name."""
    for i, var in enumerate(model.proto.variables):
        if var.name in solution:
            model.proto.solution_hint.vars.append(i)
            model.proto.solution_hint.values.append(solution[var.name])

# get_solutions(model, solver)

obj_list = []
solution = None
solver = cp_model.CpSolver()

for phase in phases:
    phase_id, max_time = phase['phase_id'], phase['max_time']
    print('----------------------------')
    # Drop the previous phase's hints before adding the latest solution
    model.clear_hints()
    if solution is not None:
        print("Add hints")
        add_hints(model, solution)
    print('number of variables in solution hints:', len(model.proto.solution_hint.vars))
    #solver.parameters.keep_all_feasible_solutions_in_presolve = True
    solver.parameters.max_time_in_seconds = max_time
    start = time()
    status = solver.solve(model=model)
    total_time = time() - start
    print('number of values in the solution:', len(solver.response_proto.solution))

    if status in (cp_model.MODEL_INVALID, cp_model.INFEASIBLE):
        print(f'error status : {solver.status_name(status)}')
        break
    if status == cp_model.UNKNOWN:
        print(f'Cannot find a feasible solution in the given time {max_time}. status:{status}')
        obj_list.append(np.nan)
        continue

    obj_value = solver.objective_value
    obj_list.append(obj_value)
    solution = get_solutions(model, solver)
    print(f"phase_id: {phase_id}, max_time: {max_time}, status: {status}, obj: {obj_value}. total time: {round(total_time,1)}")

    if status == cp_model.OPTIMAL:
        print('======================================================')
        print('Optimal Solution Achieved ! No need to continue')
        break

print(obj_list)

# x-axis: the time limit of each phase that actually ran (the loop may stop early)
times = [phase['max_time'] for phase in phases][:len(obj_list)]


ax = plt.figure().gca()
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.plot(times, obj_list, marker='o')
plt.title('time vs obj')
plt.xlabel('time limit per phase (seconds)')
plt.ylabel('obj')
plt.show()

# self.alg_data, self.model = solver.solve(
#     alg_data=self.alg_data,
#     previous_model=self.model,
#     model_parameters=self.model_parameters,
# )


#
#
# vars_sol = {}
# for i, var in enumerate(model.proto().variables):
#     value = solver.response_proto().solution[i]
#     vars_sol[var.name] = value
#
#
#
# solver = cp_model.CpSolver()
# solver.parameters.max_time_in_seconds = 0.5
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# #model.ExportToFile("C:/Temp/xxxx.pb.txt")
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# start = time()
# status = solver.solve(model=model)
# total_time = time() - start
# print(total_time, status)
#
# vars_sol = {}
# for i, var in enumerate(model.proto().variables):
#     print(var)
#     print(var.name)
#     print(var.domain)
#     print('-------------------')
#     vars_sol[var.name] = var
#
# model.proto().solution_hint
# for i, var in enumerate(model.proto().variables):
#     if var.name in vars_sol:
#         model.proto().solution_hint.vars.append(i)
#         model.proto().solution_hint.values.append(vars_sol[var.name])
# #
#
# print(model.ModelStats())


# @dataclass
# class PhaseParameters:
#     """  Gather different input optimisation parameters that can be set for each phase. """
#
#     obj_phase: str = None
#     max_time_in_seconds: int = None
#     solution_count_limit: int = None
#     restart: bool = False
#
#
# phase_1 = PhaseParameters('phase_1', 10)
# phase_2 = PhaseParameters('phase_2', 10)
#
#


# show the result if getting the optimal one