Skip to content

Commit b8b2acb

Browse files
committed
Generic Task Moduel:
- StageActions: This method stages the action_sequence for a proper execution order (in the task initialization, it is called if an action sequence is not provided). Currently, it takes an argument 'from_deps' to compute the sequence based on the inter-action dependencies but the idea is to enable other strategies later to be built to create the action_sequence in the Task for more autonomy. - calcDuration - This organizes the actions to be done by this task into the proper order based on the action_sequence. This is used to fill out the self.actions_ti (the initial time of individual actions within the task), self.ti, and self.tf. Right now, when we're initializing the task, we assume a ti of zero. - calcCost - This calculate the total cost of the task based on the costs of individual actions (a simple roll-up). This updates self.cost. - Both calcDuration and calcCost assumes that the individual action.duration and action.cost have already been computed (using calcCostandDuration in the action class but I'm also thinking of dividing this method into two different methods - one for cost and one for duration to match with Task.py and also task-assets matrix in the scheduler. - updateTaskTime: This method updates the starting and finishing time of the whole task and the starting time of individual actions based on a new start time passed to Task (could be helpful later during task scheduling using the sceduler.
1 parent 96c8ffb commit b8b2acb

File tree

2 files changed

+108
-216
lines changed

2 files changed

+108
-216
lines changed

famodel/irma/calwave_task1.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
from famodel.project import Project
88
from calwave_irma import Scenario
99
import calwave_chart as chart
10-
from calwave_task import Task
10+
# from calwave_task import Task # calwave_task module (Felipe)
11+
from task import Task as Task # generic Task module ( Rudy )
1112

1213
sc = Scenario() # now sc exists in *this* session
1314

@@ -183,21 +184,22 @@ def assign_actions(sc: Scenario, actions: dict):
183184
# 4) Assign (assign vessels/roles)
184185
assign_actions(sc, actions)
185186

186-
# 5) schedule once, in the Task
187-
calwave_task1 = Task.from_scenario(
188-
sc,
189-
name='calwave_task1',
190-
strategy='earliest', # 'earliest' or 'levels'
191-
enforce_resources=False, # keep single-resource blocking if you want it
192-
resource_roles=('vessel', 'carrier', 'operator'))
193-
194-
# 6) Extract Task1 sequencing info
195-
# calwave_task1.extractSeqYaml()
187+
# # 5) schedule once, in the Task
188+
# calwave_task1 = Task.from_scenario(
189+
# sc,
190+
# name='calwave_task1',
191+
# strategy='earliest', # 'earliest' or 'levels'
192+
# enforce_resources=False, # keep single-resource blocking if you want it
193+
# resource_roles=('vessel', 'carrier', 'operator'))
194+
195+
196+
# 5) Build Task
197+
task1 = Task(name='calwave_task1', actions=sc.actions)
198+
199+
# task1.updateTaskTime(newStart=10)
196200

197-
# 7) update Task1 if needed
198-
calwave_task1.update_from_SeqYaml() # uncomment to re-apply sequencing from YAML
199-
# 8) build the chart input directly from the Task and plot
200-
chart_view = chart.view_from_task(calwave_task1, sc, title='CalWave Task 1 - Anchor installation plan')
201+
# 6) build the chart input directly from the Task and plot #TODO: Rudy / Improve this later (maybe include it in Task.py/Scenario and let it plot the absolute time instead of relative time)
202+
chart_view = chart.view_from_task(task1, sc, title='CalWave Task 1 - Anchor installation plan')
201203
chart.plot_task(chart_view, outpath='calwave_task1_chart.png')
202204

203205

famodel/irma/task.py

Lines changed: 91 additions & 201 deletions
Original file line numberDiff line numberDiff line change
@@ -32,250 +32,140 @@ class Task():
3232
3333
'''
3434

35-
def __init__(self, actions, action_sequence, name, **kwargs):
35+
def __init__(self, name, actions, action_sequence=None, **kwargs):
3636
'''Create an action object...
3737
It must be given a name and a list of actions.
3838
The action list should be by default coherent with actionTypes dictionary.
3939
4040
Parameters
4141
----------
42+
name : string
43+
A name for the action. It may be appended with numbers if there
44+
are duplicate names.
4245
actions : list
4346
A list of all actions that are part of this task.
44-
action_sequence : dict
47+
action_sequence : dict, optional
4548
A dictionary where each key is the name of each action, and the values are
4649
each a list of which actions (by name) must be completed before the current
47-
one.
48-
name : string
49-
A name for the action. It may be appended with numbers if there
50-
are duplicate names.
50+
one. If None, the action_sequence will be built by calling self.stageActions(from_deps=True)
51+
[building from the dependencies of each action].
5152
kwargs
5253
Additional arguments may depend on the task type.
5354
5455
'''
55-
56-
# Make a dict by name of all actions that are carried out in this task
57-
self.actions = {}
58-
for act in actions:
59-
self.actions[act.name] = act
56+
6057

6158

6259

6360
self.name = name
64-
61+
self.actions = {a.name: a for a in actions.values()}
62+
63+
if action_sequence is None:
64+
self.stageActions(from_deps=True)
65+
else:
66+
self.action_sequence = {k: list(v) for k, v in action_sequence.items()}
67+
68+
6569
self.status = 0 # 0, waiting; 1=running; 2=finished
6670
self.actions_ti = {} # relative start time of each action [h]
6771

68-
self.duration = 0 # duration must be calculated based on lengths of actions
69-
self.cost = 0 # cost must be calculated based on the cost of individual actions.
70-
self.ti =0 # task start time [h?]
71-
self.tf =0 # task end time [h?]
72-
73-
# what else do we need to initialize the task?
74-
75-
# Create a graph of the sequence of actions in this task based on action_sequence
76-
self.getSequenceGraph(action_sequence, plot=True) # this also updates duration
77-
78-
self.cost = sum(action.cost for action in self.actions.values())
72+
self.duration = 0.0 # duration must be calculated based on lengths of actions
73+
self.cost = 0.0 # cost must be calculated based on the cost of individual actions.
74+
self.ti = 0.0 # task start time [h?]
75+
self.tf = 0.0 # task end time [h?]
7976

80-
print(f"---------------------- Initializing Task '{self.name} ----------------------")
77+
# Calculate duration and cost
78+
self.calcDuration() # organizes actions and calculates duration
79+
self.calcCost()
80+
81+
print(f"---------------------- Initializing Task '{self.name} ----------------------")
8182
print(f"Task '{self.name}' initialized with duration = {self.duration:.2f} h.")
8283
print(f"Task '{self.name}' initialized with cost = ${self.cost:.2f} ")
8384

84-
def organizeActions(self):
85-
'''Organizes the actions to be done by this task into the proper order
86-
based on the strategy of this type of task...
85+
def stageActions(self, from_deps=True):
8786
'''
88-
89-
if self.type == 'parallel_anchor_install':
90-
91-
pass
92-
# make a graph that reflects this strategy?
93-
94-
95-
def calcDuration(self):
96-
'''Calculates the duration of the task based on the durations of the
97-
individual actions and their order of operation.'''
98-
99-
# Does Rudy have graph-based code that can do this?
100-
101-
102-
103-
104-
105-
106-
def getSequenceGraph(self, action_sequence, plot=True):
107-
'''Generate a multi-directed graph that visalizes action sequencing within the task.
108-
Build a MultiDiGraph with nodes:
109-
Start -> CP1 -> CP2 -> ... -> End
110-
111-
Checkpoints are computed from action "levels":
112-
level(a) = 1 if no prerequisites.
113-
level(a) = 1 + max(level(p) for p in prerequisites) 1 + the largest level among a’s prerequisites.
114-
Number of checkpoints = max(level) - 1.
115-
'''
116-
117-
# Compute levels
118-
levels: dict[str, int] = {}
119-
def level_of(a: str, b: set[str]) -> int:
120-
'''Return the level of action a. b is the set of actions currently being explored'''
87+
This method stages the action_sequence for a proper execution order.
12188
122-
# If we have already computed the level, return it
123-
if a in levels:
124-
return levels[a]
89+
Parameters
90+
----------
91+
from_deps : bool
92+
If True, builds the action_sequence from the dependencies of each action.
93+
More options will be added in the future.
94+
'''
95+
if from_deps:
96+
# build from dependencies
97+
def getDeps(action):
98+
deps = []
99+
for dep in action.dependencies:
100+
deps.append(dep)
101+
return deps
125102

126-
# The action cannot be its own prerequisite
127-
if a in b:
128-
raise ValueError(f"Cycle detected in action sequence at '{a}' in task '{self.name}'. The action cannot be its own prerequisite.")
129-
130-
b.add(a)
131-
132-
# Look up prerequisites for action a.
133-
pres = action_sequence.get(a, [])
134-
if not pres:
135-
lv = 1 # No prerequisites, level 1
136-
else:
137-
# If a prerequisites name is not in the dict, treat it as a root (level 1)
138-
lv = 1 + max(level_of(p, b) if p in action_sequence else 1 for p in pres)
103+
self.action_sequence = {self.actions[name].name: getDeps(self.actions[name]) for name in self.actions}
139104

140-
# b.remove(a) # if you want to unmark a from the explored dictionary, b, uncomment this line.
141-
levels[a] = lv
142-
return lv
143-
144-
for a in action_sequence:
145-
level_of(a, set())
146-
147-
max_level = max(levels.values(), default=1)
148-
num_cps = max(0, max_level - 1)
149-
150-
H = nx.MultiDiGraph()
151-
152-
# Add the Start -> [checkpoints] -> End nodes
153-
H.add_node("Start")
154-
for i in range(1, num_cps + 1):
155-
H.add_node(f"CP{i}")
156-
H.add_node("End")
157-
158-
shells = [["Start"]]
159-
if num_cps > 0:
160-
# Middle shells
161-
cps = [f"CP{i}" for i in range(1, num_cps + 1)]
162-
shells.append(cps)
163-
shells.append(["End"])
164-
165-
pos = nx.shell_layout(H, nlist=shells)
166-
167-
xmin, xmax = -2.0, 2.0 # maybe would need to change those later on.
168-
pos["Start"] = (xmin, 0)
169-
pos["End"] = (xmax, 0)
170-
171-
# Add action edges
172-
# Convention:
173-
# level 1 actions: Start -> CP1 (or Start -> End if no CPs)
174-
# level L actions (2 <= L < max_level): CP{L-1} -> CP{L}
175-
# level == max_level actions: CP{num_cps} -> End
176-
for action, lv in levels.items():
177-
action = self.actions[action]
178-
if num_cps == 0:
179-
# No checkpoints: all actions from Start to End
180-
H.add_edge("Start", "End", key=action, duration=action.duration, cost=action.cost)
181-
else:
182-
if lv == 1:
183-
H.add_edge("Start", "CP1", key=action, duration=action.duration, cost=action.cost)
184-
elif lv < max_level:
185-
H.add_edge(f"CP{lv-1}", f"CP{lv}", key=action, duration=action.duration, cost=action.cost)
186-
else: # lv == max_level
187-
H.add_edge(f"CP{num_cps}", "End", key=action, duration=action.duration, cost=action.cost)
188105

106+
def calcDuration(self):
107+
'''Organizes the actions to be done by this task into the proper order
108+
based on the action_sequence. This is used to fill out
109+
self.actions_ti, self.ti, and self.tf. This method assumes that action.duration
110+
have already been evaluated for each action in self.actions.
111+
'''
112+
# Initialize dictionaries to hold start and finish times
113+
starts = {}
114+
finishes = {}
189115

190-
# 3. Compute cumulative start time for each level
191-
level_groups = {}
192-
for action, lv in levels.items():
193-
level_groups.setdefault(lv, []).append(action)
194-
195-
level_durations = {lv: max(self.actions[a].duration for a in acts)
196-
for lv, acts in level_groups.items()}
116+
# Iterate through actions in the sequence
117+
for action, dep_actions in self.action_sequence.items():
118+
# Calculate start time as the max finish time of dependencies
119+
starts[action] = max((finishes[dep] for dep in dep_actions), default=0)
197120

198-
task_duration = sum(level_durations.values())
121+
# get duration from actions
122+
duration = self.actions[action].duration # in hours
199123

200-
level_start_time = {}
201-
elapsed = 0.0
202-
cp_string = []
203-
for lv in range(1, max_level + 1):
204-
level_start_time[lv] = elapsed
205-
elapsed += level_durations.get(lv, 0.0)
206-
# also collect all actions at this level for title
207-
acts = [a for a, l in levels.items() if l == lv]
208-
if acts and lv <= num_cps:
209-
cp_string.append(f"CP{lv}: {', '.join(acts)}")
210-
elif acts and lv > num_cps:
211-
cp_string.append(f"End: {', '.join(acts)}")
124+
# Calculate finish time
125+
finishes[action] = starts[action] + duration
212126

213-
# Assign to self:
214-
self.duration = task_duration
215-
self.actions_ti = {a: level_start_time[lv] for a, lv in levels.items()}
216-
self.sequence_graph = H
217-
218-
title_str = f"Task {self.name}. Duration {self.duration:.2f} : " + " | ".join(cp_string)
127+
# Update self.actions_ti with relative start times
128+
self.actions_ti = starts
219129

220-
if plot:
221-
fig, ax = plt.subplots()
222-
# pos = nx.shell_layout(G)
223-
nx.draw(H, pos, with_labels=True, node_size=500, node_color="lightblue", edge_color='white')
130+
# Set task start time and finish time
131+
self.ti = min(starts.values(), default=0)
132+
self.tf = max(finishes.values(), default=0)
224133

225-
label_positions = {} # to store label positions for each edge
226-
# Group edges by unique (u, v) pairs
227-
for (u, v) in set((u, v) for u, v, _ in H.edges(keys=True)):
228-
# get all edges between u and v (dict keyed by edge key)
229-
edge_dict = H.get_edge_data(u, v) # {key: {attrdict}, ...}
230-
n = len(edge_dict)
134+
# Task duration
135+
self.duration = self.tf - self.ti
231136

232-
# curvature values spread between -0.3 and +0.3 [helpful to visualize multiple edges]
233-
if n==1:
234-
rads = [0]
235-
offsets = [0.5]
236-
else:
237-
rads = np.linspace(-0.3, 0.3, n)
238-
offsets = np.linspace(0.2, 0.8, n)
239-
240-
# draw each edge
241-
durations = [d.get("duration", 0.0) for d in edge_dict.values()]
242-
scale = max(max(durations), 0.0001) # avoid div by zero
243-
width_scale = 4.0 / scale # normalize largest to ~4px
137+
def calcCost(self):
138+
'''Calculates the total cost of the task based on the costs of individual actions.
139+
Updates self.cost accordingly. This method assumes that action.cost has
140+
already been evaluated for each action in self.actions.
141+
'''
142+
total_cost = 0.0
143+
for action in self.actions.values():
144+
total_cost += action.cost
145+
self.cost = total_cost
146+
return self.cost
244147

245-
for rad, offset, (k, d) in zip(rads, offsets, edge_dict.items()):
246-
nx.draw_networkx_edges(
247-
H, pos, edgelist=[(u, v)], ax=ax,
248-
connectionstyle=f"arc3,rad={rad}",
249-
arrows=True, arrowstyle="-|>",
250-
edge_color="gray",
251-
width=max(0.5, d.get("duration", []) * width_scale),
252-
)
253-
label_positions[(u, v, k)] = offset # store position for edge label
148+
def updateTaskTime(self, newStart=0.0):
149+
'''Update the start time of all actions based on a new task start time.
254150
255-
ax.set_title(title_str, fontsize=12, fontweight="bold")
256-
ax.axis("off")
257-
plt.tight_layout()
151+
Parameters
152+
----------
153+
newStart : float
154+
The new start time for the task. All action start times will be adjusted accordingly.
155+
'''
156+
# Calculate the time shift
157+
time_shift = newStart - self.ti
258158

259-
return H
260-
261-
159+
# Update task start and finish times
160+
self.ti = newStart
161+
self.tf += time_shift
262162

263-
def getTaskGraph(self, plot=True):
264-
'''Generate a graph of the action dependencies.
265-
'''
266-
267-
# Create the graph
268-
G = nx.DiGraph()
269-
for item, data in self.actions.items():
270-
for dep in data.dependencies:
271-
G.add_edge(dep, item, duration=data.duration) # Store duration as edge attribute
163+
# Update action start times
164+
for action in self.actions_ti:
165+
self.actions_ti[action] += time_shift
166+
272167

273-
# Compute longest path & total duration
274-
longest_path = nx.dag_longest_path(G, weight='duration')
275-
longest_path_edges = list(zip(longest_path, longest_path[1:])) # Convert path into edge pairs
276168

277-
total_duration = sum(self.actions[node].duration for node in longest_path)
278-
return G
279169

280170

281171
def get_row(self, assets):

0 commit comments

Comments
 (0)