Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 131 additions & 102 deletions crates/bevy_asset/src/processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,15 +642,20 @@ impl AssetProcessor {
async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) {
let asset_path = AssetPath::from(path).with_source(source.id());
debug!("Removing processed {asset_path} because source was removed");
let mut infos = self.data.processing_state.asset_infos.write().await;
if let Some(info) = infos.get(&asset_path) {
// we must wait for uncontested write access to the asset source to ensure existing readers / writers
// can finish their operations
let _write_lock = info.file_transaction_lock.write();
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}
infos.remove(&asset_path).await;
let lock = {
// Scope the infos lock so we don't hold up other processing for too long.
let mut infos = self.data.processing_state.asset_infos.write().await;
infos.remove(&asset_path).await
};
let Some(lock) = lock else {
return;
};

// we must wait for uncontested write access to the asset source to ensure existing
// readers/writers can finish their operations
let _write_lock = lock.write();
self.remove_processed_asset_and_meta(source, asset_path.path())
.await;
}

/// Handles a renamed source asset by moving its processed results to the new location and updating in-memory paths + metadata.
Expand All @@ -662,24 +667,29 @@ impl AssetProcessor {
new: PathBuf,
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
let mut infos = self.data.processing_state.asset_infos.write().await;
let old = AssetPath::from(old).with_source(source.id());
let new = AssetPath::from(new).with_source(source.id());
let processed_writer = source.processed_writer().unwrap();
if let Some(info) = infos.get(&old) {
// we must wait for uncontested write access to the asset source to ensure existing readers / writers
// can finish their operations
let _write_lock = info.file_transaction_lock.write();
processed_writer
.rename(old.path(), new.path())
.await
.unwrap();
processed_writer
.rename_meta(old.path(), new.path())
.await
.unwrap();
}
infos.rename(&old, &new, new_task_sender).await;
let result = {
// Scope the infos lock so we don't hold up other processing for too long.
let mut infos = self.data.processing_state.asset_infos.write().await;
infos.rename(&old, &new, new_task_sender).await
};
let Some((old_lock, new_lock)) = result else {
return;
};
// we must wait for uncontested write access to both assets to ensure existing
// readers/writers can finish their operations
let _old_write_lock = old_lock.write();
let _new_write_lock = new_lock.write();
processed_writer
.rename(old.path(), new.path())
.await
.unwrap();
processed_writer
.rename_meta(old.path(), new.path())
.await
.unwrap();
}

