People mode
Source: scheduling/example_10_people_mode.py
What it does
Each task can run in one of several "people modes", each with its own processing time and headcount requirement. Only the mode choice is new in this model; the sequencing logic is similar to example 03.
- `variables_task_resource_mode[t, k]`: one-hot bool for task `t` selecting mode `k`. Exactly one mode per task.
- `variables_task_processing_time[t]` is derived from the mode: `model.add(processing_time_var[t] == sum(processing_time[product[t], k] * mode[t, k] for k in modes))`
- Standard machine-assignment variables (`presence[m, t]`, `start[m, t]`, `end[m, t]`), per-machine `add_circuit`, and task-level links complete the model.
Concepts
- Resources and cumulative (resource modes)
- Circuit and sequencing
Source
from ortools.sat.python import cp_model
# Create the CP-SAT model that every variable and constraint below is added to.
model = cp_model.CpModel()
# Task table used by this example:
#
#   task  product
#   1     A
#   2     A

# ---- 1. Data ----
# Every task produces exactly one product; the task and product id sets
# follow directly from this mapping.
task_to_product = {1: 'A', 2: 'A'}
tasks = set(task_to_product)
products = set(task_to_product.values())

machines = {1, 2}
resource_modes = {1, 2}

# (product, people mode) -> processing duration
processing_time = {('A', 1): 3, ('A', 2): 2}

# (product, people mode) -> headcount required while the task runs
resource_requirement = {('A', 1): 2, ('A', 2): 3}

# Planning horizon: upper bound used for the time variables.
max_time = 20
# 2. Decision variables
# Task-level end and start times, each an integer in [0, max_time].
variables_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
variables_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

# One boolean per (machine, task) pair: is the task placed on that machine?
variables_machine_task_presences = {
    (m, t): model.new_bool_var(f"presence_{m}_{t}")
    for t in tasks
    for m in machines
}

# Machine-level copies of the start/end times, one pair per (machine, task).
variables_machine_task_starts = {
    (m, t): model.new_int_var(0, max_time, f"start_{m}_{t}")
    for t in tasks
    for m in machines
}
# These were previously named "start_{m}_{t}" as well, which gave the start
# and end variables identical names in the model; they are now "end_{m}_{t}".
variables_machine_task_ends = {
    (m, t): model.new_int_var(0, max_time, f"end_{m}_{t}")
    for t in tasks
    for m in machines
}
# Each task must be placed on exactly one of its candidate machines.
for task in tasks:
    # Every machine is a candidate for every task in this example.
    candidate_machines = machines
    presence_literals = [
        variables_machine_task_presences[m, task] for m in candidate_machines
    ]
    model.add_exactly_one(presence_literals)
# Channel the task-level start/end to the machine-level start/end of the
# machine the task is actually placed on.
for task in tasks:
    for m in machines:
        on_this_machine = variables_machine_task_presences[m, task]
        same_start = (
            variables_task_starts[task] == variables_machine_task_starts[m, task]
        )
        same_end = (
            variables_task_ends[task] == variables_machine_task_ends[m, task]
        )
        model.add(same_start).only_enforce_if(on_this_machine)
        model.add(same_end).only_enforce_if(on_this_machine)
