Task delaying a break

Source: scheduling/example_14_task_delaying_break.py

What it does

When a task runs across a break, its total machine time grows by the length of the break. This is encoded per time slot:

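  • var_task_duration_timeslots[t, i] = 1 iff task t occupies slot i, i.e. the half-open interval [i, i + 1). It is built from two reified booleans, "starts at or before i" (start <= i) and "ends after i" (end >= i + 1), whose conjunction is encoded with add_multiplication_equality.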
  • var_task_new_duration[t] = base + sum(is_break[i] * uses[t, i] for i) computes the stretched duration.
  • The task interval uses this stretched duration directly.
var_task_intervals[t] = model.new_interval_var(
    start[t], new_duration[t], end[t], ...,
)

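Break intervals are added to add_cumulative (capacity 1) together with a unit-length interval covering the first slot of each auto-type (TYPE_4) task. A task therefore cannot start during a break, but a break does not preempt a task that has already started: the task keeps running across the break and its duration is stretched accordingly.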

Concepts

  • Breaks (approach 2: duration stretched)
  • CP-SAT basics (reification, add_multiplication_equality)

Source

from ortools.sat.python import cp_model

# Initiate: every variable and constraint below is registered on this model.
model = cp_model.CpModel()

# Scenario table. Kept as a comment rather than a bare string literal, which
# would be evaluated and thrown away every time the script runs.
# NOTE: the data section of this example only instantiates task 1; the
# task 2 row is listed here but is not modelled.
#
# task    product     type
# 1       A           TYPE_4
# 2       B           TYPE_4

# 1. Data

# Task catalogue: each task produces one product and has a type; the base
# processing time is looked up through the task's product.
tasks = {1}
products = {'A'}
task_to_product = {1: 'A'}
task_to_type = {1: 'TYPE_4'}
processing_time = {'A': 3}

# Planning horizon: slot i stands for the time window [i, i + 1),
# for i in range(max_time).
max_time = 10

# Breaks as half-open (start, end) windows on the same time axis.
breaks = {(2, 4)}

# is_break[i] is 1 when slot i falls inside any break window, else 0.
# It is derived from `breaks` (instead of repeating the window bounds by
# hand) so that editing `breaks` can never leave the two out of sync.
is_break = {
    i: int(any(start <= i < end for (start, end) in breaks))
    for i in range(max_time)
}

# 2. Decision Variables

# Start and end time of every task, anywhere on the horizon [0, max_time].
var_task_starts = {
    task: model.new_int_var(0, max_time, f"task_{task}_start") for task in tasks
}

var_task_ends = {
    task: model.new_int_var(0, max_time, f"task_{task}_end") for task in tasks
}

# Stretched duration of every task: base processing time plus the break
# slots the task runs across (constrained further down in the script).
var_task_new_duration = {
    task: model.new_int_var(0, max_time, f"task_{task}_delay") for task in tasks
}

# var_task_duration_timeslots[task, i] is true iff the task occupies slot i.
var_task_duration_timeslots = {
    (task, i): model.new_bool_var(f'task {task} uses interval {i}')
    for task in tasks
    for i in range(max_time)
}

# Helper booleans whose conjunction defines slot occupancy (the "AND pair").
# They carry descriptive names so they can be told apart when the model is
# printed or exported for debugging.

# True iff the task starts at or before slot i.
var_task_start_earlier_than_start = {
    (task, i): model.new_bool_var(f'task {task} starts by {i}')
    for task in tasks
    for i in range(max_time)
}

# True iff the task ends after slot i (i.e. at i + 1 or later).
var_task_end_later_than_end = {
    (task, i): model.new_bool_var(f'task {task} ends after {i}')
    for task in tasks
    for i in range(max_time)
}


# Channel every (task, slot) pair: slot i is used by a task exactly when the
# task has started by the beginning of the slot AND is still running at the
# end of it. Each helper boolean is fully reified (both the positive and the
# negated case are constrained) so the solver cannot set it freely.
for task in tasks:
    start_var = var_task_starts[task]
    end_var = var_task_ends[task]

    for i in range(max_time):
        started = var_task_start_earlier_than_start[task, i]
        still_running = var_task_end_later_than_end[task, i]
        slot_end = i + 1

        # started <=> start <= i
        model.add(start_var <= i).only_enforce_if(started)
        model.add(start_var > i).only_enforce_if(~started)

        # still_running <=> end >= i + 1
        model.add(end_var >= slot_end).only_enforce_if(still_running)
        model.add(end_var < slot_end).only_enforce_if(~still_running)

        # Logical AND of the two booleans, written as their 0/1 product.
        model.add_multiplication_equality(
            var_task_duration_timeslots[task, i],
            [started, still_running],
        )


# Stretched duration: the base processing time of the task's product plus
# one extra slot for every break slot the task occupies.
for task in tasks:
    base_duration = processing_time[task_to_product[task]]
    break_slots_used = sum(
        is_break[i] * var_task_duration_timeslots[task, i]
        for i in range(max_time)
    )
    model.add(var_task_new_duration[task] == base_duration + break_slots_used)


# Full interval of each task. Creating it ties the three variables together
# (start + stretched duration == end); it is not passed to the cumulative.
var_task_intervals = {
    task: model.new_interval_var(
        var_task_starts[task],
        var_task_new_duration[task],
        var_task_ends[task],
        name=f"interval_t{task}"
    )
    for task in tasks
}


# Unit-length interval covering only the first slot of each TYPE_4 task.
# This is what competes with the breaks below, so such a task cannot start
# inside a break but is free to run across one once started.
var_task_intervals_auto = {
    task: model.new_interval_var(
        var_task_starts[task],
        1,
        var_task_starts[task] + 1,
        name=f"interval_auto_t{task}"
    )
    for task in tasks
    if task_to_type[task] == 'TYPE_4'
}

# Add break time: one fixed interval per (start, end) break window.
variables_breaks = {
    (start, end): model.new_fixed_size_interval_var(start=start, size=end-start, name='a_break')
    for (start, end) in breaks
}

intervals = list(var_task_intervals_auto.values()) + list(variables_breaks.values())

# One unit of demand per interval. The list is sized from `intervals` itself:
# only TYPE_4 tasks contribute a start interval, so counting `tasks` would
# produce too many demands as soon as a task of another type is added.
demands = [1] * len(intervals)

# Capacity 1: no two of these intervals (task starts, breaks) may overlap.
model.add_cumulative(intervals=intervals, demands=demands, capacity=1)

# 3. Objectives

# The make-span is the latest end time over all tasks; the solver is asked
# to make it as small as possible.
task_end_vars = [var_task_ends[task] for task in tasks]
make_span = model.new_int_var(0, max_time, "make_span")
model.add_max_equality(make_span, task_end_vars)
model.minimize(make_span)




# 4. Solve

solver = cp_model.CpSolver()
status = solver.solve(model=model)


# 5. Results

if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):

    # Per-slot occupancy flag of every task, one line per slot.
    for task in tasks:
        print(f'task {task}')
        for i in range(max_time):
            slot_used = solver.value(var_task_duration_timeslots[task, i])
            print(f'{i} {slot_used}')

    print('===========================  TASKS SUMMARY  ===========================')
    # Start and end of each task, followed by its stretched duration.
    for task in tasks:
        task_start = solver.value(var_task_starts[task])
        task_end = solver.value(var_task_ends[task])
        print(f'Task {task} ', task_start, task_end)
        print(solver.value(var_task_new_duration[task]))

    print('Make-span:', solver.value(make_span))

elif status == cp_model.INFEASIBLE:
    print("Infeasible")
elif status == cp_model.MODEL_INVALID:
    print("Model invalid")
else:
    # Any other status is printed as returned by the solver.
    print(status)