Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/multio/action/encode-mtg2/EncodeMtg2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ std::unique_ptr<metkit::codes::CodesHandle> encode(metkit::mars2grib::Mars2Grib&
T* values, size_t size, const dm::FullMarsRecord& marsRec,
const dm::MiscRecord& miscRec) {
const auto mars = dm::dumpRecord<eckit::LocalConfiguration>(marsRec);
const auto misc = dm::dumpRecord<eckit::LocalConfiguration>(miscRec);

const auto misc = dm::dumpUnscopedRecord<eckit::LocalConfiguration>(miscRec);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is also provided by a separate hotfix:
#275

Without that change, misc keys are not passed through.

if (!cache) {
return encoder.encode(values, size, mars, misc);
}
Expand Down Expand Up @@ -88,10 +88,6 @@ void EncodeMtg2::executeImpl(Message msg) {
// Apply mappings
auto mappingResult = mars2mars::applyMappings(mars2mars::allRules(), marsRec, miscRec);

// Dump (mapped) mars and misc keys to local configurations
const auto mars = dm::dumpRecord<eckit::LocalConfiguration>(marsRec);
const auto misc = dm::dumpUnscopedRecord<eckit::LocalConfiguration>(miscRec);

executeNext(dispatchPrecisionTag(msg.precision(), [&](auto pt) {
using Precision = typename decltype(pt)::type;
msg.payload().acquire();
Expand Down Expand Up @@ -123,7 +119,6 @@ void EncodeMtg2::executeImpl(Message msg) {
eckit::Buffer buf{sample->messageSize()};
sample->copyInto(reinterpret_cast<uint8_t*>(buf.data()), buf.size());

// TODO(pgeier) write mapped metadata
return Message{Message::Header{Message::Tag::Field, Peer{msg.source()}, Peer{msg.destination()},
dm::dumpRecord<message::Metadata>(marsRec)},
std::move(buf)};
Expand Down
4 changes: 2 additions & 2 deletions src/multio/action/mask/Mask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ message::Message Mask::createMasked(message::Message msg) const {
}

message::Metadata& md = msg.modifyMetadata();
md.set("missingValue", missingValue_);
md.set("bitmapPresent", true);
md.set("misc-missingValue", missingValue_);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lines were only used by NEMO throug the IO server so far.

As the metadata has changed in iom.F90, this metadata is now consistent with the other actions.

md.set("misc-bitmapPresent", true);

return msg;
}
Expand Down
2 changes: 1 addition & 1 deletion src/multio/action/single-field-sink/SingleFieldSink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void SingleFieldSink::write(Message msg) {


std::ostringstream oss;
oss << rootPath_ << msg.metadata().get<std::int64_t>("level") << "::" << paramOrId
oss << rootPath_ << msg.metadata().get<std::int64_t>("levelist") << "::" << paramOrId
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key "level" is not used in mtg2 anymore. "levelist" is the appropriate mars key.

<< "::" << msg.metadata().get<std::int64_t>("step");
eckit::LocalConfiguration config;

Expand Down
24 changes: 14 additions & 10 deletions src/multio/action/statistics-mtg2/OperationWindow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ eckit::DateTime yyyymmdd_hhmmss2DateTime(uint64_t yyyymmdd, uint64_t hhmmss) {

OperationWindow make_window(const std::unique_ptr<PeriodUpdater>& periodUpdater, const StatisticsConfiguration& cfg) {
// Note: A subtraction eckit::DateTime - eckit::Second yields eckit::Second instead of eckit::DateTime
// We do our calculations based on a difference since an arbitrary epoch (1st of January in the year 0) as a workarounds
// We do our calculations based on a difference since an arbitrary epoch (1st of January in the year 0) as a
// workarounds
eckit::DateTime epoch{eckit::Date{0000, 01, 01}, eckit::Time{00, 00, 00}};
eckit::Second deltaCurr = cfg.curr() - epoch;
eckit::Second deltaStart = deltaCurr - eckit::Second{cfg.timespan().value_or(0) * 3600.0};
Expand All @@ -70,7 +71,8 @@ OperationWindow make_window(const std::unique_ptr<PeriodUpdater>& periodUpdater,
eckit::DateTime startPoint{periodUpdater->computeWinStartTime(epoch + deltaStart)};
eckit::DateTime creationPoint{periodUpdater->computeWinCreationTime(epoch + deltaStart)};
eckit::DateTime endPoint{periodUpdater->computeWinEndTime(startPoint)};
return OperationWindow{epochPoint, startPoint, creationPoint, endPoint, cfg.timeIncrementInSeconds(), cfg.options().windowType()};
return OperationWindow{
epochPoint, startPoint, creationPoint, endPoint, cfg.timeIncrementInSeconds(), cfg.options().windowType()};
};

OperationWindow load_window(std::shared_ptr<StatisticsIO>& IOmanager, const StatisticsOptions& opt) {
Expand Down Expand Up @@ -195,21 +197,21 @@ bool OperationWindow::isWithin(const eckit::DateTime& dt) const {
}

bool OperationWindow::gtLowerBound(const eckit::DateTime& dt, bool throw_error) const {
if (throw_error && creationPoint_ >= dt) {
if (throw_error && startPoint_ >= dt) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are important and correct.

The window ranges from startPoint to endPoint.

The creationPoint can be the same as startPoint, but is more likely just to be within the window.

For ocean, no initial condition is send, after the first message arrives with time 1h. This was not passing through here.

std::ostringstream os;
os << *this << " : " << dt << " is outside of current period : lower Bound violation" << std::endl;
throw eckit::SeriousBug(os.str(), Here());
}
return dt > creationPoint_;
return dt > startPoint_;
};

bool OperationWindow::geLowerBound(const eckit::DateTime& dt, bool throw_error) const {
if (throw_error && creationPoint_ > dt) {
if (throw_error && startPoint_ > dt) {
std::ostringstream os;
os << *this << " : " << dt << " is outside of current period : lower Bound violation" << std::endl;
throw eckit::SeriousBug(os.str(), Here());
}
return dt >= creationPoint_;
return dt >= startPoint_;
};

bool OperationWindow::leUpperBound(const eckit::DateTime& dt, bool throw_error) const {
Expand Down Expand Up @@ -463,7 +465,8 @@ void OperationWindow::serialize(IOBuffer& currState, const std::string& fname, c
outFile << "timeIncrementInSeconds_ :: " << timeIncrementInSeconds_ << std::endl;
outFile << "count_ :: " << count_ << std::endl;
outFile << "counts_.size() :: " << counts_.size() << std::endl;
outFile << "windowType_ :: " << (windowType_ == WindowType::ForwardOffset ? "forward-offset" : "backward-offset") << std::endl;
outFile << "windowType_ :: "
<< (windowType_ == WindowType::ForwardOffset ? "forward-offset" : "backward-offset") << std::endl;
outFile.close();
}

Expand Down Expand Up @@ -495,7 +498,7 @@ void OperationWindow::serialize(IOBuffer& currState, const std::string& fname, c
const size_t countsSize = counts_.size();
currState[17] = static_cast<std::uint64_t>(countsSize);
for (size_t i = 0; i < countsSize; ++i) {
currState[i+18] = static_cast<std::uint64_t>(counts_[i]);
currState[i + 18] = static_cast<std::uint64_t>(counts_[i]);
}

currState.computeChecksum();
Expand All @@ -520,7 +523,7 @@ void OperationWindow::deserialize(const IOBuffer& currState, const std::string&
const auto countsSize = static_cast<size_t>(currState[17]);
counts_.resize(countsSize);
for (size_t i = 0; i < countsSize; ++i) {
counts_[i] = static_cast<long>(currState[i+18]);
counts_[i] = static_cast<long>(currState[i + 18]);
}

if (opt.debugRestart()) {
Expand All @@ -535,7 +538,8 @@ void OperationWindow::deserialize(const IOBuffer& currState, const std::string&
outFile << "timeIncrementInSeconds_ :: " << timeIncrementInSeconds_ << std::endl;
outFile << "count_ :: " << count_ << std::endl;
outFile << "counts_.size() :: " << counts_.size() << std::endl;
outFile << "windowType_ :: " << (windowType_ == WindowType::ForwardOffset ? "forward-offset" : "backward-offset") << std::endl;
outFile << "windowType_ :: "
<< (windowType_ == WindowType::ForwardOffset ? "forward-offset" : "backward-offset") << std::endl;
outFile.close();
}

Expand Down
59 changes: 38 additions & 21 deletions src/multio/action/statistics-mtg2/Statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ Statistics::Statistics(const ComponentConfiguration& compConf) :
operationMapping_{StatisticsOperationMapping::makeStatisticsOperationMapping()},
IOmanager_{StatisticsIOFactory::instance().build(opt_.restartLib(), opt_.restartPath(), opt_.restartPrefix())} {}

std::string Statistics::generateRestartNameFromFlush(const message::Message& msg, const FlushMetadataKeys& flush) const {
std::string Statistics::generateRestartNameFromFlush(const message::Message& msg,
const FlushMetadataKeys& flush) const {

std::string folderName;

Expand Down Expand Up @@ -241,8 +242,6 @@ bool Statistics::HasRestartKey(const std::string& key) {
}




void Statistics::updateLatestDateTime(const StatisticsConfiguration& cfg) {

std::ostringstream tmp;
Expand Down Expand Up @@ -318,10 +317,10 @@ void Statistics::executeImpl(message::Message msg) {
}

// The incomming message must occur AFTER the current point in the window!
if (cfg.curr() <= ts.cwin().currPoint()) {
if (cfg.curr() < ts.cwin().currPoint()) {
Comment on lines 319 to +320
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The window may be created such that curr == currPoint. This is possible if no initial condition is send, in which case the same message initiates the window AND puts in data.

The comments and the message still needs an update.

std::ostringstream os;
os << "Current time is before or equal to the current point in the window :: " << cfg.curr() << " > "
<< ts.cwin().currPoint() << std::endl;
os << "Current time is before or equal to the current point in the window :: " << cfg.curr()
<< " <= " << ts.cwin().currPoint() << ". Message: " << msg << std::endl;
throw eckit::SeriousBug(os.str(), Here());
}

Expand All @@ -331,6 +330,7 @@ void Statistics::executeImpl(message::Message msg) {
ts.updateWindow(msg, cfg);
}


// Update data
ts.updateData(msg, cfg);

Expand Down Expand Up @@ -412,7 +412,10 @@ void Statistics::emitStatistics(TemporalStatistics& ts, message::Peer source, me
auto opname = (*it)->operation();
if (opname != "instant") {
if (currentLoop == 1) {
const std::int64_t timespan = ts.win().currPointInHours() - ts.win().creationPointInHours();
const std::int64_t timespan
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The probably less intuitive change - the time span computation depends whether the window is viewed from the starting point or the creation point.

For monthly output, the decision is made to use the creation point - notably to explicitly avoid having "full" months if the forecast started in the middle of a month.

For daily statistics the starting point is of interest, i.e. time 0000 of a day instead of 0100.

= ts.win().currPointInHours()
- ((ts.periodUpdater().timeUnit() == "month") ? ts.win().creationPointInHours()
: ts.win().startPointInHours());
dm::dumpEntry(dm::TIMESPAN, dm::TIMESPAN.makeEntry(timespan), md);
paramMapping_.applyMapping(md, opname, !opt_.disableStrictMapping());
}
Expand All @@ -430,7 +433,9 @@ void Statistics::emitStatistics(TemporalStatistics& ts, message::Peer source, me
Here());
}
// Squash means we don't map (already done in previous loop), but extend the timespan
timespan.set(ts.win().currPointInHours() - ts.win().creationPointInHours());
timespan.set(ts.win().currPointInHours()
- ((ts.periodUpdater().timeUnit() == "month") ? ts.win().creationPointInHours()
: ts.win().startPointInHours()));
dm::dumpEntry(dm::TIMESPAN, timespan, md);
}
else {
Expand Down Expand Up @@ -460,29 +465,41 @@ void Statistics::emitStatistics(TemporalStatistics& ts, message::Peer source, me

// For instant fields or on flushes, timespan is not set yet
if (!lengthOfWindow.isSet()) {
// The window spaws between creationPoint to endPoint
// The window spaws between startingPoint, creationPoint to endPoint.
// Prev & Current point describe the last updated data points.
// In this case we are explicitly interested in creation to current point
lengthOfWindow.set(ts.win().currPointInHours() - ts.win().creationPointInHours());
// CreationPoint describes the time which the window is created - this can lay within a window (i.e.
// mid of a day, or month) whereas the startingPoint is explicitly the start of the window that can
// then at time 0 of a day or explicitly the first day of a month etc...
if (ts.periodUpdater().timeUnit() == "month") {
// For months we emit from the creation point - if a simulation is started in the mid, not the
// whole month should be considered.
lengthOfWindow.set(ts.win().currPointInHours() - ts.win().creationPointInHours());
}
else {
// For days we emit from the starting point - if the initial condition is not send (i.e. for
// ocean), the window often starts at hour 1 instead of 0 In this case we explicitly want it to
// start at 0 although first data arrived at hour 1
lengthOfWindow.set(ts.win().currPointInHours() - ts.win().startPointInHours());
}
}

dm::dumpEntry(dm::STEP, dm::STEP.makeEntry(lengthOfWindow.get().toHours()), md);
// We explicitly take the creation point - alternative would be the start point.
// The start point may be different for the first window, i.e. if the simulation starts in the mid of a month.
// To not confuse the output, we explicitly just output the window for which data has been received.
// As discussed with DGOV and scientist, half months are typically not of interest and should be ignored.
// Some additional mechanism has to make sure that these do not occur in the output (i.e. additional action).
auto dt = ts.win().creationPoint();
// We explicitly take the creation point for months and the start point for days (read comment above).
// The start point may be different for the first window, i.e. if the simulation starts in the mid of a
// month. To not confuse the output, we explicitly just output the window for which data has been
// received. As discussed with DGOV and scientist, half months are typically not of interest and should
// be ignored. Some additional mechanism has to make sure that these do not occur in the output (i.e.
// additional action).
auto dt = (ts.periodUpdater().timeUnit() == "month") ? ts.win().creationPoint() : ts.win().startPoint();

dm::dumpEntry(dm::DATE, dm::DATE.makeEntry(dt.date().yyyymmdd()), md);
dm::dumpEntry(dm::TIME, dm::TIME.makeEntry(dt.time().hhmmss()), md); // Official MARS time is in hhmm, in multio hhmmss is used
dm::dumpEntry(dm::TIME, dm::TIME.makeEntry(dt.time().hhmmss()),
md); // Official MARS time is in hhmm, in multio hhmmss is used
break;
}
}

for (const auto& kv : opt_.setMetadata()) {
md.set(kv.first, kv.second);
}
md.updateOverwrite(opt_.setMetadata());

(*it)->compute(payload, cfg);
executeNext(
Expand Down
4 changes: 4 additions & 0 deletions src/multio/action/statistics-mtg2/TemporalStatistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ TemporalStatistics::TemporalStatistics(std::shared_ptr<StatisticsIO>& IOmanager,
LOG_DEBUG_LIB(LibMultio) << opt.logPrefix() << " *** Load restart files" << std::endl;
}

const PeriodUpdater& TemporalStatistics::periodUpdater() const {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allow accessing the period updater to get information on the window size categorization.

return *periodUpdater_.get();
}


void TemporalStatistics::dump(std::shared_ptr<StatisticsIO>& IOmanager, const StatisticsOptions& opt) const {
LOG_DEBUG_LIB(LibMultio) << opt.logPrefix() << " *** Dump restart files" << std::endl;
Expand Down
2 changes: 2 additions & 0 deletions src/multio/action/statistics-mtg2/TemporalStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class TemporalStatistics {
message::Metadata& metadata();

void print(std::ostream& os) const;

const PeriodUpdater& periodUpdater() const;

private:
std::unique_ptr<PeriodUpdater> periodUpdater_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ OutputTimeReference readOutputTimeReference(const FieldMetadataKeys& md, const S
if (!stream) {
// Look for stream in options
const auto& omd = opt.setMetadata();
auto it = std::find_if(omd.begin(), omd.end(), [](const auto& pair) { return pair.first == "stream"; });

if (it != omd.end()) {
stream = it->second;
if (auto it = omd.find("stream"); it != omd.end()) {
stream = it->second.get<std::string>();
}
}

Expand Down
13 changes: 3 additions & 10 deletions src/multio/action/statistics-mtg2/cfg/StatisticsOptions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,11 @@ std::optional<OutputTimeReference> parseOutputTimeRef(const eckit::LocalConfigur
throw eckit::UserError(os.str(), Here());
}

std::vector<std::pair<std::string, std::string>> parseSetMetadata(const eckit::LocalConfiguration& cfg) {
message::Metadata parseSetMetadata(const eckit::LocalConfiguration& cfg) {
if (!cfg.has("set-metadata")) {
return {};
}

auto subCfg = cfg.getSubConfiguration("set-metadata");
std::vector<std::pair<std::string, std::string>> res;
for (auto key : subCfg.keys()) {
auto value = subCfg.getString(key);
res.emplace_back(std::pair<std::string, std::string>(key, value));
}
return res;
return message::toMetadata(cfg.getSubConfiguration("set-metadata"));
}


Expand Down Expand Up @@ -214,7 +207,7 @@ bool StatisticsOptions::disableStrictMapping() const {
bool StatisticsOptions::disableSquashing() const {
return disableSquashing_;
}
const std::vector<std::pair<std::string, std::string>>& StatisticsOptions::setMetadata() const {
const message::Metadata& StatisticsOptions::setMetadata() const {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing from vector<pair<string,string>> to a proper Metadata was necessary to maintain the underlying type, especially for the key "misc-timeIncrementInSeconds".
The set-metadata is currently used to "fix" the "misc-timeIncrementInSeconds" key after the monthly statistics for ocean (which is not using stattype....).

return setMetadata_;
}

Expand Down
6 changes: 4 additions & 2 deletions src/multio/action/statistics-mtg2/cfg/StatisticsOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include "eckit/config/LocalConfiguration.h"

#include "multio/message/Metadata.h"


namespace multio::action::statistics_mtg2 {

Expand Down Expand Up @@ -45,7 +47,7 @@ class StatisticsOptions {

const bool disableStrictMapping_;
const bool disableSquashing_;
const std::vector<std::pair<std::string, std::string>> setMetadata_;
const message::Metadata setMetadata_;

const std::optional<OutputTimeReference> outputTimeReference_;

Expand All @@ -69,7 +71,7 @@ class StatisticsOptions {

bool disableStrictMapping() const;
bool disableSquashing() const;
const std::vector<std::pair<std::string, std::string>>& setMetadata() const;
const message::Metadata& setMetadata() const;

std::optional<OutputTimeReference> outputTimeReference() const;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ class DayPeriodUpdater final : public PeriodUpdater {
return os.str();
};

const std::string timeUnit() const {
std::ostringstream os;
os << "day";
return os.str();
};
const std::string timeUnit() const { return "day"; };

eckit::DateTime computeWinStartTime(const eckit::DateTime& nextTime) const {
const auto& d = nextTime.date();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ class HourPeriodUpdater final : public PeriodUpdater {
};

const std::string timeUnit() const {
std::ostringstream os;
os << "hour";
return os.str();
return "hour";
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the ostringstream is obvious.

Further refactorings should consider using a proper enum.

};

eckit::DateTime computeWinStartTime(const eckit::DateTime& nextTime) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ class MonthPeriodUpdater final : public PeriodUpdater {
};

const std::string timeUnit() const {
std::ostringstream os;
os << "month";
return os.str();
return "month";
};

eckit::DateTime computeWinStartTime(const eckit::DateTime& nextTime) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@ class PeriodUpdater {
}
};

} // namespace multio::action::statistics_mtg2
} // namespace multio::action::statistics_mtg2
Loading
Loading