Task delaying a break
Source: scheduling/example_14_task_delaying_break.py
What it does
When a task runs across a break, its total machine time grows by the length of the break. This is encoded per time slot:
- var_task_duration_timeslots[t, i] = 1 iff task t occupies slot i. Built from two reified booleans, "starts at or before i" AND "ends after i", combined with add_multiplication_equality.
- var_task_new_duration[t] = base + sum(is_break[i] * uses[t, i] for i) computes the stretched duration.
- The task interval uses this stretched duration directly.
var_task_intervals[t] = model.new_interval_var(
start[t], new_duration[t], end[t], ...,
)
Break intervals are added to add_cumulative alongside a unit-length interval at each
auto-type task's start, so such a task cannot start inside a break (breaks do not
preempt an auto-type task once started).
Concepts
- Breaks (approach 2: duration stretched)
- CP-SAT basics (reification, add_multiplication_equality)
Source
from ortools.sat.python import cp_model
# Create the CP-SAT model; every variable and constraint below is registered on it.
model = cp_model.CpModel()
# Reference table of the tasks being scheduled. It is a bare string literal,
# so it has no runtime effect.
# NOTE(review): the table lists task 2 / product B, but the data section only
# defines task 1 / product A -- confirm which is intended.
'''
task product type
1 A TYPE_4
2 B TYPE_4
'''
# 1. Data
# Tasks to schedule, the product each one makes and its task type.
tasks = {1}
products = {'A'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_4'}
# Nominal processing time per product, in time slots (breaks excluded).
processing_time = {'A': 3}
# Scheduling horizon: time slots are 0 .. max_time - 1.
max_time = 10
# Breaks as half-open (start, end) windows: slots start .. end - 1 are break slots.
breaks = {(2, 4)}
# Per-slot indicator (1 = break slot). Derived from `breaks` so the two can
# never drift apart when a break is added or moved.
is_break = {
    slot: int(any(start <= slot < end for start, end in breaks))
    for slot in range(max_time)
}
# 2. Decision Variables
# Start and end time of every task, anywhere on the [0, max_time] horizon.
var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
# Total (stretched) duration of the task: nominal processing time plus the
# break slots it runs across. The solver-side label still says "_delay".
var_task_new_duration = {
    task: model.new_int_var(0, max_time, f"task_{task}_delay") for task in tasks
}
# 1 iff the task occupies time slot i (slot i covers [i, i + 1)).
var_task_duration_timeslots = {
    (task, i): model.new_bool_var(f'task {task} uses interval {i}')
    for task in tasks
    for i in range(max_time)
}
# Reified helper pair whose conjunction defines slot occupancy:
#   start <= i   AND   end >= i + 1.
# Each helper gets a unique, descriptive name so it can be identified when
# the model is printed or exported for debugging.
var_task_start_earlier_than_start = {
    (task, i): model.new_bool_var(f'task_{task}_starts_by_{i}')
    for task in tasks
    for i in range(max_time)
}
var_task_end_later_than_end = {
    (task, i): model.new_bool_var(f'task_{task}_ends_after_{i}')
    for task in tasks
    for i in range(max_time)
}
# # OR pair
# var_task_start_later_than_end = {
# (task, i): model.new_bool_var(f'')
# for task in tasks
# for i in range(max_time)
# }
# var_task_end_earlier_than_start = {
# (task, i): model.new_bool_var(f'')
# for task in tasks
# for i in range(max_time)
# }
# Link every per-slot occupancy boolean to the task's start/end variables.
# (The alternative "OR pair" formulation, whose variables are commented out
# above, is left disabled.)
for task in tasks:
    start_var = var_task_starts[task]
    end_var = var_task_ends[task]
    for slot in range(max_time):
        started = var_task_start_earlier_than_start[task, slot]
        still_running = var_task_end_later_than_end[task, slot]

        # started <=> the task starts at or before this slot.
        model.add(start_var <= slot).only_enforce_if(started)
        model.add(start_var > slot).only_enforce_if(~started)

        # still_running <=> the task ends at or after the slot's right edge.
        model.add(end_var >= slot + 1).only_enforce_if(still_running)
        model.add(end_var < slot + 1).only_enforce_if(~still_running)

        # The product of two booleans is their logical AND:
        # the slot is occupied iff both conditions hold.
        model.add_multiplication_equality(
            var_task_duration_timeslots[task, slot],
            [started, still_running],
        )
# for task in tasks:
# for i in range(max_time):
# start_index = i
# end_index = i + 1
#
# # TRUE
# model.add_linear_constraint(var_task_starts[task], 0, start_index).only_enforce_if(var_task_timeslots[task, i])
# model.add_linear_constraint(var_task_ends[task], end_index, max_time).only_enforce_if(var_task_timeslots[task, i])
#
# # FALSE
# if i == 0:
# # first time slot
# model.add_linear_expression_in_domain(
# var_task_starts[task],
# cp_model.Domain.from_intervals([[1, max_time]])
# ).only_enforce_if(~var_task_timeslots[task, i])
#
# elif i == max_time - 1:
# # last time slot
# model.add_linear_expression_in_domain(
# var_task_ends[task],
# cp_model.Domain.from_intervals([[0, max_time-1]])
# ).only_enforce_if(~var_task_timeslots[task, i])
# else:
# model.add_linear_expression_in_domain(
# x,
# cp_model.Domain.from_intervals([[1, start_index], [end_index+1, max_time]])
# ).only_enforce_if(~var_task_timeslots[task, i])
#
# task_1_start < task_2_start < task_1_end
# Variables:
# 1: Overlap
# 2: (task_2_start < task_1_end)
# 3: (task_1_start < task_2_start)
# Constraints:
# 1: (task_1_start < task_2_start) -> task_1_start
# 2:
# Stretched duration = nominal processing time of the task's product
# + number of break slots the task occupies.
for task in tasks:
    nominal = processing_time[task_to_product[task]]
    break_slots_used = sum(
        is_break[slot] * var_task_duration_timeslots[task, slot]
        for slot in range(max_time)
    )
    model.add(var_task_new_duration[task] == nominal + break_slots_used)
# Full task interval: its size is the stretched duration variable, not the
# raw processing time, so start + new_duration == end is enforced.
var_task_intervals = {}
for task in tasks:
    var_task_intervals[task] = model.new_interval_var(
        var_task_starts[task],
        var_task_new_duration[task],
        var_task_ends[task],
        name=f"interval_t{task}",
    )

# For TYPE_4 tasks only: a unit-length interval pinned to the task's start.
var_task_intervals_auto = {}
for task in tasks:
    if task_to_type[task] != 'TYPE_4':
        continue
    start_var = var_task_starts[task]
    var_task_intervals_auto[task] = model.new_interval_var(
        start_var,
        1,
        start_var + 1,
        name=f"interval_auto_t{task}",
    )
# Add break time
# One fixed interval per (start, end) break window.
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end - start, name='a_break')
    for (start, end) in breaks
}
# Only the unit-length start intervals of TYPE_4 tasks compete with the
# breaks; the full task intervals are not part of this constraint.
intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())
# Every interval consumes the whole unit capacity. The list is sized from
# `intervals` itself: counting `tasks` would give a length mismatch as soon
# as a task is not TYPE_4 and therefore has no start interval.
demands = [1] * len(intervals)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objective: minimise the make-span, i.e. the latest task end time.
finish_times = [var_task_ends[task] for task in tasks]
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, finish_times)
model.minimize(make_span)

# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model)
# 5. Results
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    # Per-slot occupancy of every task (1 = the task uses that slot).
    for task in tasks:
        print(f'task {task}')
        for slot in range(max_time):
            occupied = solver.value(var_task_duration_timeslots[task, slot])
            print(f'{slot} {occupied}')

    print('=========================== TASKS SUMMARY ===========================')
    # Start, end and stretched duration of every task.
    for task in tasks:
        start_value = solver.value(var_task_starts[task])
        end_value = solver.value(var_task_ends[task])
        print(f'Task {task} ', start_value, end_value)
        print(solver.value(var_task_new_duration[task]))

    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    # Any other status is printed as returned by the solver.
    print(status)