async fn queue_processing_tasks_for_folder(
Expand Down Expand Up @@ -1069,9 +1079,15 @@ impl AssetProcessor {
// Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
// See ProcessedAssetInfo::file_transaction_lock docs for more info
let _transaction_lock = {
let mut infos = self.data.processing_state.asset_infos.write().await;
let info = infos.get_or_insert(asset_path.clone());
info.file_transaction_lock.write_arc().await
let lock = {
let mut infos = self.data.processing_state.asset_infos.write().await;
let info = infos.get_or_insert(asset_path.clone());
// Clone out the transaction lock first and then lock after we've dropped the
// asset_infos. Otherwise, trying to lock a single path can block all other paths to
// (leading to deadlocks).
info.file_transaction_lock.clone()
};
lock.write_arc().await
};

// NOTE: if processing the asset fails this will produce an "unfinished" log entry, forcing a rebuild on next run.
Expand Down Expand Up @@ -1317,11 +1333,17 @@ impl ProcessingState {
&self,
path: &AssetPath<'static>,
) -> Result<RwLockReadGuardArc<()>, AssetReaderError> {
let infos = self.asset_infos.read().await;
let info = infos
.get(path)
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
Ok(info.file_transaction_lock.read_arc().await)
let lock = {
let infos = self.asset_infos.read().await;
let info = infos
.get(path)
.ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?;
// Clone out the transaction lock first and then lock after we've dropped the
// asset_infos. Otherwise, trying to lock a single path can block all other paths to
// (leading to deadlocks).
info.file_transaction_lock.clone()
};
Ok(lock.read_arc().await)
}

/// Returns a future that will not finish until the path has been processed.
Expand Down Expand Up @@ -1603,95 +1625,102 @@ impl ProcessorAssetInfos {
}
}

/// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
async fn remove(&mut self, asset_path: &AssetPath<'static>) {
let info = self.infos.remove(asset_path);
if let Some(info) = info {
if let Some(processed_info) = info.processed_info {
self.clear_dependencies(asset_path, processed_info);
}
// Tell all listeners this asset does not exist
info.status_sender
.broadcast(ProcessStatus::NonExistent)
.await
.unwrap();
if !info.dependents.is_empty() {
error!(
/// Remove the info for the given path. This should only happen if an asset's source is
/// removed/non-existent. Returns the transaction lock for the asset, or [`None`] if the asset
/// path does not exist.
async fn remove(
&mut self,
asset_path: &AssetPath<'static>,
) -> Option<Arc<async_lock::RwLock<()>>> {
let info = self.infos.remove(asset_path)?;
if let Some(processed_info) = info.processed_info {
self.clear_dependencies(asset_path, processed_info);
}
// Tell all listeners this asset does not exist
info.status_sender
.broadcast(ProcessStatus::NonExistent)
.await
.unwrap();
if !info.dependents.is_empty() {
error!(
"The asset at {asset_path} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
info.dependents
);
self.non_existent_dependents
.insert(asset_path.clone(), info.dependents);
}
self.non_existent_dependents
.insert(asset_path.clone(), info.dependents);
}

Some(info.file_transaction_lock)
}

/// Remove the info for the given path. This should only happen if an asset's source is removed / non-existent
/// Remove the info for the old path, and move over its info to the new path. This should only
/// happen if an asset's source is removed/non-existent. Returns the transaction locks for the
/// old and new assets respectively, or [`None`] if the old path does not exist.
async fn rename(
&mut self,
old: &AssetPath<'static>,
new: &AssetPath<'static>,
new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>,
) {
let info = self.infos.remove(old);
if let Some(mut info) = info {
if !info.dependents.is_empty() {
// TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
// doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
// we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
// If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
// If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
// TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
// (see the remove impl).
error!(
) -> Option<(Arc<async_lock::RwLock<()>>, Arc<async_lock::RwLock<()>>)> {
let mut info = self.infos.remove(old)?;
if !info.dependents.is_empty() {
// TODO: We can't currently ensure "moved" folders with relative paths aren't broken because AssetPath
// doesn't distinguish between absolute and relative paths. We have "erased" relativeness. In the short term,
// we could do "remove everything in a folder and re-add", but that requires full rebuilds / destroying the cache.
// If processors / loaders could enumerate dependencies, we could check if the new deps line up with a rename.
// If deps encoded "relativeness" as part of loading, that would also work (this seems like the right call).
// TODO: it would be nice to log an error here for dependents that aren't also being moved + fixed.
// (see the remove impl).
error!(
"The asset at {old} was removed, but it had assets that depend on it to be processed. Consider updating the path in the following assets: {:?}",
info.dependents
);
self.non_existent_dependents
.insert(old.clone(), core::mem::take(&mut info.dependents));
}
if let Some(processed_info) = &info.processed_info {
// Update "dependent" lists for this asset's "process dependencies" to use new path.
for dep in &processed_info.process_dependencies {
if let Some(info) = self.infos.get_mut(&dep.path) {
info.dependents.remove(old);
info.dependents.insert(new.clone());
} else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path)
{
dependents.remove(old);
dependents.insert(new.clone());
}
self.non_existent_dependents
.insert(old.clone(), core::mem::take(&mut info.dependents));
}
if let Some(processed_info) = &info.processed_info {
// Update "dependent" lists for this asset's "process dependencies" to use new path.
for dep in &processed_info.process_dependencies {
if let Some(info) = self.infos.get_mut(&dep.path) {
info.dependents.remove(old);
info.dependents.insert(new.clone());
} else if let Some(dependents) = self.non_existent_dependents.get_mut(&dep.path) {
dependents.remove(old);
dependents.insert(new.clone());
}
}
// Tell all listeners this asset no longer exists
info.status_sender
.broadcast(ProcessStatus::NonExistent)
.await
.unwrap();
let dependents: Vec<AssetPath<'static>> = {
let new_info = self.get_or_insert(new.clone());
new_info.processed_info = info.processed_info;
new_info.status = info.status;
// Ensure things waiting on the new path are informed of the status of this asset
if let Some(status) = new_info.status {
new_info.status_sender.broadcast(status).await.unwrap();
}
new_info.dependents.iter().cloned().collect()
};
// Queue the asset for a reprocess check, in case it needs new meta.
}
// Tell all listeners this asset no longer exists
info.status_sender
.broadcast(ProcessStatus::NonExistent)
.await
.unwrap();
let new_info = self.get_or_insert(new.clone());
new_info.processed_info = info.processed_info;
new_info.status = info.status;
// Ensure things waiting on the new path are informed of the status of this asset
if let Some(status) = new_info.status {
new_info.status_sender.broadcast(status).await.unwrap();
}
let dependents = new_info.dependents.iter().cloned().collect::<Vec<_>>();
// Queue the asset for a reprocess check, in case it needs new meta.
let _ = new_task_sender
.send((new.source().clone_owned(), new.path().to_owned()))
.await;
for dependent in dependents {
// Queue dependents for reprocessing because they might have been waiting for this asset.
let _ = new_task_sender
.send((new.source().clone_owned(), new.path().to_owned()))
.send((
dependent.source().clone_owned(),
dependent.path().to_owned(),
))
.await;
for dependent in dependents {
// Queue dependents for reprocessing because they might have been waiting for this asset.
let _ = new_task_sender
.send((
dependent.source().clone_owned(),
dependent.path().to_owned(),
))
.await;
}
}

Some((
info.file_transaction_lock,
new_info.file_transaction_lock.clone(),
))
}

fn clear_dependencies(&mut self, asset_path: &AssetPath<'static>, removed_info: ProcessedInfo) {
Expand Down