Conditional duration via linear domain
Source: scheduling/example_33_conditional_duration_linear_domain.py
What it does
A task's duration depends on whether its start falls in a break slot.
- Two domains are built: `domain_break` (start values that overlap a break) and `domain_no_break` (safe start values).
- A reified bool `var_task_overlap_break[t]` toggles which domain the start must belong to, via `add_linear_expression_in_domain`.
- Under `only_enforce_if`, the task duration is either `processing_time` or `processing_time + 1`.
- The interval is built from start/duration/end, so everything downstream (no-overlap, cumulative) sees the correct effective duration.
Concepts
- Breaks (conditional duration)
- CP-SAT basics (`add_linear_expression_in_domain`)
- Interval variables
Source
from ortools.sat.python import cp_model
from time import time
import pandas as pd
# Breaks are one time unit long and recur every BREAK_PERIOD time units.
#
# Offset = 0:
# | x | | | x | | | x | | |
# Offset = 1:
# | | x | | | x | | | x | |
# Offset = 2:
# | | | x | | | x | | | x |
# where x represents a unit duration break period
BREAK_PERIOD = 3

# Timeline sketch printed with the results, keyed by the supported offsets.
HELP_TEXT_BY_OFFSET = {
    0: "| x | | | x | | | x | | |",
    1: "| | x | | | x | | | x | |",
    2: "| | | x | | | x | | | x |",
}


def break_periods(break_offset: int, num_breaks: int,
                  period: int = BREAK_PERIOD) -> list:
    """Return the ``(start, end)`` pairs of the first ``num_breaks`` breaks.

    Break ``i`` starts at ``i * period + break_offset`` and lasts one unit.
    The spacing is the break period, not the number of tasks, so the result
    stays consistent with the start domains when the task count changes.
    """
    return [
        (i * period + break_offset, i * period + break_offset + 1)
        for i in range(num_breaks)
    ]


def safe_start_values(break_offset: int, num_of_tasks: int,
                      period: int = BREAK_PERIOD) -> list:
    """Return the non-negative start values that directly follow a break.

    Candidates are ``k * period + break_offset + 1`` for ``k`` in
    ``range(-1, num_of_tasks)``; negative candidates are dropped.  A task of
    length ``period - 1`` started there ends exactly when the next break
    begins, so it does not overlap any break.  The values are not clipped to
    the scheduling horizon.
    """
    candidates = (
        k * period + break_offset + 1 for k in range(-1, num_of_tasks)
    )
    return [value for value in candidates if value >= 0]


def main():
    """Build, solve and report the conditional-duration scheduling model."""
    break_offset = 1
    num_of_tasks = 3
    # One break cycle per task: a task takes at most processing_time + 1
    # units, which equals BREAK_PERIOD for the values used here.
    max_time = num_of_tasks * BREAK_PERIOD
    # The safe-start formula assumes processing_time == BREAK_PERIOD - 1.
    processing_time = 2

    help_text = HELP_TEXT_BY_OFFSET.get(break_offset)
    if help_text is None:
        # SystemExit works without the `site` helpers and gives a non-zero
        # exit status, unlike the interactive-only exit().
        raise SystemExit("offset wrong")

    breaks = break_periods(break_offset, num_of_tasks)
    tasks = list(range(num_of_tasks))

    starts_no_break = safe_start_values(break_offset, num_of_tasks)
    # Sorted so that the printed list has a deterministic order.
    starts_break = sorted(set(range(max_time)).difference(starts_no_break))

    domain_no_break = cp_model.Domain.from_intervals(
        [[x] for x in starts_no_break]
    )
    domain_break = cp_model.Domain.from_intervals(
        [[x] for x in starts_break]
    )

    model = cp_model.CpModel()

    var_task_starts = {
        task: model.new_int_var(0, max_time, f"task_{task}_start")
        for task in tasks
    }
    var_task_ends = {
        task: model.new_int_var(0, max_time, f"task_{task}_end")
        for task in tasks
    }
    # Bounds follow processing_time; the name no longer collides with the
    # end variable of the same task.
    var_task_durations = {
        task: model.new_int_var(
            processing_time, processing_time + 1, f"task_{task}_duration"
        )
        for task in tasks
    }
    var_task_overlap_break = {
        task: model.new_bool_var(f"task_{task}_overlap_a_break")
        for task in tasks
    }

    # Sequence heuristic: tasks run in index order.
    for task in tasks[1:]:
        model.add(var_task_ends[task - 1] <= var_task_starts[task])

    for task in tasks:
        overlaps = var_task_overlap_break[task]

        # The bool selects which domain the start value must belong to ...
        model.add_linear_expression_in_domain(
            var_task_starts[task], domain_break
        ).only_enforce_if(overlaps)
        model.add_linear_expression_in_domain(
            var_task_starts[task], domain_no_break
        ).only_enforce_if(~overlaps)

        # ... and, with it, the effective duration of the task.
        model.add(
            var_task_durations[task] == processing_time + 1
        ).only_enforce_if(overlaps)
        model.add(
            var_task_durations[task] == processing_time
        ).only_enforce_if(~overlaps)

    # intervals
    var_intervals = {
        task: model.new_interval_var(
            var_task_starts[task],
            var_task_durations[task],
            var_task_ends[task],
            name=f"interval_{task}"
        )
        for task in tasks
    }
    model.add_no_overlap(var_intervals.values())

    # Objectives
    make_span = model.new_int_var(0, max_time, "make_span")
    model.add_max_equality(
        make_span,
        [var_task_ends[task] for task in tasks]
    )
    model.minimize(make_span + sum(var_task_durations.values()))

    solver = cp_model.CpSolver()
    status = solver.solve(model=model)

    print_result = True

    # The solution table is shown only when the solver proves optimality.
    if print_result:
        print("-----------------------------------------")
        print(help_text)
        print("breaks periods:", breaks)
        print("break starts:", starts_break)
        print("no break starts:", starts_no_break)

        if status == cp_model.OPTIMAL:
            rows = [
                [
                    f"task {task}",
                    solver.value(var_task_starts[task]),
                    solver.value(var_task_ends[task]),
                    solver.value(var_task_overlap_break[task]),
                    solver.value(var_task_durations[task]),
                ]
                for task in tasks
            ]
            df = pd.DataFrame(
                rows,
                columns=['task', 'start', 'end', 'overlap_break', 'duration'],
            )
            df = df.sort_values(['start'])
            print(df)
            print('Make-span:', solver.value(make_span))
        elif status == cp_model.INFEASIBLE:
            print("Infeasible")
        elif status == cp_model.MODEL_INVALID:
            print("Model invalid")
        else:
            # Any other status (e.g. FEASIBLE, UNKNOWN): print its name
            # instead of the raw enum integer.
            print(solver.status_name(status))


if __name__ == '__main__':
    main()