-
Notifications
You must be signed in to change notification settings - Fork 10
Update resource managing #125
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -342,6 +342,10 @@ def dependees(self) -> Tuple["Task"]: | |||
| """ | ||||
| return tuple(self._dependees) | ||||
|
|
||||
| @property | ||||
| def state(self): | ||||
| return self._state | ||||
|
|
||||
| @property | ||||
| def result(self): | ||||
| if isinstance(self._state, TaskCompleted): | ||||
|
|
@@ -613,15 +617,8 @@ def run(self): | |||
| for d in self.req.devices: | ||||
| for resource, amount in self.req.resources.items(): | ||||
| logger.debug("Task %r deallocating %d %s from device %r", self, amount, resource, d) | ||||
| ctx.scheduler._available_resources.deallocate_resources(d, self.req.resources) | ||||
| ctx.scheduler.update_mapped_task_count(self, d, -1) | ||||
| ctx.scheduler.update_launched_task_count_mutex(self, d, -1) | ||||
|
|
||||
| # Update OUT parrays which may have changed size from 0 to something | ||||
| # We assume all IN and INOUT params don't change size | ||||
| for parray in (self.dataflow.output): | ||||
| ctx.scheduler._available_resources.update_parray_nbytes(parray, self.req.devices) | ||||
|
|
||||
| ctx.scheduler.decr_active_compute_tasks() | ||||
|
|
||||
| # Protect the case that it notifies dependees and | ||||
|
|
@@ -1126,48 +1123,45 @@ def check_resources_availability(self, d: Device, resources: ResourceDict): | |||
| :param resources: The resources to deallocate. | ||||
| """ | ||||
| logger.debug("[ResourcePool] Acquiring monitor in check_resources_availability()") | ||||
| with self._monitor: | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are we sure removing this monitor is safe?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose it is if only the scheduling thread makes this call.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I checked it by printing the thread, and it was called only by the scheduler. But let me double-check this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can also be called by any thread that creates a PArray: Line 52 in d215415
|
||||
| is_available = True | ||||
| for name, amount in resources.items(): | ||||
| dres = self._devices[d] | ||||
|
|
||||
| is_available = True | ||||
| for name, amount in resources.items(): | ||||
| dres = self._devices[d] | ||||
| # Workaround stupid vcus (I'm getting rid of these at some point) | ||||
| #if d.architecture.id == 'gpu' and name == 'vcus': | ||||
| # continue | ||||
|
|
||||
| # Workaround stupid vcus (I'm getting rid of these at some point) | ||||
| #if d.architecture.id == 'gpu' and name == 'vcus': | ||||
| # continue | ||||
|
|
||||
| if amount > dres[name]: | ||||
| is_available = False | ||||
| logger.debug("Resource check for %d %s on device %r: %s", amount, name, d, "Passed" if is_available else "Failed") | ||||
| logger.debug("[ResourcePool] Releasing monitor in check_resources_availability()") | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. delete this too if removing the monitor |
||||
| return is_available | ||||
| if amount > dres[name]: | ||||
| is_available = False | ||||
| logger.debug("Resource check for %d %s on device %r: %s", amount, name, d, "Passed" if is_available else "Failed") | ||||
| logger.debug("[ResourcePool] Releasing monitor in check_resources_availability()") | ||||
| return is_available | ||||
|
|
||||
| def _atomically_update_resources(self, d: Device, resources: ResourceDict, multiplier, block: bool): | ||||
| logger.debug("[ResourcePool] Acquiring monitor in atomically_update_resources()") | ||||
| with self._monitor: | ||||
| to_release = [] | ||||
| success = True | ||||
| for name, v in resources.items(): | ||||
| if not self._update_resource(d, name, v * multiplier, block): | ||||
| success = False | ||||
| break | ||||
| else: | ||||
| to_release.append((name, v)) | ||||
| to_release = [] | ||||
| success = True | ||||
| for name, v in resources.items(): | ||||
| if not self._update_resource(d, name, v * multiplier, block): | ||||
| success = False | ||||
| break | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this function no longer uses the monitor, it should not be named "atomically" |
||||
| else: | ||||
| to_release.clear() | ||||
| to_release.append((name, v)) | ||||
| else: | ||||
| to_release.clear() | ||||
|
|
||||
| logger.info("[ResourcePool] Attempted to allocate %s * %r (blocking %s) => %s", \ | ||||
| multiplier, (d, resources), block, "success" if success else "fail") | ||||
| if to_release: | ||||
| logger.info("[ResourcePool] Releasing resources due to failure: %r", to_release) | ||||
| logger.info("[ResourcePool] Attempted to allocate %s * %r (blocking %s) => %s", \ | ||||
| multiplier, (d, resources), block, "success" if success else "fail") | ||||
| if to_release: | ||||
| logger.info("[ResourcePool] Releasing resources due to failure: %r", to_release) | ||||
|
|
||||
| for name, v in to_release: | ||||
| ret = self._update_resource(d, name, -v * multiplier, block) | ||||
| assert ret | ||||
| for name, v in to_release: | ||||
| ret = self._update_resource(d, name, -v * multiplier, block) | ||||
| assert ret | ||||
|
|
||||
| assert not success or len(to_release) == 0 # success implies to_release empty | ||||
| logger.debug("[ResourcePool] Releasing monitor in atomically_update_resources()") | ||||
| return success | ||||
| assert not success or len(to_release) == 0 # success implies to_release empty | ||||
| logger.debug("[ResourcePool] Releasing monitor in atomically_update_resources()") | ||||
| return success | ||||
|
|
||||
| def _update_resource(self, dev: Device, res: str, amount: float, block: bool): | ||||
| # Workaround stupid vcus (I'm getting rid of these at some point) | ||||
|
|
@@ -1179,18 +1173,14 @@ def _update_resource(self, dev: Device, res: str, amount: float, block: bool): | |||
| dres = self._devices[dev] | ||||
| if -amount <= dres[res]: | ||||
| dres[res] += amount | ||||
| if amount > 0: | ||||
| self._monitor.notify_all() | ||||
| assert dres[res] <= dev.resources[res], "{}.{} was over deallocated".format(dev, res) | ||||
| assert dres[res] >= 0, "{}.{} was over allocated".format(dev, res) | ||||
| # TODO(lhc): Due to floating point, it doesn't work. | ||||
| #assert dres[res] <= dev.resources[res], "{}.{} was over deallocated".format(dev, res) | ||||
| #assert dres[res] >= 0, "{}.{} was over allocated".format(dev, res) | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gross. Can we make these integers?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. One possible way is to use the number of threads (or some other larger number) as the default vcu total, instead of 1.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also gross. We might add the resources back up and exceed the original amount bc of rounding errors and the order that tasks complete matters too. |
||||
| return True | ||||
| else: | ||||
| logger.info("If you're seeing this message, you probably have an issue.") | ||||
| logger.info("The current mapper should never try to allocate resources that aren't actually available") | ||||
| if block: | ||||
| self._monitor.wait() | ||||
| else: | ||||
| return False | ||||
| return False | ||||
| except KeyError: | ||||
| raise ValueError("Resource {}.{} does not exist".format(dev, res)) | ||||
|
|
||||
|
|
@@ -1394,9 +1384,12 @@ def __init__(self, environments: Collection[TaskEnvironment], n_threads: int = N | |||
| self._device_launched_compute_task_counts = {dev: 0 for dev in self._available_resources.get_resources()} | ||||
| self._device_launched_datamove_task_counts = {dev: 0 for dev in self._available_resources.get_resources()} | ||||
|
|
||||
| self._launched_tasks = [] | ||||
|
|
||||
| # Dictionary mapping data block to task lists. | ||||
| self._datablock_dict = defaultdict(list) | ||||
|
|
||||
| print("Threads:", n_threads) | ||||
| self._worker_threads = [WorkerThread(self, i) for i in range(n_threads)] | ||||
| for t in self._worker_threads: | ||||
| t.start() | ||||
|
|
@@ -1804,6 +1797,19 @@ def _map_tasks(self): | |||
| task: Optional[Task] = self._dequeue_spawned_task() | ||||
| if task: | ||||
| if not task.assigned: | ||||
| _new_launched_task = [] | ||||
| for n_task in self._launched_tasks: | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looping over all launched tasks for every mapping decision seems really expensive.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Agreed — this should also be refined. But the point of this PR is that we should update resources at launch time, not at mapping time. |
||||
| task_state = n_task.state | ||||
| if isinstance(task_state, TaskCompleted): | ||||
| for dev in n_task.req.devices: | ||||
| self._available_resources.deallocate_resources(dev, n_task.req.resources) | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that this doesn't handle Data Movement Tasks? Their resources are still bundled with their "parents", but their resources are no longer allocated 'ahead' of time. Only compute tasks are being tracked and deallocated. This means that they currently are considered to take 'no resources' and can be over-scheduled.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm, maybe not. Seem like in this Data Movement tasks are still allocated at mapping time and freed when the "parent" completes. Sorry.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although (sorry for the stream of consciousness comments on this), doesn't this mean it can still deadlock in the same way as the current mapper? Since it can allocate a bunch of data movement tasks out of order (as there is no guaranteed order to the mapping phase) and prevent the next needed dependency from being mapped (and running) due to being out of resources?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, the deadlock can still happen. OK, let's treat this PR as the immediate fix for the performance issue. I think the problem you're pointing out cannot be resolved by this PR — let's address it in a separate PR with more discussion. |
||||
| # Update OUT parrays which may have changed size from 0 to something | ||||
| # We assume all IN and INOUT params don't change size | ||||
| for parray in (n_task.dataflow.output): | ||||
| self._available_resources.update_parray_nbytes(parray, n_task.req.devices) | ||||
| else: | ||||
| _new_launched_task.append(n_task) | ||||
| self._launched_tasks = _new_launched_task | ||||
| is_assigned = self._assignment_policy(task) # This is what actually maps the task | ||||
| #is_assigned = self._random_assignment_policy(task) # USE THIS INSTEAD TO TEST RANDOM | ||||
| assert isinstance(is_assigned, bool) | ||||
|
|
@@ -1820,24 +1826,18 @@ def _map_tasks(self): | |||
| self._construct_datamove_task(data, task, OperandType.OUT) | ||||
| for data in task.dataflow.inout: | ||||
| self._construct_datamove_task(data, task, OperandType.INOUT) | ||||
|
|
||||
| # Update parray tracking and task count on the device | ||||
| for parray in (task.dataflow.input + task.dataflow.inout + task.dataflow.output): | ||||
| if len(task.req.environment.placement) > 1: | ||||
| raise NotImplementedError("Multidevice not supported") | ||||
| for device in task.req.environment.placement: | ||||
| self._available_resources.register_parray_move(parray, device) | ||||
|
|
||||
| # TODO(lhc): the current Parla does not support multiple devices. | ||||
| # leave it as a loop for the future. | ||||
| for device in task.req.environment.placement: | ||||
| self.update_mapped_task_count(task, device, 1) | ||||
|
|
||||
| # Allocate additional resources used by this task (blocking) | ||||
| for device in task.req.devices: | ||||
| for resource, amount in task.req.resources.items(): | ||||
| logger.debug("Task %r allocating %d %s on device %r", task, amount, resource, device) | ||||
| self._available_resources.allocate_resources(device, task.req.resources, blocking=True) | ||||
|
|
||||
| # Only computation needs to set a assigned flag. | ||||
| # Data movement task is set as assigned when it is created. | ||||
| task.set_assigned() | ||||
|
|
@@ -1885,9 +1885,23 @@ def _schedule_tasks(self): | |||
| def _launch_task(self, queue, dev: Device): | ||||
| try: | ||||
| task = queue.pop() | ||||
| # Allocate additional resources used by this task (blocking) | ||||
| for device in task.req.devices: | ||||
| for resource, amount in task.req.resources.items(): | ||||
| logger.debug("Task %r allocating %d %s on device %r", task, amount, resource, device) | ||||
| self._available_resources.allocate_resources(device, task.req.resources, blocking=True) | ||||
| # Update parray tracking and task count on the device | ||||
| for parray in (task.dataflow.input + task.dataflow.inout + task.dataflow.output): | ||||
| if len(task.req.environment.placement) > 1: | ||||
| raise NotImplementedError("Multidevice not supported") | ||||
| for device in task.req.environment.placement: | ||||
| self._available_resources.register_parray_move(parray, device) | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems wrong. The data movement tasks already exist and are already run before this code executes. This is only updating the information once the "parent" task is starting to be run.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait it also happens at creation, I'm not sure then. Why is it registered twice onto the same device?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think this is a mistake. In that case, is it fine to just remove this registration here? |
||||
|
|
||||
| worker = self._free_worker_threads.pop() # grab a worker | ||||
| logger.info(f"[Scheduler] Launching %s task, %r on %r", | ||||
| dev.architecture.id, task, worker) | ||||
| if isinstance(task, ComputeTask): | ||||
| self._launched_tasks.append(task) | ||||
| worker.assign_task(task) | ||||
| logger.debug(f"[Scheduler] Launched %r", task) | ||||
| for dev in task.req.environment.placement: | ||||
|
|
||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Delete this line too if deleting the monitor.