Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
// create header
auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
bool wasAOD = std::ranges::any_of(route.matcher.metadata, [](ConfigParamSpec const& p){ return p.name.starts_with("aod-origin-replaced"); });
bool wasAOD = std::ranges::any_of(route.matcher.metadata, [](ConfigParamSpec const& p) { return p.name.starts_with("aod-origin-replaced"); });

if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed, wasAOD)) {
if (first) {
Expand Down
15 changes: 9 additions & 6 deletions Framework/Core/include/Framework/ASoA.h
Original file line number Diff line number Diff line change
Expand Up @@ -1744,12 +1744,13 @@ auto doFilteredSliceBy(T const* table, o2::framework::PresliceBase<C, framework:
template <soa::is_table T>
auto doSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
{
auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){
auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m) {
if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) {
m.origin = o;
}
return m;
}(o2::soa::getMatcherFromTypeForKey<T>(node.name)), node.name});
}(o2::soa::getMatcherFromTypeForKey<T>(node.name)),
node.name});
auto [offset, count] = localCache.getSliceFor(value);
auto t = typename T::self_t({table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count)}, static_cast<uint64_t>(offset));
if (t.tableSize() != 0) {
Expand All @@ -1761,12 +1762,13 @@ auto doSliceByCached(T const* table, framework::expressions::BindingNode const&
template <soa::is_filtered_table T>
auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
{
auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){
auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m) {
if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) {
m.origin = o;
}
return m;
}(o2::soa::getMatcherFromTypeForKey<T>(node.name)), node.name});
}(o2::soa::getMatcherFromTypeForKey<T>(node.name)),
node.name});
auto [offset, count] = localCache.getSliceFor(value);
auto slice = table->asArrowTable()->Slice(static_cast<uint64_t>(offset), count);
return prepareFilteredSlice(table, slice, offset);
Expand All @@ -1775,12 +1777,13 @@ auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode
template <soa::is_table T>
auto doSliceByCachedUnsorted(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache)
{
auto localCache = cache.ptr->getCacheUnsortedFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){
auto localCache = cache.ptr->getCacheUnsortedFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m) {
if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) {
m.origin = o;
}
return m;
}(o2::soa::getMatcherFromTypeForKey<T>(node.name)), node.name});
}(o2::soa::getMatcherFromTypeForKey<T>(node.name)),
node.name});
if constexpr (soa::is_filtered_table<T>) {
auto t = typename T::self_t({table->asArrowTable()}, localCache.getSliceFor(value));
if (t.tableSize() != 0) {
Expand Down
23 changes: 10 additions & 13 deletions Framework/Core/include/Framework/AnalysisHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOri
sources = getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata, o2::aod::Hash<R.origin_hash>>();
}
if ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) {
std::ranges::transform(sources, sources.begin(), [originStr = newOrigin.as<std::string>()](framework::ConfigParamSpec& source){
std::ranges::transform(sources, sources.begin(), [originStr = newOrigin.as<std::string>()](framework::ConfigParamSpec& source) {
return replaceOrigin(source, originStr);
});
metadata.push_back(framework::ConfigParamSpec{"aod-origin-replaced", framework::VariantType::Bool, true, {"\"\""}});
Expand All @@ -432,12 +432,12 @@ constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOri
}

return framework::InputSpec{
o2::aod::label<R>(),
((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) ? newOrigin : o2::aod::origin<R>(),
o2::aod::description(o2::aod::signature<R>()),
R.version,
framework::Lifetime::Timeframe,
metadata};
o2::aod::label<R>(),
((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) ? newOrigin : o2::aod::origin<R>(),
o2::aod::description(o2::aod::signature<R>()),
R.version,
framework::Lifetime::Timeframe,
metadata};
}

template <TableRef R>
Expand Down Expand Up @@ -632,11 +632,10 @@ struct TableTransform {
std::vector<InputSpec> requiredInputs = getRequiredInputs();
static std::vector<InputSpec> getRequiredInputs(header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"})
{
return [&newOrigin]<size_t... Is>(std::index_sequence<Is...>){
return std::vector{soa::tableRef2InputSpec<sources[Is]>(newOrigin)...};
}(std::make_index_sequence<sources.size()>());
return [&newOrigin]<size_t... Is>(std::index_sequence<Is...>) {
return std::vector{soa::tableRef2InputSpec<sources[Is]>(newOrigin)...};
}(std::make_index_sequence<sources.size()>());
}

};

/// This helper struct allows you to declare extended tables which should be
Expand All @@ -654,7 +653,6 @@ constexpr auto transformBase()
return TableTransform<metadata, metadata::template extension_table_t_from<o2::aod::Hash<T::originals[T::originals.size() - 1].origin_hash>>::ref>{};
}


/// In a multi-origin case the origin is provided by the type
/// FIXME: In a rewritten origin case the output designation needs to be changed (through base class)
/// The extraction of the elements needs to be changed in AnalysisManagers using the origin information from the base class
Expand Down Expand Up @@ -834,7 +832,6 @@ concept is_builds = requires(T t) {
requires std::same_as<decltype(t.map), std::vector<soa::IndexRecord>>;
};


/// a task with rewritten origin, if running together with a task with the default, will
/// have a different name and thus its output would be routed separately

Expand Down
12 changes: 6 additions & 6 deletions Framework/Core/include/Framework/AnalysisManagers.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ bool updateOutputSpec(T& entity, header::DataOrigin newOrigin = header::DataOrig
template <is_produces_group T>
bool updateOutputSpec(T& producesGroup, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
{
homogeneous_apply_refs<true>([&newOrigin](auto& produces){ return updateOutputSpec(produces, newOrigin); }, producesGroup);
homogeneous_apply_refs<true>([&newOrigin](auto& produces) { return updateOutputSpec(produces, newOrigin); }, producesGroup);
}

template <typename C>
Expand Down Expand Up @@ -312,7 +312,7 @@ template <is_spawns T>
bool prepareOutput(ProcessingContext& context, T& spawns)
{
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::originals[T::spawnable_t::originals.size() - 1].desc_hash>>::metadata;
auto originalTable = soa::ArrowHelpers::joinTables( framework::extractTablesFromRecord(context.inputs(), spawns.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) );
auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), spawns.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); })));
if (originalTable->num_rows() == 0) {
originalTable = makeEmptyTable("EMPTY", typename metadata::base_table_t::persistent_columns_t{});
}
Expand All @@ -330,15 +330,15 @@ bool prepareOutput(ProcessingContext& context, T& spawns)
template <is_builds T>
bool prepareOutput(ProcessingContext& context, T& builds)
{
return builds.build(framework::extractTablesFromRecord(context.inputs(), builds.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ));
return builds.build(framework::extractTablesFromRecord(context.inputs(), builds.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); })));
}

