From df5df378ac368fea468a1c2b2a8114775a75077a Mon Sep 17 00:00:00 2001 From: Ben McDonald <46734217+bmcdonald3@users.noreply.github.com> Date: Wed, 26 Oct 2022 11:00:54 -0400 Subject: [PATCH 1/3] Add parallel writing when writing pdarrays to Parquet This PR adds support for a `parallelWriteThreshold` flag that allows a user to determine the size of the Parquet files to be written and then write those files in parallel, appending a `_CORE####` suffix to the end of the file name. By running the Arkouda server with: `./arkouda_server --ParquetMsg.parallelWriteThreshold=`, a user is able to control the size of the files that are going to be written. This is currently only supported on pdarrays of natively-supported datatypes (meaning not strings or dataframes), but follow-up work is on the way. --- src/ParquetMsg.chpl | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl index 366428e0be3..e270cb8e8ce 100644 --- a/src/ParquetMsg.chpl +++ b/src/ParquetMsg.chpl @@ -46,6 +46,8 @@ module ParquetMsg { // Undocumented for now, just for internal experiments private config const batchSize = getEnvInt("ARKOUDA_SERVER_PARQUET_BATCH_SIZE", 8192); + private config const parallelWriteThreshold = 512*1024*1024 / numBytes(int); + extern var ARROWINT64: c_int; extern var ARROWINT32: c_int; extern var ARROWUINT64: c_int; @@ -417,6 +419,7 @@ module ParquetMsg { dtype, compression, errMsg): int; var dtypeRep = toCDtype(dtype); + var doParallel = if A.size > parallelWriteThreshold then true else false; var prefix: string; var extension: string; @@ -454,10 +457,39 @@ module ParquetMsg { valPtr = c_ptrTo(locArr); } if mode == TRUNCATE || !filesExist { +<<<<<<< HEAD if c_writeColumnToParquet(myFilename.localize().c_str(), valPtr, 0, dsetname.localize().c_str(), locDom.size, rowGroupSize, dtypeRep, compression, c_ptrTo(pqErr.errMsg)) == ARROWERROR { pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); 
+======= + if !doParallel { + if c_writeColumnToParquet(myFilename.localize().c_str(), c_ptrTo(locArr), 0, + dsetname.localize().c_str(), locDom.size, rowGroupSize, + dtypeRep, compressed, c_ptrTo(pqErr.errMsg)) == ARROWERROR { + pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); + } + } else { + var fileSizes: [0..#loc.maxTaskPar] int = locDom.size/loc.maxTaskPar; + // First file has the extra elements if it isn't evenly divisible by maxTaskPar + fileSizes[0] += locDom.size - ((locDom.size/loc.maxTaskPar)*loc.maxTaskPar); + + var offsets = + scan fileSizes; + + forall i in fileSizes.domain { + var suffix = '%04i'.format(idx): string; + var parSuffix = '%04i'.format(i): string; + const parFilename = filename + "_LOCALE" + suffix + "_CORE" + parSuffix + ".parquet"; + var oi = if i == 0 then i else offsets[i-1]; + var coreArr = locArr[oi..#(fileSizes[i])]; + var pqErr = new parquetErrorMsg(); + if c_writeColumnToParquet(parFilename.localize().c_str(), c_ptrTo(coreArr), 0, + dsetname.localize().c_str(), coreArr.size, rowGroupSize, + dtypeRep, compressed, c_ptrTo(pqErr.errMsg)) == ARROWERROR { + pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); + } + } +>>>>>>> 6e08f5bf5 (Add parallel writing for truncate Parquet) } } else { if c_appendColumnToParquet(myFilename.localize().c_str(), valPtr, From 3e857425672b1ab1d97226fbf1360473c6ffdf8b Mon Sep 17 00:00:00 2001 From: Ben McDonald <46734217+bmcdonald3@users.noreply.github.com> Date: Wed, 13 Dec 2023 09:27:42 -0800 Subject: [PATCH 2/3] Update write parquet call --- src/ParquetMsg.chpl | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl index e270cb8e8ce..d227eb46130 100644 --- a/src/ParquetMsg.chpl +++ b/src/ParquetMsg.chpl @@ -457,16 +457,10 @@ module ParquetMsg { valPtr = c_ptrTo(locArr); } if mode == TRUNCATE || !filesExist { -<<<<<<< HEAD - if c_writeColumnToParquet(myFilename.localize().c_str(), valPtr, 0, - 
dsetname.localize().c_str(), locDom.size, rowGroupSize, - dtypeRep, compression, c_ptrTo(pqErr.errMsg)) == ARROWERROR { - pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); -======= if !doParallel { - if c_writeColumnToParquet(myFilename.localize().c_str(), c_ptrTo(locArr), 0, + if c_writeColumnToParquet(myFilename.localize().c_str(), valPtr, 0, dsetname.localize().c_str(), locDom.size, rowGroupSize, - dtypeRep, compressed, c_ptrTo(pqErr.errMsg)) == ARROWERROR { + dtypeRep, compression, c_ptrTo(pqErr.errMsg)) == ARROWERROR { pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); } } else { @@ -489,7 +483,6 @@ module ParquetMsg { pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); } } ->>>>>>> 6e08f5bf5 (Add parallel writing for truncate Parquet) } } else { if c_appendColumnToParquet(myFilename.localize().c_str(), valPtr, From 159db71314d022313865f619e50c3a8146361743 Mon Sep 17 00:00:00 2001 From: Ben McDonald <46734217+bmcdonald3@users.noreply.github.com> Date: Wed, 13 Dec 2023 09:46:16 -0800 Subject: [PATCH 3/3] Fix compressed type --- src/ParquetMsg.chpl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl index d227eb46130..e9c267990ba 100644 --- a/src/ParquetMsg.chpl +++ b/src/ParquetMsg.chpl @@ -479,7 +479,7 @@ module ParquetMsg { var pqErr = new parquetErrorMsg(); if c_writeColumnToParquet(parFilename.localize().c_str(), c_ptrTo(coreArr), 0, dsetname.localize().c_str(), coreArr.size, rowGroupSize, - dtypeRep, compressed, c_ptrTo(pqErr.errMsg)) == ARROWERROR { + dtypeRep, compression, c_ptrTo(pqErr.errMsg)) == ARROWERROR { pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); } }