Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.ini.template
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ elastic_addr = localhost:9200
index = projects
featurestore_index = featurestore
app_provenance_index = appprovenance
ml_provenance_index = ml
elastic_batch = 5000
ewait_time = 5000

Expand Down
7 changes: 4 additions & 3 deletions include/FileProvenanceElasticDataReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ struct ProcessRowResult {

class FileProvenanceElasticDataReader : public NdbDataReader<FileProvenanceRow, SConn> {
public:
FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_cap);
FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_cap, const std::string ml_index);
virtual ~FileProvenanceElasticDataReader();
protected:

private:
FileProvenanceLogTable mFileLogTable;
INodeTable inodesTable;
std::string mMLIndex;

void processAddedandDeleted(Pq* data_batch, eBulk& bulk);
ProcessRowResult rowResult(std::list<std::string> elasticOps, FileProvenancePK logPK,
Expand All @@ -61,11 +62,11 @@ class FileProvenanceElasticDataReader : public NdbDataReader<FileProvenanceRow,
class FileProvenanceElasticDataReaders : public NdbDataReaders<FileProvenanceRow, SConn>{
public:
FileProvenanceElasticDataReaders(SConn* hopsConns, int num_readers,const bool hopsworks,
TimedRestBatcher* restEndpoint, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_ca) :
TimedRestBatcher* restEndpoint, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_ca, const std::string ml_index) :
NdbDataReaders(restEndpoint){
for(int i=0; i<num_readers; i++){
FileProvenanceElasticDataReader* dr
= new FileProvenanceElasticDataReader(hopsConns[i], hopsworks, prov_file_lru_cap, prov_core_lru_cap, inodes_lru_ca);
= new FileProvenanceElasticDataReader(hopsConns[i], hopsworks, prov_file_lru_cap, prov_core_lru_cap, inodes_lru_ca, ml_index);
dr->start(i, this);
mDataReaders.push_back(dr);
}
Expand Down
3 changes: 2 additions & 1 deletion include/Notifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Notifier : public ClusterConnectionBase {
const TableUnitConf mutations_tu, const TableUnitConf schemabased_tu,const TableUnitConf provenance_tu,
const int poll_maxTimeToWait, const HttpClientConfig elastic_client_config, const bool hopsworks,
const std::string elastic_search_index, const std::string elastic_featurestore_index,
const std::string elastic_app_provenance_index,
const std::string elastic_app_provenance_index, const std::string ml_index,
const int elastic_batch_size, const int elastic_issue_time,
const int lru_cap, const int prov_file_lru_cap, const int prov_core_lru_cap, const bool recovery, const bool stats,
Barrier barrier, const bool hiveCleaner, const std::string
Expand All @@ -66,6 +66,7 @@ class Notifier : public ClusterConnectionBase {
const std::string mElasticSearchIndex;
const std::string mElasticFeaturestoreIndex;
const std::string mElasticAppProvenanceIndex;
const std::string mMLIndex;
const int mElasticBatchsize;
const int mElasticIssueTime;
const int mLRUCap;
Expand Down
203 changes: 144 additions & 59 deletions include/tables/XAttrTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,35 @@
#include "FsMutationsLogTable.h"
#include "MetadataLogTable.h"

struct XAttrPartKey {
Int64 mInodeId;
Int8 mNamespace;
std::string mName;
Int16 mIndex;

XAttrPartKey(Int64 inodeId, Int8 ns, std::string name, Int16 index) {
mInodeId = inodeId;
mNamespace = ns;
mName = name;
mIndex = index;
}

bool operator==(const XAttrPartKey &pk) const {
return pk.mInodeId == mInodeId && pk.mNamespace == mNamespace && pk.mName == mName && pk.mIndex == mIndex;
}

AnyMap getAnyKey() {
AnyMap pk;
pk[0] = mInodeId;
pk[1] = mNamespace;
pk[2] = mName;
pk[3] = mIndex;
return pk;
}
};

typedef std::vector<XAttrPartKey> XAttrPartKeyVec;

struct XAttrRowPart{
Int64 mInodeId;
Int8 mNamespace;
Expand Down Expand Up @@ -261,24 +290,6 @@ struct XAttrRow {
}
};

struct XAttrPK {
Int64 mInodeId;
Int8 mNamespace;
std::string mName;

XAttrPK(Int64 inodeId, Int8 ns, std::string name) {
mInodeId = inodeId;
mNamespace = ns;
mName = name;
}

std::string to_string() {
std::stringstream out;
out << mInodeId << "-" << std::to_string(mNamespace) << "-" << mName;
return out.str();
}
};

typedef std::vector<XAttrRow> XAttrVec;
typedef boost::unordered_map<std::string, XAttrVec> XAttrMap;
typedef boost::unordered_map<std::string, XAttrPartVec> XAttrPartMap;
Expand Down Expand Up @@ -336,44 +347,42 @@ class XAttrTable : public DBTable<XAttrRowPart> {
}

XAttrMap get(Ndb* connection, Fmq* data_batch) {
AnyVec anyVec;
Fmq batchedMutations;
Fmq addAllXattrs;

for (Fmq::iterator it = data_batch->begin();
it != data_batch->end(); ++it) {
AnyVec anyVec;
XAttrPartKeyVec xattrPartKeyVec;
for (Fmq::iterator it = data_batch->begin(); it != data_batch->end(); ++it) {
FsMutationRow row = *it;
if (!row.requiresReadingXAttr() || !row.isXAttrOperation()) {
continue;
}

if(row.mOperation == XAttrAddAll){
addAllXattrs.push_back(row);
continue;
}

LOG_DEBUG("doRead batch for XAttr [ " + row.getXAttrName() + " ] to get its " << row.getNumParts() << " parts ");
for(Int16 index=0; index < row.getNumParts(); index++){
AnyMap pk;
pk[0] = row.mInodeId;
pk[1] = row.getNamespace();
pk[2] = row.getXAttrName();
pk[3] = index;
anyVec.push_back(pk);
}
addRetryKeys(row.mInodeId, row.getNamespace(), row.getXAttrName(), row.getNumParts(), anyVec, xattrPartKeyVec);
batchedMutations.push_back(row);
}
std::pair<AnyVec, XAttrPartKeyVec> xAttrKeys = std::make_pair(anyVec, xattrPartKeyVec);
XAttrMap xattrs;
int retry = 0;
while(!xAttrKeys.first.empty()) {
if(retry > 5) {
LOG_ERROR("xattr are changing to fast - epipe cannot get a consistent read");
}
XAttrPartVec xattrsParts = doRead(connection, xAttrKeys.first);
xAttrKeys = combineBatch(xattrsParts, xattrs, batchedMutations, xAttrKeys);
retry++;
}

XAttrPartVec xattrsParts = doRead(connection, anyVec);
XAttrMap results = combine(xattrsParts, batchedMutations);

for(Fmq::iterator it = addAllXattrs.begin(); it != addAllXattrs.end();
++it){
for(Fmq::iterator it = addAllXattrs.begin(); it != addAllXattrs.end(); ++it){
FsMutationRow mr = *it;
results[mr.getPKStr()] = getByInodeId(connection, mr.mInodeId);
xattrs[mr.getPKStr()] = getByInodeId(connection, mr.mInodeId);
}

return results;
return xattrs;
}

XAttrVec getByInodeId(Ndb* connection, Int64 inodeId){
Expand All @@ -383,7 +392,7 @@ class XAttrTable : public DBTable<XAttrRowPart> {
return combine(xattrsParts);
}

boost::optional<XAttrRow> get(Ndb* connection, XAttrPK key) {
boost::optional<XAttrRow> get(Ndb* connection, XAttrPartKey key) {
XAttrRow row = get(connection, key.mInodeId, key.mNamespace, key.mName);
if(readCheckExists(key, row)) {
return row;
Expand All @@ -393,37 +402,113 @@ class XAttrTable : public DBTable<XAttrRowPart> {
}

private:
inline static bool readCheckExists(XAttrPK key, XAttrRow row) {
inline static bool readCheckExists(XAttrPartKey key, XAttrRow row) {
return key.mInodeId == row.mInodeId && key.mNamespace == row.mNamespace && key.mName == row.mName;
}

inline static bool readCheckExists(XAttrPartKey key, XAttrRowPart row) {
return key.mInodeId == row.mInodeId && key.mNamespace == row.mNamespace && key.mName == row.mName && key.mIndex == row.mIndex;
}

/** This handles a pruned index read, so entries in partVec are well formed */
XAttrVec combine(XAttrPartVec& partVec){
XAttrPartMap xAttrsByName;
convert(partVec, xAttrsByName);
XAttrVec results;
for(auto& e : xAttrsByName){
results.push_back(XAttrRow(e.second));
}
return results;
XAttrPartMap xAttrsByName;
for(auto& part : partVec) {
std::string id = part.getXAttrUniqueId();
if(xAttrsByName.find(id) == xAttrsByName.end()){
xAttrsByName[id] = XAttrPartVec();
}
xAttrsByName[id].push_back(part);
}
XAttrVec results;
for(auto& e :xAttrsByName){
auto& vec = e.second;
std::sort(vec.begin(), vec.end(), [](XAttrRowPart a, XAttrRowPart b){
return a.mIndex < b.mIndex;
});
results.push_back(XAttrRow(e.second));
}
return results;
}

void addRetryKeys(Int64 inodeId, Int8 ns, std::string name, Int16 numParts, AnyVec& anyVec, XAttrPartKeyVec& keys) {
LOG_DEBUG("doRead batch for XAttr [ " + name + " ] to get its " << numParts << " parts ");
XAttrPartKey key(inodeId, ns, name, 0);
/** There might be multiple operations using the same XAttr. No use getting it multiple times from the db. */
if(std::find(std::begin(keys), std::end(keys), key) == std::end(keys)) {
for (Int16 index = 0; index < numParts; index++) {
XAttrPartKey pkey(inodeId, ns, name, index);
keys.push_back(pkey);
anyVec.push_back(pkey.getAnyKey());
}
}
}

XAttrMap combine(XAttrPartVec& partVec, Fmq& xAttrMutations){
/** This handles a key batch read, so entries in partVec might have junk - filter them */
std::pair<AnyVec, XAttrPartKeyVec> combineBatch(XAttrPartVec& partVec, XAttrMap& xattrs, Fmq& xAttrMutations, std::pair<AnyVec, XAttrPartKeyVec> keys){
XAttrPartMap xAttrsByName;
convert(partVec, xAttrsByName);
XAttrMap results;
for(auto& m : xAttrMutations){
convertBatch(partVec, xAttrsByName, keys.second);
XAttrPartKeyVec retryPartKeyVec;
AnyVec anyVec;
for(auto& m : xAttrMutations) {
XAttrPartKey key(m.mInodeId, m.getNamespace(), m.getXAttrName(), 0);
if(std::find(std::begin(keys.second), std::end(keys.second), key) == std::end(keys.second)) {
continue;
}
std::string id = XAttrRowPart::getXAttrUniqueId(m);
auto& vec = xAttrsByName[id];
XAttrVec xvec;
xvec.push_back(XAttrRow(vec));
results[m.getPKStr()] = xvec;
if(xAttrsByName.find(id) == xAttrsByName.end()){
/** xattr was removed */
continue;
}
auto &vec = xAttrsByName[id];
unsigned long actualSize = vec[0].mNumParts;
if (actualSize == vec.size()) {
XAttrVec xvec;
xvec.push_back(XAttrRow(vec));
xattrs[m.getPKStr()] = xvec;
} else if (actualSize < vec.size()) {
XAttrPartVec actualVec(vec.begin(), vec.begin() + actualSize);
XAttrVec xvec;
xvec.push_back(XAttrRow(actualVec));
xattrs[m.getPKStr()] = xvec;
} else {
addRetryKeys(vec[0].mInodeId, vec[0].mNamespace, vec[0].mName, vec[0].mNumParts, anyVec, retryPartKeyVec);
}
}
std::pair<AnyVec, XAttrPartKeyVec> retryKeys = std::make_pair(anyVec, retryPartKeyVec);
return retryKeys;
}

return results;
/**
* the read was done as a batch of keys - we need to check key vs result to make sure the result exists
*/
void convertBatch(XAttrPartVec& parts, XAttrPartMap& xAttrsByName, XAttrPartKeyVec keys){
for(unsigned i = 0; i < keys.size(); ++i) {
XAttrPartKey key = keys[i];
XAttrRowPart part = parts[i];
/** we check each part against the rowKey */
if(readCheckExists(key, part)) {
std::string id = part.getXAttrUniqueId();
if(xAttrsByName.find(id) == xAttrsByName.end()){
xAttrsByName[id] = XAttrPartVec();
}
xAttrsByName[id].push_back(part);
}
}

for(auto& e :xAttrsByName){
auto& vec = e.second;
std::sort(vec.begin(), vec.end(), [](XAttrRowPart a, XAttrRowPart b){
return a.mIndex < b.mIndex;
});
}
}

void convert(XAttrPartVec& partVec, XAttrPartMap& xAttrsByName){
for(auto& part : partVec){

/**
* the read was done as a pruned index scan, only existing result returned. No need to check for sanity of results.
*/
void convert(XAttrPartVec& parts, XAttrPartMap& xAttrsByName){
for(auto& part : parts) {
std::string id = part.getXAttrUniqueId();
if(xAttrsByName.find(id) == xAttrsByName.end()){
xAttrsByName[id] = XAttrPartVec();
Expand Down
Loading