# Sequencing literals: (m, t1, t2) is the arc "t1 --> t2" on machine m.
# One literal is created for every ordered pair of distinct tasks.
variables_machine_task_sequence = {
    (m, t1, t2): model.new_bool_var(f"Machine {m} task {t1} --> task {t2}")
    for t1 in tasks
    for t2 in tasks
    if t1 != t2
    for m in machines
}
# Sequence
# One circuit per machine. Task ids are used directly as node indices and
# node 0 acts as the dummy start/end node (assumes no task has id 0).
# NOTE(review): node 0 has no self-loop arc, so it looks like every machine
# must take at least one task -- confirm this is intended.
distance = 0  # gap required between consecutive tasks on a machine
for m in machines:
    arcs = []
    for t1 in tasks:
        # Dummy node -> t1: t1 is the first task on this machine.
        first_literal = model.new_bool_var(f'm_{m}_first_to_{t1}')
        arcs.append((0, t1, first_literal))
        # t1 -> dummy node: t1 is the last task on this machine.
        last_literal = model.new_bool_var(f'm_{m}_{t1}_to_last')
        arcs.append((t1, 0, last_literal))
        # Self-loop: t1 is skipped by the circuit when it is not on machine m.
        arcs.append((t1, t1, ~variables_machine_task_presences[m, t1]))

        for t2 in tasks:
            if t1 == t2:
                continue
            t1_then_t2 = variables_machine_task_sequence[(m, t1, t2)]
            arcs.append((t1, t2, t1_then_t2))
            # The dummy node has no time variables, so precedence is only
            # stated between real tasks.
            model.add(
                variables_task_ends[t1] + distance <= variables_task_starts[t2]
            ).only_enforce_if(t1_then_t2)
    model.add_circuit(arcs)
# People-mode selection: one boolean per (task, resource mode) pair.
variables_task_resource_mode = {
    (task, resource_mode): model.new_bool_var(
        f"task {task} using resource mode {resource_mode}"
    )
    for task in tasks
    for resource_mode in resource_modes
}

# Alternative encoding (unused): a single integer mode variable per task.
# variables_task_resource_modes = {
#     task: model.new_int_var(0, len(resource_modes), f"the resource mode of {task}")
#     for task in tasks
# }

# Every task picks exactly one resource mode.
for task in tasks:
    selected_mode_count = sum(
        variables_task_resource_mode[task, resource_mode]
        for resource_mode in resource_modes
    )
    model.add(selected_mode_count == 1)
# link resource id with decision matrix
# for task in tasks:
# for resource_mode in resource_modes:
# model.add(
# variables_task_resource_modes[task] == resource_mode
# ).only_enforce_if(
# variables_task_resource_mode_matrix[task, resource_mode]
# )
# Processing time of each task, as an integer variable in [0, 99].
variables_task_processing_time = {
    task: model.new_int_var(0, 99, f"processing time for {task}")
    for task in tasks
}

# The processing time equals the duration of whichever mode is selected:
# a sum of (duration * mode literal) over the task's modes.
for task in tasks:
    product = task_to_product[task]
    weighted_durations = [
        processing_time[product, resource_mode]
        * variables_task_resource_mode[task, resource_mode]
        for resource_mode in resource_modes
    ]
    model.add(variables_task_processing_time[task] == sum(weighted_durations))
# model.new_optional_interval_var(1,1,2,1,'')
# model.new_optional_interval_var(
# model.new_int_var(0,1,''),
# model.new_int_var(0, 1, ''),
# model.new_int_var(0,1,''),
# model.new_int_var(0, 1, ''),
# '')
# variables_machine_task_intervals = {
# (m, task): model.new_optional_interval_var(
# variables_machine_task_starts[m, task],
# #2,
# variables_task_processing_time[task],
# #processing_time[task_to_product[task], variables_task_resource_modes[task]],
# variables_machine_task_ends[m, task],
# variables_machine_task_presences[m, task],
# name=f"t_interval_{m}_{task}"
# )
# for task in tasks
# for m in machines
# }
# One boolean per (machine, task, resource mode) triple. The three constraint
# families below tie it to the machine choice and the mode choice of the task.
variables_machine_task_resource_mode_presence = {
    (m, task, resource_mode): model.new_bool_var(
        f'm{m}_task_{task}_resource_mode_{resource_mode}_presence'
    )
    for m in machines
    for task in tasks
    for resource_mode in resource_modes
}

# If the task uses a given mode, exactly one machine carries it in that mode.
for task in tasks:
    for resource_mode in resource_modes:
        mode_selected = variables_task_resource_mode[task, resource_mode]
        machines_in_mode = sum(
            variables_machine_task_resource_mode_presence[m, task, resource_mode]
            for m in machines
        )
        model.add(machines_in_mode == 1).only_enforce_if(mode_selected)

