Conditional duration via linear domain
Source: scheduling/example_33_conditional_duration_linear_domain.py
Combines the ideas of two earlier chapters. Like Task delaying a break, a task that hits a break takes longer. Like Linear domain for breaks, the break pattern is periodic enough to encode as start-time domains.
Two start domains are precomputed: domain_break (starts that would
overlap a break) and domain_no_break (safe starts). A reified bool
overlap_break[t] toggles which domain the start belongs to, and the
duration switches by one time unit accordingly:
model.add(duration[t] == processing_time + 1).only_enforce_if(overlap_break[t])
model.add(duration[t] == processing_time ).only_enforce_if(~overlap_break[t])
Because the task interval is built from start/duration/end, everything downstream - no-overlap, cumulative, makespan - sees the correct effective duration with no extra bookkeeping.
Concepts
- Breaks (conditional duration)
- CP-SAT basics (
add_linear_expression_in_domain) - Interval variables
Source
from ortools.sat.python import cp_model
from time import time
import pandas as pd
if __name__ == '__main__':
"""
Offset = 0:
| x | | | x | | | x | | |
Offset = 1:
| | x | | | x | | | x | |
Offset = 2:
| | | x | | | x | | | x |
where x represent a unit duration break period
"""
break_offset = 1
num_of_tasks = 3
max_time = num_of_tasks*3
processing_time = 2
if break_offset == 0:
help_text = "| x | | | x | | | x | | |"
elif break_offset == 1:
help_text = "| | x | | | x | | | x | |"
elif break_offset == 2:
help_text = "| | | x | | | x | | | x |"
else:
print("offset wrong")
exit()
breaks = [(i*num_of_tasks + break_offset, i*num_of_tasks + break_offset + 1) for i in range(num_of_tasks)]
tasks = {x for x in range(num_of_tasks)}
starts_no_break = [x*3+break_offset+1 for x in range(-1, num_of_tasks) if x*3+break_offset+1>= 0]
starts_break = list(set(range(max_time)).difference(starts_no_break))
domain_no_break = cp_model.Domain.from_intervals([[x] for x in starts_no_break])
domain_break = cp_model.Domain.from_intervals([[x] for x in starts_break])
model = cp_model.CpModel()
var_task_starts = {task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks}
var_task_ends = {task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks}
var_task_durations = {task: model.new_int_var(2, 3, f"task_{task}_end") for task in tasks}
var_task_overlap_break = {task: model.new_bool_var(f"task_{task}_overlap_a_break") for task in tasks}
# print("Heuristic 1: Apply the tasks sequence heuristics")
for task in tasks:
if task == 0:
continue
model.add(var_task_ends[task-1] <= var_task_starts[task])
for task in tasks:
model.add_linear_expression_in_domain(var_task_starts[task], domain_break).only_enforce_if(
var_task_overlap_break[task]
)
model.add_linear_expression_in_domain(var_task_starts[task], domain_no_break).only_enforce_if(
~var_task_overlap_break[task]
)
model.add(var_task_durations[task] == processing_time+1).only_enforce_if(
var_task_overlap_break[task]
)
model.add(var_task_durations[task] == processing_time).only_enforce_if(
~var_task_overlap_break[task]
)
# intervals
var_intervals = {
task: model.new_interval_var(
var_task_starts[task],
var_task_durations[task],
var_task_ends[task],
name=f"interval_{task}"
)
for task in tasks
}
model.add_no_overlap(var_intervals.values())
# Objectives
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(
make_span,
[var_task_ends[task] for task in tasks]
)
model.minimize(make_span + sum(var_task_durations.values()))
# model.minimize(sum(var_task_durations))
solver = cp_model.CpSolver()
start = time()
status = solver.solve(model=model)
total_time = time() - start
print_result = True
# show the result if getting the optimal one
if print_result:
print("-----------------------------------------")
print(help_text)
print("breaks periods:", breaks)
print("break starts:", starts_break)
print("no break starts:", starts_no_break)
if status == cp_model.OPTIMAL:
big_list = []
for task in tasks:
tmp = [
f"task {task}",
solver.value(var_task_starts[task]),
solver.value(var_task_ends[task]),
solver.value(var_task_overlap_break[task]),
solver.value(var_task_durations[task]),
]
big_list.append(tmp)
df = pd.DataFrame(big_list)
df.columns = ['task', 'start', 'end', 'overlap_break', 'duration']
df = df.sort_values(['start'])
print(df)
print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
print("Infeasible")
elif status == cp_model.MODEL_INVALID:
print("Model invalid")
else:
print(status)