Simple sequence

Source: scheduling/example_01_simple_sequence.py

What it does

The smallest end-to-end scheduling model in the book. Three tasks across two products on a single machine.

  • A dummy task 0 acts as the start/end node.
  • seq[t1, t2] booleans select the order via add_circuit.
  • If t1 -> t2 is chosen and neither task is the dummy, end[t1] <= start[t2] is enforced.
  • The changeover cost lives in the objective: Minimize(sum(seq * cost)).
  • Duration is encoded manually with end - start == duration.

Concepts

Source

# Circuit arcs
# Each arc is a triple: (job1_index, job2_index, binary).
# add arc : arcs.append([job1_index, job2_index, binary])                               |
# add link: use only_enforce_if so that  binary  ==>  job1.end_time <= job2.start_time  | -> three things connected

from ortools.sat.python import cp_model

# Initiate
# The single CP-SAT model that all variables, constraints and the objective below are added to.
model = cp_model.CpModel()

# NOTE(review): M is not referenced anywhere else in the visible script;
# presumably a leftover big-M constant -- confirm before removing.
M = 99999

# 1. Data
#
# task   product
# 1       A
# 2       B
# 3       A
#
# Task 0 is the dummy node; it maps to the pseudo-product 'dummy', whose
# processing time and changeover time are both 0.
tasks = [0, 1, 2, 3]
task_to_product = {
    0: 'dummy',
    1: 'A',
    2: 'B',
    3: 'A',
}
processing_time = {'dummy': 0, 'A': 1, 'B': 1}
changeover_time = {'dummy': 0, 'A': 1, 'B': 1}

# Candidate arcs: every ordered pair of distinct tasks (dummy included).
m = {(src, dst) for src in tasks for dst in tasks if src != dst}


def _arc_cost(src, dst):
    """Return the changeover cost paid when task ``dst`` directly follows ``src``."""
    src_product = task_to_product[src]
    dst_product = task_to_product[dst]
    # Arcs touching the dummy node and same-product transitions are free.
    if 'dummy' in (src_product, dst_product) or src_product == dst_product:
        return 0
    # Otherwise pay the changeover time of the product being switched to.
    return changeover_time[dst_product]


# Cost of each candidate arc, keyed by the (from_task, to_task) pair.
m_cost = {arc: _arc_cost(*arc) for arc in m}


# 2. Decision variables
#
# One 0/1 sequencing variable per ordered pair of distinct tasks, e.g.
# (1, 2), (1, 3), (2, 1), (2, 3), (3, 1), (3, 2), plus the pairs that
# involve the dummy task 0.

# Upper bound used for every start and end time variable.
max_time = 8

# End time of each task (dummy included), in [0, max_time].
variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

# Start time of each task (dummy included), in [0, max_time].
# The "_start" suffix keeps these distinguishable from the end variables
# when the model or a solution is dumped by variable name.
variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

# variables_sequence[(t1, t2)] is true when the arc t1 -> t2 is selected.
variables_sequence = {
    (t1, t2): model.new_bool_var(f"task {t1} --> task {t2}")
    for (t1, t2) in m
}

# 3. Objectives

# Total changeover cost: every selected arc contributes its m_cost entry.
# This is kept as a plain linear expression and handed straight to the
# objective, so no auxiliary integer variable is created for it (an extra
# variable that is never constrained would only sit unused in the model).
total_changeover_time = sum(
    variables_sequence[arc] * m_cost[arc] for arc in m
)

model.minimize(total_changeover_time)
#model.maximize(total_changeover_time)


# 4. Constraints

# Add duration: end - start must equal the processing time of the task's product.
for job in tasks:
    required = processing_time[task_to_product[job]]
    model.add(variables_task_ends[job] - variables_task_starts[job] == required)

# add_circuits
#
# Every candidate pair contributes one arc [from, to, literal]; because task 0
# is part of the candidate pairs, this already covers the arcs entering and
# leaving the dummy (0 -> 1, 0 -> 2, 0 -> 3, 1 -> 0, 2 -> 0, 3 -> 0).
arcs = [[tail, head, variables_sequence[(tail, head)]] for (tail, head) in m]

# A selected arc between two real tasks forces the first task to end before
# the second one starts. Arcs that touch the dummy carry no timing constraint.
for (pred, succ) in m:
    if pred == 0 or succ == 0:
        continue
    follows = variables_sequence[(pred, succ)]
    model.add(
        variables_task_ends[pred] <= variables_task_starts[succ]
    ).only_enforce_if(follows)

model.add_circuit(arcs)

#model.add(variables_task_starts[0] == 8)

# Solve

# https://github.com/d-krupke/cpsat-primer
# Optional search hints, left disabled: branch on the start-time variables in
# order, trying either the smallest or the largest value first.
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MIN_VALUE)
#model.add_decision_strategy([x[1] for x in variables_task_starts.items()], cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)


solver = cp_model.CpSolver()
status = solver.solve(model=model)

# Post-process

if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    # Start and end time of every task, dummy included.
    for task in tasks:
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]), solver.value(variables_task_ends[task])
              )

    # Value of every sequencing literal; sorted so the listing does not
    # depend on set iteration order.
    for (t1, t2) in sorted(m):
        print(f'{t1} --> {t2}:   {solver.value(variables_sequence[(t1, t2)])}')

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
else:
    # Any remaining status (e.g. UNKNOWN or MODEL_INVALID) is reported
    # instead of letting the script finish without any output.
    print(f"No solution found: {solver.status_name(status)}")