# If the task is on a given machine, exactly one mode is active there.
for task in tasks:
    for m in machines:
        on_machine = variables_machine_task_presences[m, task]
        modes_on_machine = sum(
            variables_machine_task_resource_mode_presence[m, task, resource_mode]
            for resource_mode in resource_modes
        )
        model.add(modes_on_machine == 1).only_enforce_if(on_machine)

# Across all machines and modes, each task has exactly one active triple.
for task in tasks:
    active_triples = sum(
        variables_machine_task_resource_mode_presence[m, task, resource_mode]
        for resource_mode in resource_modes
        for m in machines
    )
    model.add(active_triples == 1)
# One optional interval per (machine, task, mode). It reuses the machine-level
# start/end and the task's processing time, and is present only when the
# matching (machine, task, mode) literal is true.
variables_machine_task_mode_intervals = {
    (m, task, resource_mode): model.new_optional_interval_var(
        start=variables_machine_task_starts[m, task],
        size=variables_task_processing_time[task],
        end=variables_machine_task_ends[m, task],
        is_present=variables_machine_task_resource_mode_presence[m, task, resource_mode],
        name=f"int_m{m}_t{task}_mode{resource_mode}",
    )
    for task in tasks
    for m in machines
    for resource_mode in resource_modes
}

# Intervals and demands are both read from the same dict, so they line up
# position by position.
intervals = list(variables_machine_task_mode_intervals.values())
demands = [
    resource_requirement[task_to_product[task], resource_mode]
    for _, task, resource_mode in variables_machine_task_mode_intervals
]
# The summed demand of all present intervals may never exceed 7 at any time.
model.add_cumulative(intervals=intervals, demands=demands, capacity=7)
# 3. Objective: minimise the make-span, i.e. the largest task end time.
make_span = model.new_int_var(0, max_time, "make_span")
all_task_ends = [variables_task_ends[task] for task in tasks]
model.add_max_equality(make_span, all_task_ends)
model.minimize(make_span)

# Solve the model; `status` is inspected by the reporting code.
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# for m in machines:
# for task in tasks:
# for resource_mode in resource_modes:
# print(f'{m} {task} {resource_mode}')
# print(solver.value(variables_machine_task_mode_intervals[m, task, resource_mode]))
# Post-process: print the solution, or say why there is none.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    # Raw value of every (machine, task, mode) presence literal.
    for m in machines:
        for task in tasks:
            for resource_mode in resource_modes:
                print(f'M {m} T {task} R {resource_mode}', end=' ')
                print(f' {solver.value(variables_machine_task_resource_mode_presence[m, task, resource_mode])}')

    print('=========================== TASKS SUMMARY ===========================')
    for task in tasks:
        # Start and end on one line, followed by the selected mode.
        print(f'Task {task} ',
              solver.value(variables_task_starts[task]),
              solver.value(variables_task_ends[task]),
              end=' ')
        for resource_mode in resource_modes:
            if solver.value(variables_task_resource_mode[task, resource_mode]) == 1:
                print(f'Using resource mode {resource_mode}')
    print('Make-span:', solver.value(make_span))

    print('======================= ALLOCATION & SEQUENCE =======================')
    for m in machines:
        print(f'------------\nMachine {m}')
        for t1 in tasks:
            if solver.value(variables_machine_task_presences[(m, t1)]) == 1:
                print(f'task_{t1}_on machine_{m}')
            for t2 in tasks:
                if t1 == t2:
                    continue
                # t2 only ranges over real tasks, never the dummy node 0, so
                # the former "t2 == 0 / Closing" branch could not run and the
                # "t2 != 0" guard was always true; both were removed.
                if solver.value(variables_machine_task_sequence[(m, t1, t2)]) == 1:
                    print(f'{t1} --> {t2} {task_to_product[t1]} >> {task_to_product[t2]}')
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
else:
    # Any other status used to end silently; report it explicitly.
    print(f"No solution found (status: {solver.status_name(status)})")