124 changes: 69 additions & 55 deletions parla/task_runtime.py
@@ -342,6 +342,10 @@ def dependees(self) -> Tuple["Task"]:
"""
return tuple(self._dependees)

@property
def state(self):
return self._state

@property
def result(self):
if isinstance(self._state, TaskCompleted):
@@ -613,15 +617,8 @@ def run(self):
for d in self.req.devices:
for resource, amount in self.req.resources.items():
logger.debug("Task %r deallocating %d %s from device %r", self, amount, resource, d)
ctx.scheduler._available_resources.deallocate_resources(d, self.req.resources)
ctx.scheduler.update_mapped_task_count(self, d, -1)
ctx.scheduler.update_launched_task_count_mutex(self, d, -1)

# Update OUT parrays which may have changed size from 0 to something
# We assume all IN and INOUT params don't change size
for parray in (self.dataflow.output):
ctx.scheduler._available_resources.update_parray_nbytes(parray, self.req.devices)

ctx.scheduler.decr_active_compute_tasks()

# Protect the case that it notifies dependees and
@@ -1126,48 +1123,45 @@ def check_resources_availability(self, d: Device, resources: ResourceDict):
:param resources: The resources to check for availability.
"""
logger.debug("[ResourcePool] Acquiring monitor in check_resources_availability()")
Contributor: Delete this line too if deleting the monitor.
with self._monitor:
Contributor: Are we sure removing this monitor is safe?

Contributor: I suppose it is if only the scheduling thread makes this call.

Contributor (Author): I checked it by printing the thread, and it was called only by the scheduler. But let me double-check this.

Contributor: It can also be called by any thread that creates a PArray:

get_scheduler_context().scheduler._available_resources.track_parray(self)
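If the monitor does get removed, one cheap way to validate the "scheduler-thread-only" assumption during development is a debug assertion on the calling thread. Below is a minimal sketch, assuming a hypothetical `_owner_thread` attribute captured when the pool is created; it is not part of the current code:

```python
import threading

class ResourcePool:
    """Sketch: guard an unsynchronized method with a thread assertion."""

    def __init__(self):
        # Hypothetical: remember the one thread allowed to call the
        # unsynchronized methods (e.g., the scheduler thread).
        self._owner_thread = threading.current_thread()
        self._devices = {}

    def check_resources_availability(self, device, resources):
        # Fail loudly if the single-caller assumption is violated,
        # instead of silently racing on self._devices.
        assert threading.current_thread() is self._owner_thread, (
            "called from %r, expected %r"
            % (threading.current_thread().name, self._owner_thread.name)
        )
        dres = self._devices.get(device, {})
        return all(amount <= dres.get(name, 0)
                   for name, amount in resources.items())
```

This only catches violations in debug runs (such as the PArray-tracking call above); it does not make concurrent access safe.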

    is_available = True
    for name, amount in resources.items():
        dres = self._devices[d]

        # Workaround stupid vcus (I'm getting rid of these at some point)
        #if d.architecture.id == 'gpu' and name == 'vcus':
        #    continue

        if amount > dres[name]:
            is_available = False
    logger.debug("Resource check for %d %s on device %r: %s", amount, name, d, "Passed" if is_available else "Failed")
    logger.debug("[ResourcePool] Releasing monitor in check_resources_availability()")

Contributor: Delete this too if removing the monitor.

    return is_available

def _atomically_update_resources(self, d: Device, resources: ResourceDict, multiplier, block: bool):
    logger.debug("[ResourcePool] Acquiring monitor in atomically_update_resources()")
    with self._monitor:
        to_release = []
        success = True
        for name, v in resources.items():
            if not self._update_resource(d, name, v * multiplier, block):
                success = False
                break
            else:
                to_release.append((name, v))
        else:
            to_release.clear()

Contributor: If this function no longer uses the monitor, it should not be named "atomically".

        logger.info("[ResourcePool] Attempted to allocate %s * %r (blocking %s) => %s", \
                    multiplier, (d, resources), block, "success" if success else "fail")
        if to_release:
            logger.info("[ResourcePool] Releasing resources due to failure: %r", to_release)

            for name, v in to_release:
                ret = self._update_resource(d, name, -v * multiplier, block)
                assert ret

        assert not success or len(to_release) == 0  # success implies to_release empty
        logger.debug("[ResourcePool] Releasing monitor in atomically_update_resources()")
        return success

def _update_resource(self, dev: Device, res: str, amount: float, block: bool):
# Workaround stupid vcus (I'm getting rid of these at some point)
@@ -1179,18 +1173,14 @@ def _update_resource(self, dev: Device, res: str, amount: float, block: bool):
dres = self._devices[dev]
if -amount <= dres[res]:
dres[res] += amount
if amount > 0:
self._monitor.notify_all()
assert dres[res] <= dev.resources[res], "{}.{} was over deallocated".format(dev, res)
assert dres[res] >= 0, "{}.{} was over allocated".format(dev, res)
# TODO(lhc): Due to floating-point error, these asserts don't hold.
#assert dres[res] <= dev.resources[res], "{}.{} was over deallocated".format(dev, res)
#assert dres[res] >= 0, "{}.{} was over allocated".format(dev, res)
Contributor: Gross. Can we make these integers?

Contributor (Author): One possible way is to use the default VCU sum as the number of threads, or some larger number rather than 1. @dialecticDolt is that fine?

Contributor: Also gross. We might add the resources back up and exceed the original amount because of rounding errors, and the order in which tasks complete matters too. Probably using https://docs.python.org/3/library/fractions.html might be the cleanest way to handle this.
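A minimal sketch of the `fractions.Fraction` approach suggested above (the class and method names here are illustrative, not Parla's actual resource-pool API): exact rational arithmetic makes allocate/deallocate pairs cancel exactly, so the over-/under-allocation asserts could be re-enabled.

```python
from fractions import Fraction

class VcuCounter:
    """Sketch: exact VCU accounting with rationals instead of floats."""

    def __init__(self, total=1):
        self._total = Fraction(total)
        self._free = Fraction(total)

    @staticmethod
    def _exact(amount):
        # Snap float inputs like 0.3 to the nearest small rational (3/10)
        # so repeated allocate/deallocate pairs cancel exactly.
        return Fraction(amount).limit_denominator(10**6)

    def allocate(self, amount):
        amount = self._exact(amount)
        if amount > self._free:
            return False
        self._free -= amount
        return True

    def deallocate(self, amount):
        self._free += self._exact(amount)
        # With exact arithmetic these invariants hold again:
        assert Fraction(0) <= self._free <= self._total
```

Plain integers (e.g., treating one device as 1000 VCU "ticks", in the spirit of the author's suggestion) would work equally well and avoid the float conversion entirely.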

return True
else:
logger.info("If you're seeing this message, you probably have an issue.")
logger.info("The current mapper should never try to allocate resources that aren't actually available")
if block:
self._monitor.wait()
else:
return False
return False
except KeyError:
raise ValueError("Resource {}.{} does not exist".format(dev, res))

@@ -1394,9 +1384,12 @@ def __init__(self, environments: Collection[TaskEnvironment], n_threads: int = N
self._device_launched_compute_task_counts = {dev: 0 for dev in self._available_resources.get_resources()}
self._device_launched_datamove_task_counts = {dev: 0 for dev in self._available_resources.get_resources()}

self._launched_tasks = []

# Dictionary mapping data block to task lists.
self._datablock_dict = defaultdict(list)

print("Threads:", n_threads)
self._worker_threads = [WorkerThread(self, i) for i in range(n_threads)]
for t in self._worker_threads:
t.start()
@@ -1804,6 +1797,19 @@ def _map_tasks(self):
task: Optional[Task] = self._dequeue_spawned_task()
if task:
if not task.assigned:
_new_launched_task = []
for n_task in self._launched_tasks:
Contributor: Looping over all launched tasks for every mapping decision seems really expensive.

Contributor (Author): Agreed, this should also be refined, but the point of this PR is that we should update resources at launch time, not at mapping time.
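One way to avoid the per-mapping O(n) scan, sketched below under the assumption of a hypothetical `on_task_completed` hook invoked by the worker thread (not part of this PR): completed tasks push themselves onto a thread-safe queue that the mapper drains, so each completed task is visited exactly once.

```python
import queue

class CompletionTracker:
    """Sketch: drain completed tasks instead of scanning all launched ones."""

    def __init__(self, available_resources):
        self._available_resources = available_resources
        self._completed = queue.SimpleQueue()  # thread-safe, unbounded

    def on_task_completed(self, task):
        # Hypothetical hook: called by the worker thread when a task
        # transitions to TaskCompleted.
        self._completed.put(task)

    def release_completed(self):
        # Called from the mapper loop; cost is proportional to the number
        # of newly completed tasks, not all launched tasks.
        while True:
            try:
                task = self._completed.get_nowait()
            except queue.Empty:
                break
            for dev in task.req.devices:
                self._available_resources.deallocate_resources(dev, task.req.resources)
```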

task_state = n_task.state
if isinstance(task_state, TaskCompleted):
for dev in n_task.req.devices:
self._available_resources.deallocate_resources(dev, n_task.req.resources)
Contributor: I think this doesn't handle data movement tasks? Their resources are still bundled with their "parents", but their resources are no longer allocated ahead of time. Only compute tasks are being tracked and deallocated. This means they are currently considered to take no resources and can be over-scheduled.

Contributor: Hmm, maybe not. It seems like data movement tasks are still allocated at mapping time and freed when the "parent" completes. Sorry.

Contributor (@wlruys, May 5, 2022): Although (sorry for the stream-of-consciousness comments on this), doesn't this mean it can still deadlock in the same way as the current mapper? It can allocate a bunch of data movement tasks out of order (there is no guaranteed order to the mapping phase) and prevent the next needed dependency from being mapped (and run) due to being out of resources.

Contributor (Author): Yes, the deadlock can still happen. Let's consider this PR the immediate fix for the performance problem; I don't think that issue can be resolved in this PR. Let's take it up in a separate PR with more discussion.
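An abstract illustration of the out-of-order mapping deadlock described above (plain Python, not Parla code): two data-movement allocations consume the whole budget before the dependency that would eventually free resources can be mapped.

```python
free = 10  # total resource budget on one device

def try_allocate(name, amount):
    # Simplified non-blocking allocation; the real mapper would block.
    global free
    if amount <= free:
        free -= amount
        print(f"{name}: allocated {amount}, free={free}")
        return True
    print(f"{name}: blocked (needs {amount}, free={free})")
    return False

# The mapper maps B's and C's data movement first (no guaranteed order)...
try_allocate("move_B", 6)
try_allocate("move_C", 4)
# ...so A, which B and C depend on, cannot map, and since B and C can
# never run without A, nothing ever releases resources: deadlock.
try_allocate("compute_A", 2)
```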

# Update OUT parrays which may have changed size from 0 to something
# We assume all IN and INOUT params don't change size
for parray in (n_task.dataflow.output):
self._available_resources.update_parray_nbytes(parray, n_task.req.devices)
else:
_new_launched_task.append(n_task)
self._launched_tasks = _new_launched_task
is_assigned = self._assignment_policy(task) # This is what actually maps the task
#is_assigned = self._random_assignment_policy(task) # USE THIS INSTEAD TO TEST RANDOM
assert isinstance(is_assigned, bool)
@@ -1820,24 +1826,18 @@ def _map_tasks(self):
self._construct_datamove_task(data, task, OperandType.OUT)
for data in task.dataflow.inout:
self._construct_datamove_task(data, task, OperandType.INOUT)

# Update parray tracking and task count on the device
for parray in (task.dataflow.input + task.dataflow.inout + task.dataflow.output):
if len(task.req.environment.placement) > 1:
raise NotImplementedError("Multidevice not supported")
for device in task.req.environment.placement:
self._available_resources.register_parray_move(parray, device)

# TODO(lhc): the current Parla does not support multiple devices.
# leave it as a loop for the future.
for device in task.req.environment.placement:
self.update_mapped_task_count(task, device, 1)

# Allocate additional resources used by this task (blocking)
for device in task.req.devices:
for resource, amount in task.req.resources.items():
logger.debug("Task %r allocating %d %s on device %r", task, amount, resource, device)
self._available_resources.allocate_resources(device, task.req.resources, blocking=True)

# Only computation needs to set a assigned flag.
# Data movement task is set as assigned when it is created.
task.set_assigned()
@@ -1885,9 +1885,23 @@ def _schedule_tasks(self):
def _launch_task(self, queue, dev: Device):
try:
task = queue.pop()
# Allocate additional resources used by this task (blocking)
for device in task.req.devices:
for resource, amount in task.req.resources.items():
logger.debug("Task %r allocating %d %s on device %r", task, amount, resource, device)
self._available_resources.allocate_resources(device, task.req.resources, blocking=True)
# Update parray tracking and task count on the device
for parray in (task.dataflow.input + task.dataflow.inout + task.dataflow.output):
if len(task.req.environment.placement) > 1:
raise NotImplementedError("Multidevice not supported")
for device in task.req.environment.placement:
self._available_resources.register_parray_move(parray, device)
Contributor: This seems wrong. The data movement tasks already exist and have already run before this code executes. This only updates the information once the "parent" task starts running.

Contributor: Wait, it also happens at creation; I'm not sure then. Why is it registered twice onto the same device?

Contributor (Author): I think this is a mistake. Is it fine to just remove the registration here?

worker = self._free_worker_threads.pop() # grab a worker
logger.info(f"[Scheduler] Launching %s task, %r on %r",
dev.architecture.id, task, worker)
if isinstance(task, ComputeTask):
self._launched_tasks.append(task)
worker.assign_task(task)
logger.debug(f"[Scheduler] Launched %r", task)
for dev in task.req.environment.placement: