Skip to content

Commit 90722db

Browse files
committed
track the former aod inputs/outpus with metadata
1 parent 61fbfde commit 90722db

11 files changed

Lines changed: 82 additions & 30 deletions

Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
240240
// create header
241241
auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
242242
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
243+
bool wasAOD = std::ranges::any_of(route.matcher.metadata, [](ConfigParamSpec const& p){ return p.name.starts_with("aod-origin-replaced"); });
243244

244-
if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
245+
if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed, wasAOD)) {
245246
if (first) {
246247
// check if there is a next file to read
247248
fcnt += device.maxInputTimeslices;
@@ -255,7 +256,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
255256
}
256257
// get first folder of next file
257258
ntf = 0;
258-
if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) {
259+
if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed, wasAOD)) {
259260
LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as<std::string>(), fcnt, ntf);
260261
throw std::runtime_error("Processing is stopped!");
261262
}

Framework/AnalysisSupport/src/DataInputDirector.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -900,7 +900,7 @@ uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counte
900900
return didesc->getTimeFrameNumber(counter, numTF, wantedLevel, origin);
901901
}
902902

903-
bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
903+
bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed, bool wasAOD)
904904
{
905905
std::string treename;
906906

@@ -913,7 +913,7 @@ bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh,
913913
// . filename from defaultDataInputDescriptor
914914
// . treename from DataHeader
915915
didesc = mdefaultDataInputDescriptor;
916-
treename = aod::datamodel::getTreeName(dh);
916+
treename = aod::datamodel::getTreeName(dh, wasAOD);
917917
}
918918
std::string origin = dh.dataOrigin.as<std::string>();
919919

Framework/AnalysisSupport/src/DataInputDirector.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ class DataInputDirector
160160
int getNumberInputDescriptors() { return mdataInputDescriptors.size(); }
161161
void createDefaultDataInputDescriptor();
162162

163-
bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed);
163+
bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed, bool wasAOD);
164164
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF);
165165
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF);
166166
int getTimeFramesInFile(header::DataHeader dh, int counter);

Framework/Core/include/Framework/AnalysisDataModelHelpers.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@
1616

1717
namespace o2::aod::datamodel
1818
{
19-
std::string getTreeName(header::DataHeader dh);
19+
std::string getTreeName(header::DataHeader dh, bool wasAOD);
2020
} // namespace o2::aod::datamodel
2121
#endif // O2_FRAMEWORK_ANALYSISDATAMODELHELPERS_H_

Framework/Core/include/Framework/AnalysisHelpers.h

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOri
418418
std::ranges::transform(sources, sources.begin(), [originStr = newOrigin.as<std::string>()](framework::ConfigParamSpec& source){
419419
return replaceOrigin(source, originStr);
420420
});
421+
metadata.push_back(framework::ConfigParamSpec{"aod-origin-replaced", framework::VariantType::Bool, true, {"\"\""}});
421422
}
422423
metadata.insert(metadata.end(), sources.begin(), sources.end());
423424
auto ccdbURLs = getCCDBMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
@@ -440,7 +441,7 @@ constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOri
440441
}
441442

442443
template <TableRef R>
443-
constexpr auto tableRef2OutputSpec()
444+
constexpr auto tableRef2OutputSpec(header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
444445
{
445446
std::vector<framework::ConfigParamSpec> metadata;
446447
using md = typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata;
@@ -453,7 +454,7 @@ constexpr auto tableRef2OutputSpec()
453454
}
454455
return framework::OutputSpec{
455456
framework::OutputLabel{o2::aod::label<R>()},
456-
o2::aod::origin<R>(),
457+
((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) ? newOrigin : o2::aod::origin<R>(),
457458
o2::aod::description(o2::aod::signature<R>()),
458459
R.version,
459460
framework::Lifetime::Timeframe,
@@ -497,6 +498,10 @@ struct WritingCursor {
497498
using persistent_table_t = decltype([]() { if constexpr (soa::is_iterator<T>) { return typename T::parent_t{nullptr}; } else { return T{nullptr}; } }());
498499
using cursor_t = decltype(std::declval<TableBuilder>().cursor<persistent_table_t>());
499500
OutputSpec outputSpec{soa::tableRef2OutputSpec<persistent_table_t::ref>()};
501+
OutputSpec updateOutputSpec(header::DataOrigin const& newOrigin)
502+
{
503+
outputSpec = soa::tableRef2OutputSpec<persistent_table_t::ref>(newOrigin);
504+
}
500505

501506
template <typename... Ts>
502507
void operator()(Ts&&... args)
@@ -623,10 +628,21 @@ template <soa::is_metadata M, soa::TableRef Ref>
623628
struct TableTransform {
624629
using metadata = M;
625630
constexpr static auto sources = M::template generateSources<o2::aod::Hash<Ref.origin_hash>>();
626-
std::vector<InputSpec> requiredInputs = []<size_t... Is>(std::index_sequence<Is...>){
627-
return std::vector{soa::tableRef2InputSpec<sources[Is]>()...};
628-
}(std::make_index_sequence<sources.size()>());
629-
OutputSpec outputSpec = soa::tableRef2OutputSpec<Ref>();
631+
632+
OutputSpec outputSpec = updateOutputSpec();
633+
static OutputSpec updateOutputSpec(header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"})
634+
{
635+
return soa::tableRef2OutputSpec<Ref>(newOrigin);
636+
}
637+
638+
std::vector<InputSpec> requiredInputs = getRequiredInputs();
639+
static std::vector<InputSpec> getRequiredInputs(header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"})
640+
{
641+
return [&newOrigin]<size_t... Is>(std::index_sequence<Is...>){
642+
return std::vector{soa::tableRef2InputSpec<sources[Is]>(newOrigin)...};
643+
}(std::make_index_sequence<sources.size()>());
644+
}
645+
630646
};
631647

632648
/// This helper struct allows you to declare extended tables which should be

Framework/Core/include/Framework/AnalysisManagers.h

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,19 @@ bool appendCondition(std::vector<InputSpec>& inputs, C& conditionGroup)
134134
}
135135

136136
/// Table auto-creation handling
137+
138+
template <typename T>
139+
concept with_required_inputs = requires(T t) { t.getRequiredInputs(); };
140+
141+
template <typename T>
142+
requires(!with_required_inputs<T>)
143+
bool requestInputs(std::vector<InputSpec>&, T&, header::DataOrigin)
144+
{
145+
return false;
146+
}
147+
137148
template <typename T>
138-
bool requestInputs(std::vector<InputSpec>&, T const&)
149+
bool updateOutputSpec(T&, header::DataOrigin)
139150
{
140151
return false;
141152
}
@@ -158,19 +169,27 @@ const char* controlOption()
158169
return "control:define";
159170
}
160171

161-
template <typename T>
162-
concept with_required_inputs = requires(T t) { t.requiredInputs.size(); };
163-
164172
template <with_required_inputs T>
165-
bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
173+
bool requestInputs(std::vector<InputSpec>& inputs, T& entity, header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"})
166174
{
175+
entity.requiredInputs = entity.getRequiredInputs(newOrigin);
167176
for (auto base_spec : entity.requiredInputs) {
168177
base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption<T>()}, VariantType::Bool, true, {"\"\""}});
169178
DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
170179
}
171180
return true;
172181
}
173182

183+
template <typename T>
184+
concept with_updateable_output = requires(T t) { t.updateOutputSpec(); };
185+
186+
template <with_updateable_output T>
187+
bool updateOutputSpec(T& entity, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
188+
{
189+
entity.outputSpec = entity.updateOutputSpec(newOrigin);
190+
return true;
191+
}
192+
174193
template <typename C>
175194
bool newDataframeCondition(InputRecord&, C&)
176195
{

Framework/Core/include/Framework/AnalysisTask.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -557,8 +557,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
557557

558558
// request base tables for spawnable extended tables and indices to be built
559559
// this checks for duplications
560-
homogeneous_apply_refs_sized<numElements>([&inputs](auto& element) {
561-
return analysis_task_parsers::requestInputs(inputs, element);
560+
homogeneous_apply_refs_sized<numElements>([&inputs, &newOrigin](auto& element) {
561+
return analysis_task_parsers::requestInputs(inputs, element, newOrigin);
562562
},
563563
*task.get());
564564

@@ -567,8 +567,13 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
567567
LOG(warn) << "Task " << name_str << " has no inputs";
568568
}
569569

570+
// update OutputSpecs in output declarations
571+
homogeneous_apply_refs_sized<numElements>([&newOrigin](auto& element){ return analysis_task_parsers::updateOutputSpec(element, newOrigin); }, *task.get());
572+
573+
// append outputs
570574
homogeneous_apply_refs_sized<numElements>([&outputs, &hash](auto& element) { return analysis_task_parsers::appendOutput(outputs, element, hash); }, *task.get());
571575

576+
// request services
572577
auto requiredServices = CommonServices::defaultServices();
573578
auto arrowServices = CommonServices::arrowServices();
574579
requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());

Framework/Core/src/AnalysisDataModelHelpers.cxx

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
#include "Framework/AnalysisDataModelHelpers.h"
1313
#include "Framework/AnalysisDataModel.h"
1414
#include "Framework/AnalysisSupportHelpers.h"
15-
#include "Framework/StringHelpers.h"
16-
#include "Framework/Logger.h"
1715

1816
std::string str_tolower(std::string s)
1917
{
@@ -25,7 +23,7 @@ std::string str_tolower(std::string s)
2523

2624
namespace o2::aod::datamodel
2725
{
28-
std::string getTreeName(header::DataHeader dh)
26+
std::string getTreeName(header::DataHeader dh, bool wasAOD)
2927
{
3028
auto description = std::string(dh.dataDescription.str);
3129
auto iver = (float)dh.subSpecification;
@@ -38,11 +36,8 @@ std::string getTreeName(header::DataHeader dh)
3836
}
3937

4038
// add prefix according to origin
41-
for (auto possibleOrigin : framework::AODOrigins) {
42-
if (dh.dataOrigin == possibleOrigin) {
39+
if (wasAOD || std::ranges::any_of(framework::AODOrigins, [&o = dh.dataOrigin](header::DataOrigin const& origin){ return o == origin; })) {
4340
treeName = "O2" + treeName;
44-
break;
45-
}
4641
}
4742

4843
// exceptions from this

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,11 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector<InputSpec> c
174174
additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs
175175
// FIXME: until we have a single list of pairs
176176
additionalInputs |
177-
views::partial_match_filter(AODOrigins) |
177+
std::ranges::views::filter([](InputSpec const& input){
178+
return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){
179+
return p.name.starts_with("aod-origin-replaced");
180+
});
181+
}) |
178182
std::ranges::views::filter([](InputSpec const& input) {
179183
return std::ranges::none_of(input.metadata, [](ConfigParamSpec const& p) { return (p.name.compare("projectors") == 0) || (p.name.compare("index-records") == 0); });
180184
}) |

Framework/Core/src/ArrowSupport.cxx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -711,7 +711,11 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
711711
// update currently requested AODs
712712
for (auto& d : workflow) {
713713
d.inputs |
714-
views::partial_match_filter(AODOrigins) |
714+
std::ranges::views::filter([](InputSpec const& input){
715+
return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){
716+
return p.name.starts_with("aod-origin-replaced");
717+
});
718+
}) |
715719
sinks::update_input_list{dec.requestedAODs};
716720
}
717721

0 commit comments

Comments
 (0)