template <is_defines T>
bool prepareOutput(ProcessingContext& context, T& defines)
requires(T::delayed == false)
{
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::originals[T::spawnable_t::originals.size() - 1].desc_hash>>::metadata;
auto originalTable = soa::ArrowHelpers::joinTables( framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) );
auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); })));
if (originalTable->num_rows() == 0) {
originalTable = makeEmptyTable("EMPTY", typename metadata::base_table_t::persistent_columns_t{});
}
Expand Down Expand Up @@ -370,7 +370,7 @@ bool prepareDelayedOutput(ProcessingContext& context, T& defines)
defines.recompile();
}
using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
auto originalTable = soa::ArrowHelpers::joinTables( framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) );
auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); })));
if (originalTable->num_rows() == 0) {
originalTable = makeEmptyTable<metadata::base_table_t::ref>();
}
Expand Down Expand Up @@ -622,7 +622,7 @@ bool replaceOrigin(T& preslice, header::DataOrigin const& newOrigin = header::Da
template <is_preslice_group T>
bool replaceOrigin(T& presliceGroup, header::DataOrigin const& newOrigin)
{
homogeneous_apply_refs<true>([&newOrigin](auto& preslice){ return replaceOrigin(preslice, newOrigin); }, presliceGroup);
homogeneous_apply_refs<true>([&newOrigin](auto& preslice) { return replaceOrigin(preslice, newOrigin); }, presliceGroup);
return true;
}

Expand Down
11 changes: 5 additions & 6 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,7 @@ struct AnalysisDataProcessorBuilder {
template <soa::is_table_or_iterator T, int AI>
static auto extract(InputRecord& record, std::vector<InputInfo> iInfos, std::vector<ExpressionInfo>& infos, size_t phash)
{
auto matchers = std::ranges::find_if(iInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers
| std::views::filter([](auto const& pair) { return pair.first == AI; });
auto matchers = std::ranges::find_if(iInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers | std::views::filter([](auto const& pair) { return pair.first == AI; });
if constexpr (soa::is_filtered<T>) {
return extractFilteredFromRecord<T>(record, matchers, *std::ranges::find_if(infos, [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); }));
} else {
Expand Down Expand Up @@ -562,7 +561,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
}

// update OutputSpecs in output declarations
homogeneous_apply_refs_sized<numElements>([&newOrigin](auto& element){ return analysis_task_parsers::updateOutputSpec(element, newOrigin); }, *task.get());
homogeneous_apply_refs_sized<numElements>([&newOrigin](auto& element) { return analysis_task_parsers::updateOutputSpec(element, newOrigin); }, *task.get());

// append outputs
homogeneous_apply_refs_sized<numElements>([&outputs, &hash](auto& element) { return analysis_task_parsers::appendOutput(outputs, element, hash); }, *task.get());
Expand All @@ -574,7 +573,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
homogeneous_apply_refs_sized<numElements>([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get());

// replace origins in Preslice declarations
homogeneous_apply_refs_sized<numElements>([&newOrigin](auto& element){ return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get());
homogeneous_apply_refs_sized<numElements>([&newOrigin](auto& element) { return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get());

auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable {
Cache bindingsKeys;
Expand Down Expand Up @@ -623,13 +622,13 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
*task.get());

/// replace origin in slicing caches
std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry){
std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) {
if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
entry.matcher = replaceOrigin(entry.matcher, newOrigin);
}
return entry;
});
std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry){
std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) {
if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
entry.matcher = replaceOrigin(entry.matcher, newOrigin);
}
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/AnalysisDataModelHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ std::string getTreeName(header::DataHeader dh, bool wasAOD)
}

// add prefix according to origin
if (wasAOD || std::ranges::any_of(framework::AODOrigins, [&o = dh.dataOrigin](header::DataOrigin const& origin){ return o == origin; })) {
treeName = "O2" + treeName;
if (wasAOD || std::ranges::any_of(framework::AODOrigins, [&o = dh.dataOrigin](header::DataOrigin const& origin) { return o == origin; })) {
treeName = "O2" + treeName;
}

// exceptions from this
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/AnalysisSupportHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> c
additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs
// FIXME: until we have a single list of pairs
additionalInputs |
std::ranges::views::filter([](InputSpec const& input){
return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){
std::ranges::views::filter([](InputSpec const& input) {
return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p) {
return p.name.starts_with("aod-origin-replaced");
});
}) |
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// update currently requested AODs
for (auto& d : workflow) {
d.inputs |
std::ranges::views::filter([](InputSpec const& input){
return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){
std::ranges::views::filter([](InputSpec const& input) {
return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p) {
return p.name.starts_with("aod-origin-replaced");
});
}) |
Expand Down