One automatic job
Source: scheduling/example_12_an_automatic_job.py
What it does
Models an "automatic" task: one that consumes the operator only for its first time unit (setup), after which the machine runs on its own.
- `var_task_intervals[t]` is the full task interval (setup + auto run).
- `var_task_intervals_autojobs[t]` is a size-1 interval at `(start, start + 1)`, representing only the setup portion.
- Breaks are added as fixed intervals.
The cumulative constraint uses only the setup intervals and the breaks:
    model.add_cumulative(
        intervals=setup_intervals + break_intervals,
        demands=[1] * (len(tasks) + len(breaks)),
        capacity=1,
    )
As a result, a break can delay the start of a task, but it cannot interrupt an automatic task that is already running: only the one-unit setup portion has to avoid the breaks.
Concepts
- Breaks (automatic jobs)
- Resources and cumulative
Source
from ortools.sat.python import cp_model
# Initiate: create the CP-SAT model that every variable and constraint below
# is added to.
model = cp_model.CpModel()
# The bare string below is a no-op expression statement used as an inline
# data table: one row per task with its product and task type.
'''
task product type
1 A TYPE_3
'''
# 1. Data
# Task ids to schedule.
tasks = {
    1,
}
# Known products.
products = {
    'A',
    'B',
}
# Product made by each task, and the type of each task.
task_to_product = {
    1: 'A',
}
task_to_type = {
    1: 'TYPE_3',
}
# Processing time of each product, in time units.
processing_time = {
    'A': 3,
    'B': 1,
}
# Scheduling horizon: upper bound for the start/end/make-span variables.
max_time = 10
# Break windows as (start, end) pairs.
breaks = {
    (0, 1),
    (2, 10),
}
# 2. Decision Variables
# Start and end time of every task, each bounded by the horizon [0, max_time].
var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}
var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}
# Full task interval: start -> end, sized by the processing time of the
# product the task makes.
var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task],
        processing_time[task_to_product[task]],
        var_task_ends[task],
        name=f"interval_t{task}",
    )
    for task in tasks
}
# Setup-only interval for 'TYPE_3' tasks: a size-1 interval [start, start + 1)
# sharing the task's start variable. It gets its own name so it is not
# confused with the full task interval above.
var_task_intervals_autojobs = {
    task: model.new_interval_var(
        var_task_starts[task],
        1,
        var_task_starts[task] + 1,
        name=f"interval_setup_t{task}",
    )
    for task in tasks
    if task_to_type[task] == 'TYPE_3'
}
# Add break time
# Each break becomes a fixed interval [start, end), keyed by its
# (start, end) pair and given a unique name.
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(
        start=start, size=end - start, name=f'break_{start}_{end}'
    )
    for (start, end) in breaks
}
# Only the setup intervals compete with the breaks for the single unit of
# capacity; the full task intervals are deliberately left out.
intervals = list(var_task_intervals_autojobs.values()) + list(variables_breaks.values())
# One unit of demand per interval. Deriving the length from `intervals` keeps
# the two lists aligned for any number of setup intervals and breaks.
demands = [1] * len(intervals)
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)
# 3. Objectives
# The make-span equals the latest end time over all tasks; minimise it.
make_span = model.new_int_var(lb=0, ub=max_time, name="make_span")
model.add_max_equality(make_span, [var_task_ends[task] for task in tasks])
model.minimize(make_span)
# 4. Solve
solver = cp_model.CpSolver()
status = solver.solve(model=model)
# 5. Results
# A solution exists for both OPTIMAL and FEASIBLE, so values can be read.
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
    print('=========================== TASKS SUMMARY ===========================')
    for task in tasks:
        print(f'Task {task} ',
              solver.value(var_task_starts[task]), solver.value(var_task_ends[task]),
              )
    print('Make-span:', solver.value(make_span))
elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    # Any other status: print its readable name rather than the raw value.
    print(solver.status_name(status))