diff --git a/config.ini.template b/config.ini.template index c9d564d5..5dfa8ee1 100644 --- a/config.ini.template +++ b/config.ini.template @@ -59,6 +59,7 @@ elastic_addr = localhost:9200 index = projects featurestore_index = featurestore app_provenance_index = appprovenance +ml_provenance_index = ml elastic_batch = 5000 ewait_time = 5000 diff --git a/include/FileProvenanceElasticDataReader.h b/include/FileProvenanceElasticDataReader.h index c775d82e..f2b3b2a9 100644 --- a/include/FileProvenanceElasticDataReader.h +++ b/include/FileProvenanceElasticDataReader.h @@ -38,13 +38,14 @@ struct ProcessRowResult { class FileProvenanceElasticDataReader : public NdbDataReader { public: - FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_cap); + FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_cap, const std::string ml_index); virtual ~FileProvenanceElasticDataReader(); protected: private: FileProvenanceLogTable mFileLogTable; INodeTable inodesTable; + std::string mMLIndex; void processAddedandDeleted(Pq* data_batch, eBulk& bulk); ProcessRowResult rowResult(std::list elasticOps, FileProvenancePK logPK, @@ -61,11 +62,11 @@ class FileProvenanceElasticDataReader : public NdbDataReader{ public: FileProvenanceElasticDataReaders(SConn* hopsConns, int num_readers,const bool hopsworks, - TimedRestBatcher* restEndpoint, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_ca) : + TimedRestBatcher* restEndpoint, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_ca, const std::string ml_index) : NdbDataReaders(restEndpoint){ for(int i=0; istart(i, this); mDataReaders.push_back(dr); } diff --git a/include/Notifier.h b/include/Notifier.h index 6937b9c9..389601c6 100644 --- a/include/Notifier.h +++ b/include/Notifier.h @@ -45,7 +45,7 @@ class Notifier : public ClusterConnectionBase { const TableUnitConf mutations_tu, const TableUnitConf schemabased_tu,const TableUnitConf provenance_tu, const int poll_maxTimeToWait, const HttpClientConfig elastic_client_config, const bool hopsworks, const std::string elastic_search_index, const std::string elastic_featurestore_index, - const std::string elastic_app_provenance_index, + const std::string elastic_app_provenance_index, const std::string ml_index, const int elastic_batch_size, const int elastic_issue_time, const int lru_cap, const int prov_file_lru_cap, const int prov_core_lru_cap, const bool recovery, const bool stats, Barrier barrier, const bool hiveCleaner, const std::string @@ -66,6 +66,7 @@ class Notifier : public ClusterConnectionBase { const std::string mElasticSearchIndex; const std::string mElasticFeaturestoreIndex; const std::string mElasticAppProvenanceIndex; + const std::string mMLIndex; const int mElasticBatchsize; const int mElasticIssueTime; const int mLRUCap; diff --git a/include/tables/XAttrTable.h b/include/tables/XAttrTable.h index bcffd227..a211ba22 100644 --- a/include/tables/XAttrTable.h +++ b/include/tables/XAttrTable.h @@ -23,6 +23,35 @@ #include "FsMutationsLogTable.h" #include "MetadataLogTable.h" +struct XAttrPartKey { + Int64 mInodeId; + Int8 mNamespace; + std::string mName; + Int16 mIndex; + + XAttrPartKey(Int64 inodeId, Int8 ns, std::string name, Int16 index) { + mInodeId = inodeId; + mNamespace = ns; + mName = name; + mIndex = index; + } + + bool operator==(const XAttrPartKey &pk) const { + return pk.mInodeId == mInodeId && pk.mNamespace == mNamespace && pk.mName == mName && pk.mIndex == mIndex; + } + + AnyMap getAnyKey() { + AnyMap pk; + pk[0] = mInodeId; + pk[1] = mNamespace; + pk[2] = mName; + pk[3] = mIndex; + return pk; + } +}; + +typedef std::vector XAttrPartKeyVec; + struct XAttrRowPart{ Int64 mInodeId; Int8 mNamespace; @@ -261,24 +290,6 @@ struct XAttrRow { } }; -struct XAttrPK { - Int64 mInodeId; - Int8 mNamespace; - std::string mName; - - XAttrPK(Int64 inodeId, Int8 ns, std::string name) { - mInodeId = inodeId; - mNamespace = ns; - mName = name; - } - - std::string to_string() { - std::stringstream out; - out << mInodeId << "-" << std::to_string(mNamespace) << "-" << mName; - return out.str(); - } -}; - typedef std::vector XAttrVec; typedef boost::unordered_map XAttrMap; typedef boost::unordered_map XAttrPartMap; @@ -336,44 +347,42 @@ class XAttrTable : public DBTable { } XAttrMap get(Ndb* connection, Fmq* data_batch) { - AnyVec anyVec; Fmq batchedMutations; Fmq addAllXattrs; - for (Fmq::iterator it = data_batch->begin(); - it != data_batch->end(); ++it) { + AnyVec anyVec; + XAttrPartKeyVec xattrPartKeyVec; + for (Fmq::iterator it = data_batch->begin(); it != data_batch->end(); ++it) { FsMutationRow row = *it; if (!row.requiresReadingXAttr() || !row.isXAttrOperation()) { continue; } - if(row.mOperation == XAttrAddAll){ addAllXattrs.push_back(row); continue; } - LOG_DEBUG("doRead batch for XAttr [ " + row.getXAttrName() + " ] to get its " << row.getNumParts() << " parts "); - for(Int16 index=0; index < row.getNumParts(); index++){ - AnyMap pk; - pk[0] = row.mInodeId; - pk[1] = row.getNamespace(); - pk[2] = row.getXAttrName(); - pk[3] = index; - anyVec.push_back(pk); - } + addRetryKeys(row.mInodeId, row.getNamespace(), row.getXAttrName(), row.getNumParts(), anyVec, xattrPartKeyVec); batchedMutations.push_back(row); } + std::pair xAttrKeys = std::make_pair(anyVec, xattrPartKeyVec); + XAttrMap xattrs; + int retry = 0; + while(!xAttrKeys.first.empty()) { + if(retry > 5) { + LOG_ERROR("xattr are changing to fast - epipe cannot get a consistent read"); + } + XAttrPartVec xattrsParts = doRead(connection, xAttrKeys.first); + xAttrKeys = combineBatch(xattrsParts, xattrs, batchedMutations, xAttrKeys); + retry++; + } - XAttrPartVec xattrsParts = doRead(connection, anyVec); - XAttrMap results = combine(xattrsParts, batchedMutations); - - for(Fmq::iterator it = addAllXattrs.begin(); it != addAllXattrs.end(); - ++it){ + for(Fmq::iterator it = addAllXattrs.begin(); it != addAllXattrs.end(); ++it){ FsMutationRow mr = *it; - results[mr.getPKStr()] = getByInodeId(connection, mr.mInodeId); + xattrs[mr.getPKStr()] = getByInodeId(connection, mr.mInodeId); } - return results; + return xattrs; } XAttrVec getByInodeId(Ndb* connection, Int64 inodeId){ @@ -383,7 +392,7 @@ class XAttrTable : public DBTable { return combine(xattrsParts); } - boost::optional get(Ndb* connection, XAttrPK key) { + boost::optional get(Ndb* connection, XAttrPartKey key) { XAttrRow row = get(connection, key.mInodeId, key.mNamespace, key.mName); if(readCheckExists(key, row)) { return row; @@ -393,37 +402,113 @@ class XAttrTable : public DBTable { } private: - inline static bool readCheckExists(XAttrPK key, XAttrRow row) { + inline static bool readCheckExists(XAttrPartKey key, XAttrRow row) { return key.mInodeId == row.mInodeId && key.mNamespace == row.mNamespace && key.mName == row.mName; } + inline static bool readCheckExists(XAttrPartKey key, XAttrRowPart row) { + return key.mInodeId == row.mInodeId && key.mNamespace == row.mNamespace && key.mName == row.mName && key.mIndex == row.mIndex; + } + + /** This handles a pruned index read, so entries in partVec are well formed */ XAttrVec combine(XAttrPartVec& partVec){ - XAttrPartMap xAttrsByName; - convert(partVec, xAttrsByName); - XAttrVec results; - for(auto& e : xAttrsByName){ - results.push_back(XAttrRow(e.second)); - } - return results; + XAttrPartMap xAttrsByName; + for(auto& part : partVec) { + std::string id = part.getXAttrUniqueId(); + if(xAttrsByName.find(id) == xAttrsByName.end()){ + xAttrsByName[id] = XAttrPartVec(); + } + xAttrsByName[id].push_back(part); + } + XAttrVec results; + for(auto& e :xAttrsByName){ + auto& vec = e.second; + std::sort(vec.begin(), vec.end(), [](XAttrRowPart a, XAttrRowPart b){ + return a.mIndex < b.mIndex; + }); + results.push_back(XAttrRow(e.second)); + } + return results; + } + + void addRetryKeys(Int64 inodeId, Int8 ns, std::string name, Int16 numParts, AnyVec& anyVec, XAttrPartKeyVec& keys) { + LOG_DEBUG("doRead batch for XAttr [ " + name + " ] to get its " << numParts << " parts "); + XAttrPartKey key(inodeId, ns, name, 0); + /** There might be multiple operations using the same XAttr. No use getting it multiple times from the db. */ + if(std::find(std::begin(keys), std::end(keys), key) == std::end(keys)) { + for (Int16 index = 0; index < numParts; index++) { + XAttrPartKey pkey(inodeId, ns, name, index); + keys.push_back(pkey); + anyVec.push_back(pkey.getAnyKey()); + } + } } - XAttrMap combine(XAttrPartVec& partVec, Fmq& xAttrMutations){ + /** This handles a key batch read, so entries in partVec might have junk - filter them */ + std::pair combineBatch(XAttrPartVec& partVec, XAttrMap& xattrs, Fmq& xAttrMutations, std::pair keys){ XAttrPartMap xAttrsByName; - convert(partVec, xAttrsByName); - XAttrMap results; - for(auto& m : xAttrMutations){ + convertBatch(partVec, xAttrsByName, keys.second); + XAttrPartKeyVec retryPartKeyVec; + AnyVec anyVec; + for(auto& m : xAttrMutations) { + XAttrPartKey key(m.mInodeId, m.getNamespace(), m.getXAttrName(), 0); + if(std::find(std::begin(keys.second), std::end(keys.second), key) == std::end(keys.second)) { + continue; + } std::string id = XAttrRowPart::getXAttrUniqueId(m); - auto& vec = xAttrsByName[id]; - XAttrVec xvec; - xvec.push_back(XAttrRow(vec)); - results[m.getPKStr()] = xvec; + if(xAttrsByName.find(id) == xAttrsByName.end()){ + /** xattr was removed */ + continue; + } + auto &vec = xAttrsByName[id]; + unsigned long actualSize = vec[0].mNumParts; + if (actualSize == vec.size()) { + XAttrVec xvec; + xvec.push_back(XAttrRow(vec)); + xattrs[m.getPKStr()] = xvec; + } else if (actualSize < vec.size()) { + XAttrPartVec actualVec(vec.begin(), vec.begin() + actualSize); + XAttrVec xvec; + xvec.push_back(XAttrRow(actualVec)); + xattrs[m.getPKStr()] = xvec; + } else { + addRetryKeys(vec[0].mInodeId, vec[0].mNamespace, vec[0].mName, vec[0].mNumParts, anyVec, retryPartKeyVec); + } } + std::pair retryKeys = std::make_pair(anyVec, retryPartKeyVec); + return retryKeys; + } - return results; + /** + * the read was done as a batch of keys - we need to check key vs result to make sure the result exists + */ + void convertBatch(XAttrPartVec& parts, XAttrPartMap& xAttrsByName, XAttrPartKeyVec keys){ + for(unsigned i = 0; i < keys.size(); ++i) { + XAttrPartKey key = keys[i]; + XAttrRowPart part = parts[i]; + /** we check each part against the rowKey */ + if(readCheckExists(key, part)) { + std::string id = part.getXAttrUniqueId(); + if(xAttrsByName.find(id) == xAttrsByName.end()){ + xAttrsByName[id] = XAttrPartVec(); + } + xAttrsByName[id].push_back(part); + } + } + + for(auto& e :xAttrsByName){ + auto& vec = e.second; + std::sort(vec.begin(), vec.end(), [](XAttrRowPart a, XAttrRowPart b){ + return a.mIndex < b.mIndex; + }); + } } - - void convert(XAttrPartVec& partVec, XAttrPartMap& xAttrsByName){ - for(auto& part : partVec){ + + /** + * the read was done as a pruned index scan, only existing result returned. No need to check for sanity of results. + */ + void convert(XAttrPartVec& parts, XAttrPartMap& xAttrsByName){ + for(auto& part : parts) { std::string id = part.getXAttrUniqueId(); if(xAttrsByName.find(id) == xAttrsByName.end()){ xAttrsByName[id] = XAttrPartVec(); diff --git a/src/FileProvenanceElasticDataReader.cpp b/src/FileProvenanceElasticDataReader.cpp index e85549ee..d043df00 100644 --- a/src/FileProvenanceElasticDataReader.cpp +++ b/src/FileProvenanceElasticDataReader.cpp @@ -17,8 +17,8 @@ #include "FileProvenanceElasticDataReader.h" FileProvenanceElasticDataReader::FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, - int file_lru_cap, int xattr_lru_cap, int inodes_lru_cap) -: NdbDataReader(hopsConn, hopsworks), mFileLogTable(file_lru_cap, xattr_lru_cap), inodesTable(inodes_lru_cap) { + int file_lru_cap, int xattr_lru_cap, int inodes_lru_cap, const std::string ml_index) +: NdbDataReader(hopsConn, hopsworks), mFileLogTable(file_lru_cap, xattr_lru_cap), inodesTable(inodes_lru_cap), mMLIndex(ml_index) { } class ElasticHelper { @@ -52,7 +52,6 @@ class ElasticHelper { dataVal.AddMember("project_name", rapidjson::Value().SetString(row.mProjectName.c_str(), dataAlloc), dataAlloc); dataVal.AddMember("ml_id", rapidjson::Value().SetString(mlId.c_str(), dataAlloc), dataAlloc); dataVal.AddMember("ml_type", rapidjson::Value().SetString(FileProvenanceConstants::MLTypeToStr(mlType).c_str(), dataAlloc), dataAlloc); - dataVal.AddMember("entry_type", rapidjson::Value().SetString("state", dataAlloc), dataAlloc); dataVal.AddMember("partition_id", rapidjson::Value().SetInt64(row.mPartitionId), dataAlloc); dataVal.AddMember("r_create_timestamp", rapidjson::Value().SetString(readable_timestamp(row.mTimestamp).c_str(), dataAlloc), dataAlloc); @@ -302,7 +301,7 @@ class ElasticHelper { dataVal.AddMember("entry_type", rapidjson::Value().SetString("operation", dataAlloc), dataAlloc); dataVal.AddMember("partition_id", rapidjson::Value().SetInt64(row.mPartitionId), dataAlloc); dataVal.AddMember("r_timestamp", rapidjson::Value().SetString(readable_timestamp(row.mTimestamp).c_str(), dataAlloc), dataAlloc); - + data.AddMember("doc", dataVal, dataAlloc); data.AddMember("doc_as_upsert", rapidjson::Value().SetBool(true), dataAlloc); @@ -564,11 +563,11 @@ ProcessRowResult FileProvenanceElasticDataReader::process_row(FileProvenanceRow switch (datasetProvCore.get()) { case FileProvenanceConstants::STORE_NONE: break; case FileProvenanceConstants::STORE_STATE: { - std::string state = ElasticHelper::aliveState(ElasticHelper::stateId(row), projectIndex, row, mlAux.second, mlAux.first); + std::string state = ElasticHelper::aliveState(ElasticHelper::stateId(row), mMLIndex, row, mlAux.second, mlAux.first); bulkOps.push_back(state); } break; case FileProvenanceConstants::STORE_ALL: { - std::string state = ElasticHelper::aliveState(ElasticHelper::stateId(row), projectIndex, row, mlAux.second, mlAux.first); + std::string state = ElasticHelper::aliveState(ElasticHelper::stateId(row), mMLIndex, row, mlAux.second, mlAux.first); bulkOps.push_back(state); std::string op = ElasticHelper::fileOp(ElasticHelper::opId(row), projectIndex, row, mlAux.second, mlAux.first); bulkOps.push_back(op); @@ -607,11 +606,11 @@ ProcessRowResult FileProvenanceElasticDataReader::process_row(FileProvenanceRow switch (datasetProvCore.get()) { case FileProvenanceConstants::STORE_NONE: break; case FileProvenanceConstants::STORE_STATE: { - std::string state = ElasticHelper::deadState(ElasticHelper::stateId(row), projectIndex); + std::string state = ElasticHelper::deadState(ElasticHelper::stateId(row), mMLIndex); bulkOps.push_back(state); } break; case FileProvenanceConstants::STORE_ALL: { - std::string state = ElasticHelper::deadState(ElasticHelper::stateId(row), projectIndex); + std::string state = ElasticHelper::deadState(ElasticHelper::stateId(row), mMLIndex); bulkOps.push_back(state); std::string op = ElasticHelper::fileOp(ElasticHelper::opId(row), projectIndex, row, mlAux.second, mlAux.first); bulkOps.push_back(op); @@ -701,12 +700,12 @@ ProcessRowResult FileProvenanceElasticDataReader::process_row(FileProvenanceRow if (!skipElasticOp) { switch (datasetProvCore.get()) { case FileProvenanceConstants::ProvOpStoreType::STORE_NONE: { - std::string state = ElasticHelper::deadState(ElasticHelper::stateId(row), projectIndex); + std::string state = ElasticHelper::deadState(ElasticHelper::stateId(row), mMLIndex); bulkOps.push_back(state); } break; case FileProvenanceConstants::ProvOpStoreType::STORE_STATE: case FileProvenanceConstants::ProvOpStoreType::STORE_ALL: { - std::string state = ElasticHelper::aliveState(ElasticHelper::stateId(row), projectIndex, row, mlAux.second, mlAux.first); + std::string state = ElasticHelper::aliveState(ElasticHelper::stateId(row), mMLIndex, row, mlAux.second, mlAux.first); bulkOps.push_back(state); } break; default: { @@ -729,18 +728,18 @@ ProcessRowResult FileProvenanceElasticDataReader::process_row(FileProvenanceRow switch (datasetProvCore.get()) { case FileProvenanceConstants::ProvOpStoreType::STORE_STATE: { if (row.mXAttrName == FileProvenanceConstants::XATTR_PROJECT_IID) { - std::string projIIdStateVal = ElasticHelper::addProjectIIdToState(ElasticHelper::stateId(row), projectIndex, row); + std::string projIIdStateVal = ElasticHelper::addProjectIIdToState(ElasticHelper::stateId(row), mMLIndex, row); bulkOps.push_back(projIIdStateVal); } - std::string xattrStateVal = ElasticHelper::addXAttrToState(ElasticHelper::stateId(row), projectIndex, row, xattr.get().mValue); + std::string xattrStateVal = ElasticHelper::addXAttrToState(ElasticHelper::stateId(row), mMLIndex, row, xattr.get().mValue); bulkOps.push_back(xattrStateVal); } break; case FileProvenanceConstants::ProvOpStoreType::STORE_ALL: { if (row.mXAttrName == FileProvenanceConstants::XATTR_PROJECT_IID) { - std::string projIIdStateVal = ElasticHelper::addProjectIIdToState(ElasticHelper::stateId(row), projectIndex, row); + std::string projIIdStateVal = ElasticHelper::addProjectIIdToState(ElasticHelper::stateId(row), mMLIndex, row); bulkOps.push_back(projIIdStateVal); } - std::string xattrStateVal = ElasticHelper::addXAttrToState(ElasticHelper::stateId(row), projectIndex, row, xattr.get().mValue); + std::string xattrStateVal = ElasticHelper::addXAttrToState(ElasticHelper::stateId(row), mMLIndex, row, xattr.get().mValue); bulkOps.push_back(xattrStateVal); std::string xattrOpVal = ElasticHelper::addXAttrOp(ElasticHelper::opId(row), projectIndex, row, xattr.get().mValue, mlAux.second, mlAux.first); bulkOps.push_back(xattrOpVal); @@ -792,11 +791,11 @@ ProcessRowResult FileProvenanceElasticDataReader::process_row(FileProvenanceRow switch (datasetProvCore.get()) { case FileProvenanceConstants::STORE_NONE: break; case FileProvenanceConstants::STORE_STATE: { - std::string xattrStateVal = ElasticHelper::deleteXAttrFromState(ElasticHelper::stateId(row), projectIndex, row); + std::string xattrStateVal = ElasticHelper::deleteXAttrFromState(ElasticHelper::stateId(row), mMLIndex, row); bulkOps.push_back(xattrStateVal); } break; case FileProvenanceConstants::STORE_ALL: { - std::string xattrStateVal = ElasticHelper::deleteXAttrFromState(ElasticHelper::stateId(row), projectIndex, row); + std::string xattrStateVal = ElasticHelper::deleteXAttrFromState(ElasticHelper::stateId(row), mMLIndex, row); bulkOps.push_back(xattrStateVal); std::string xattrOpVal = ElasticHelper::deleteXAttrOp(ElasticHelper::opId(row), projectIndex, row, mlAux.second, mlAux.first); bulkOps.push_back(xattrOpVal); diff --git a/src/Notifier.cpp b/src/Notifier.cpp index b60db13a..d15a33dc 100644 --- a/src/Notifier.cpp +++ b/src/Notifier.cpp @@ -25,8 +25,8 @@ Notifier::Notifier(const char* connection_string, const char* database_name, const TableUnitConf elastic_provenance_tu, const int poll_maxTimeToWait, const HttpClientConfig elastic_client_config, const bool hopsworks, const std::string elastic_search_index, const std::string elastic_featurestore_index, - const std::string elastic_app_provenance_index, - const int elastic_batch_size, const int elastic_issue_time, + const std::string elastic_app_provenance_index, const std::string ml_index, + const int elastic_batch_size, const int elastic_issue_time, const int lru_cap, const int prov_file_lru_cap, const int prov_core_lru_cap, const bool recovery, const bool stats, Barrier barrier, const bool hiveCleaner, const std::string metricsServer) @@ -35,7 +35,7 @@ Notifier::Notifier(const char* connection_string, const char* database_name, mFileProvenanceTU(elastic_provenance_tu), mAppProvenanceTU(elastic_provenance_tu), mPollMaxTimeToWait(poll_maxTimeToWait), mElasticClientConfig(elastic_client_config), mHopsworksEnabled(hopsworks), mElasticSearchIndex(elastic_search_index), mElasticFeaturestoreIndex(elastic_featurestore_index), - mElasticAppProvenanceIndex(elastic_app_provenance_index), + mElasticAppProvenanceIndex(elastic_app_provenance_index), mMLIndex(ml_index), mElasticBatchsize(elastic_batch_size), mElasticIssueTime(elastic_issue_time), mLRUCap(lru_cap), mProvFileLRUCap(prov_file_lru_cap), mProvCoreLRUCap(prov_core_lru_cap), mRecovery(recovery), mStats(stats), mBarrier(barrier), mHiveCleaner(hiveCleaner), mMetricsServer(metricsServer) { @@ -215,7 +215,7 @@ void Notifier::setup() { file_prov_hops_connections[i] = create_ndb_connection(mDatabaseName); } mFileProvenanceElasticDataReaders = new FileProvenanceElasticDataReaders(file_prov_hops_connections, - mFileProvenanceTU.mNumReaders, mHopsworksEnabled, mFileProvenanceElastic, mProvFileLRUCap, mProvCoreLRUCap, mLRUCap); + mFileProvenanceTU.mNumReaders, mHopsworksEnabled, mFileProvenanceElastic, mProvFileLRUCap, mProvCoreLRUCap, mLRUCap, mMLIndex); mFileProvenanceBatcher = new RCBatcher( mFileProvenanceTableTailer, mFileProvenanceElasticDataReaders, mFileProvenanceTU.mWaitTime, mFileProvenanceTU.mBatchSize); diff --git a/src/main.cpp b/src/main.cpp index 684b70b2..dce9db7a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -48,6 +48,7 @@ int main(int argc, char** argv) { std::string elastic_featurestore_index = "featurestore"; std::string elastic_app_provenance_index = "appprovenance"; + std::string elastic_ml_index = "ml"; int lru_cap = DEFAULT_MAX_CAPACITY; int prov_file_lru_cap = DEFAULT_MAX_CAPACITY; @@ -119,6 +120,7 @@ int main(int argc, char** argv) { ("index", po::value(&elastic_index)->default_value(elastic_index), "Elastic index to add the data to.") ("featurestore_index", po::value(&elastic_featurestore_index)->default_value(elastic_featurestore_index), "Elastic featurestore index.") ("app_provenance_index", po::value(&elastic_app_provenance_index)->default_value(elastic_app_provenance_index), "Elastic index to add the app provenance data to.") + ("ml_provenance_index", po::value(&elastic_ml_index)->default_value(elastic_ml_index), "Elastic index to add the ML provenance state data to.") ("elastic_batch", po::value(&elastic_batch_size)->default_value(elastic_batch_size), "Elastic batch size in bytes for bulk requests") @@ -255,7 +257,7 @@ int main(int argc, char** argv) { provenance_tu, poll_maxTimeToWait, config, hopsworks, elastic_index, elastic_featurestore_index, - elastic_app_provenance_index, + elastic_app_provenance_index, elastic_ml_index, elastic_batch_size, elastic_issue_time, lru_cap, prov_file_lru_cap, prov_core_lru_cap, recovery, stats, barrier,