diff --git a/Makefile b/Makefile index b7b2fce16d2..5da9a137ff8 100644 --- a/Makefile +++ b/Makefile @@ -375,6 +375,12 @@ ifeq ($(CHPL_CXX),) CHPL_CXX=$(CXX) endif +ifdef ARKOUDA_DEVELOPER +ARROW_FLAGS = -g +else +ARROW_FLAGS = -O3 +endif + .PHONY: compile-arrow-cpp compile-arrow-cpp: make compile-arrow-write @@ -383,15 +389,15 @@ compile-arrow-cpp: .PHONY: compile-arrow-write compile-arrow-write: - $(CHPL_CXX) -O3 -std=c++17 -c $(ARROW_WRITE_CPP) -o $(ARROW_WRITE_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE) + $(CHPL_CXX) -std=c++17 $(ARROW_FLAGS) -c $(ARROW_WRITE_CPP) -o $(ARROW_WRITE_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE) .PHONY: compile-arrow-read compile-arrow-read: - $(CHPL_CXX) -O3 -std=c++17 -c $(ARROW_READ_CPP) -o $(ARROW_READ_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE) + $(CHPL_CXX) -std=c++17 $(ARROW_FLAGS) -c $(ARROW_READ_CPP) -o $(ARROW_READ_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE) .PHONY: compile-arrow-util compile-arrow-util: - $(CHPL_CXX) -O3 -std=c++17 -c $(ARROW_UTIL_CPP) -o $(ARROW_UTIL_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE) + $(CHPL_CXX) -std=c++17 $(ARROW_FLAGS) -c $(ARROW_UTIL_CPP) -o $(ARROW_UTIL_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE) $(ARROW_UTIL_O): $(ARROW_UTIL_CPP) $(ARROW_UTIL_H) make compile-arrow-util @@ -712,7 +718,7 @@ $(TEST_TARGETS): $(TEST_BINARY_DIR)/$(TEST_BINARY_SIGIL)%: $(TEST_SOURCE_DIR)/%. $(CHPL) $(TEST_CHPL_FLAGS) -M $(ARKOUDA_SOURCE_DIR) $(ARKOUDA_COMPAT_MODULES) $< -o $@ print-%: - $(info $($*)) @trues + $(info $($*)) @true size=100 test-python: diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl index 5a6e453b28a..83b7c3ba594 100644 --- a/src/ParquetMsg.chpl +++ b/src/ParquetMsg.chpl @@ -1517,12 +1517,16 @@ module ParquetMsg { } } - proc writeMultiColParquet(filename: string, col_names: [] string, - ncols: int, sym_names: [] string, col_objTypes: [] string, targetLocales: [] locale, - compression: int, st: borrowed SymTab): bool throws { + proc writeMultiColParquet(filename: string, col_names: [] string, ncols: int, + sym_names: [] string, col_objTypes: [] string, + targetLocales: [] locale, compression: int, + st: borrowed SymTab): bool throws { - extern proc c_writeMultiColToParquet(filename, column_names, ptr_arr, offset_arr, objTypes, - datatypes, segArr_sizes, colnum, numelems, rowGroupSize, compression, errMsg): int; + extern proc c_writeMultiColToParquet(filename, column_names, ptr_arr, + offset_arr, objTypes, datatypes, + segArr_sizes, colnum, numelems, + rowGroupSize, compression, + errMsg): int; var prefix: string; var extension: string; @@ -1919,7 +1923,18 @@ module ParquetMsg { ); } - var result: int = c_writeMultiColToParquet(fname.localize().c_str(), c_ptrTo(c_names), c_ptrTo(ptrList), c_ptrTo(segmentPtr), c_ptrTo(objTypes), c_ptrTo(datatypes), c_ptrTo(segarray_sizes), ncols, numelems, ROWGROUPS, compression, c_ptrTo(pqErr.errMsg)); + var result: int = c_writeMultiColToParquet(fname.localize().c_str(), + c_ptrTo(c_names), + c_ptrTo(ptrList), + c_ptrTo(segmentPtr), + c_ptrTo(objTypes), + c_ptrTo(datatypes), + c_ptrTo(segarray_sizes), + ncols, + numelems, + ROWGROUPS, + compression, + c_ptrTo(pqErr.errMsg)); if result == ARROWERROR { pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName()); } diff --git a/tests/server/COMPOPTS b/tests/server/COMPOPTS index 275d8989afb..cb12db976a8 100755 --- a/tests/server/COMPOPTS +++ b/tests/server/COMPOPTS @@ -14,6 +14,8 @@ TEST_FLAGS=$(cd $ARKOUDA_HOME && make -s print-TEST_CHPL_FLAGS) # -M dir SRC_DIR=$(cd $ARKOUDA_HOME && make -s print-ARKOUDA_SOURCE_DIR) +PARQUET_FLAG=-I$SRC_DIR/parquet + # Compat modules COMPAT_MODULES=$(cd $ARKOUDA_HOME && make -s print-ARKOUDA_COMPAT_MODULES) @@ -27,4 +29,4 @@ if [[ ! -f $SRC_DIR/ArrowFunctions.o ]]; then make -s -C ${ARKOUDA_HOME} compile-arrow-cpp > /dev/null 2> /dev/null fi -echo "${TEST_FLAGS} -M ${SRC_DIR} ${COMPAT_MODULES} ${CONFIG_OPTS} ${TEST_OPTS}" +echo "${TEST_FLAGS} -M ${SRC_DIR} ${PARQUET_FLAG} ${COMPAT_MODULES} ${CONFIG_OPTS} ${TEST_OPTS}" diff --git a/tests/server/TestBase.chpl b/tests/server/TestBase.chpl index d4db36d3c57..60de534da5a 100644 --- a/tests/server/TestBase.chpl +++ b/tests/server/TestBase.chpl @@ -31,13 +31,13 @@ record CommDiagSummary { } record Diags { - var T: Timer; + var T: stopwatch; var elapsedTime: real; var gatherDiags = dfltGatherDiags; var D: [LocaleSpace] commDiagnostics; proc init() { - this.complete(); + init this; resetCommDiagnostics(); D = getCommDiagnostics(); } @@ -47,7 +47,8 @@ record Diags { T.start(); } - proc stop(param name="", printTime=printTimes, printDiag=printDiags, printDiagSum=printDiagsSum) { + proc stop(param name="", printTime=printTimes, printDiag=printDiags, + printDiagSum=printDiagsSum) { T.stop(); if gatherDiags then stopCommDiagnostics(); @@ -57,7 +58,8 @@ record Diags { T.clear(); if gatherDiags then resetCommDiagnostics(); - if !gatherDiags && (printDiag || printDiagSum) then warning("gatherDiags was not enabled"); + if !gatherDiags && (printDiag || printDiagSum) then + warning("gatherDiags was not enabled"); param s = if name != "" then name + ": " else ""; if printTime then writef("%s%.2drs\n", s, this.elapsed()); if printDiag then writef('%s%s\n', s, this.comm():string); @@ -86,9 +88,8 @@ record Diags { // Message helpers proc parseName(s: string): string { - const low = [1..1].domain.low; var fields = s.split(); - return fields[low+1]; + return fields[1]; } proc parseTwoNames(s: string): (string, string) { @@ -115,9 +116,11 @@ proc writeSegString(msg: string, ss: SegString) { } -proc nameForRandintMsg(len: int, dtype:DType, aMin: int, aMax: int, st: borrowed SymTab) { +proc nameForRandintMsg(len: int, dtype:DType, aMin: int, aMax: int, + st: borrowed SymTab) { use RandMsg; - const payload = try! "%i %s %i %i None".format(len, dtype2str(dtype), aMin, aMax); + const payload = try! "%i %s %i %i None".format(len, dtype2str(dtype), aMin, + aMax); writeReq(payload); const repMsg = randintMsg(cmd='randint', payload=payload, st).msg; writeRep(repMsg); diff --git a/tests/server/UnitTestParquet.chpl b/tests/server/UnitTestParquet.chpl new file mode 100644 index 00000000000..1f89e6d0922 --- /dev/null +++ b/tests/server/UnitTestParquet.chpl @@ -0,0 +1,78 @@ +import FileSystem; + +use TestBase; +use ParquetMsg; + +// for now, this is the same as Parquet.chpl, maybe we should not make it +// private there? +private config const ROWGROUPS = 512*1024*1024 / numBytes(int); // 512 mb of int64 +config const n = 100; + +proc testWriteRead() { + var st = new owned SymTab(); + + var arrName = st.nextName(); + var a = st.addEntry(arrName, n, int); + + a.a = 2; + + var (ok, filenames, sizes) = writeDistArrayToParquet(a.a, "test.parquet", + "col", "int64", + ROWGROUPS, compression=0, + mode=0); + + var readEntry = createSymEntry(n, int); + readFilesByName(readEntry.a, filenames, sizes, "col", "int64"); + var valName = st.nextName(); + st.addEntry(valName, readEntry); + + assert(&& reduce (readEntry.a == a.a)); + + defer { + for filename in filenames { + if FileSystem.exists(filename) { + FileSystem.remove(filename); + } + } + } +} + +proc testWriteReadMultiCol() { + var st = new owned SymTab(); + + var (arrName, arrEntry) = createArray(int, 4, st); + arrEntry.a = 2; + + var (segArrSegsName, segArrSegsEntry) = createArray(int, 4, st); + var (segArrValsName, segArrValsEntry) = createArray(int, n, st); + segArrSegsEntry.a = [0, 10, 20, 30]; + segArrValsEntry.a = 3; + + var segArrJson = "segments: %s, values: %s".format(segArrSegsName, + segArrValsName); + writeMultiColParquet(filename="test.parquet", + col_names=["arrCol", "segArrCol"], + ncols=2, + sym_names=[arrName, segArrJson], + col_objTypes=["pdarray", "segarray"], + targetLocales=Locales, + compression=0, + st=st.borrow()); + + + // TODO read it back + +} + +proc createArray(type t, size, st) { + var arrName = st.nextName(); + var arrEntry = st.addEntry(arrName, size, int); + + return (arrName, arrEntry); + +} + +proc main() { + testWriteRead(); + testWriteReadMultiCol(); +} diff --git a/tests/server/UnitTestParquet.good b/tests/server/UnitTestParquet.good new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/server/UnitTestParquetCpp.chpl b/tests/server/UnitTestParquetCpp.chpl index 9cae4c38d2c..f8969de8efb 100644 --- a/tests/server/UnitTestParquetCpp.chpl +++ b/tests/server/UnitTestParquetCpp.chpl @@ -2,11 +2,41 @@ use ParquetMsg, CTypes, FileSystem; use UnitTest; use TestBase; +private config const ROWGROUPS = 512*1024*1024 / numBytes(int); // 512 mb of int64 + +type c_string = c_ptrConst(c_char); + +extern proc c_readColumnByName(filename, arr_chpl, where_null_chpl, colNum, + numElems, startIdx, batchSize, byteLength, + hasNonFloatNulls, errMsg): int; +extern proc c_writeColumnToParquet(filename, arr_chpl, colnum, + dsetname, numelems, rowGroupSize, + dtype, compression, errMsg): int; +extern proc c_getStringColumnNumBytes(filename, colname, offsets, numElems, + startIdx, batchSize, errMsg): int; +extern proc c_getDatasetNames(f: c_string, r: c_ptr(c_ptr(c_char)), + readNested, e: c_ptr(c_ptr(c_char))): int(32); +extern proc c_writeMultiColToParquet(filename: c_string, + column_names: c_ptr(void), + ptr_arr: c_ptr(c_ptr(void)), + offset_arr: c_ptr(c_ptr(void)), + objTypes: c_ptr(void), + datatypes: c_ptr(void), + segArr_sizes: c_ptr(void), + colnum: int, + numelems: int, + rowGroupSize: int, + compression: int, + errMsg: c_ptr(c_ptr(c_uchar))): int; + +extern proc c_getNumRows(chpl_str, err): int; +extern proc c_getType(filename, colname, errMsg): c_int; +extern proc c_getVersionInfo(): c_ptrConst(c_char); + +extern proc c_free_string(a); +extern proc strlen(a): int; + proc testReadWrite(filename: c_string, dsetname: c_string, size: int) { - extern proc c_readColumnByName(filename, chpl_arr, colNum, numElems, startIdx, batchSize, errMsg): int; - extern proc c_writeColumnToParquet(filename, chpl_arr, colnum, - dsetname, numelems, rowGroupSize, compressed, - dtype, errMsg): int; extern proc c_free_string(a); extern proc strlen(a): int; var errMsg: c_ptr(uint(8)); @@ -18,7 +48,8 @@ proc testReadWrite(filename: c_string, dsetname: c_string, size: int) { var a: [0..#size] int; for i in 0..#size do a[i] = i; - if c_writeColumnToParquet(filename, c_ptrTo(a), 0, dsetname, size, 10000, false, 1, errMsg) < 0 { + if c_writeColumnToParquet(filename, c_ptrTo(a), 0, dsetname, size, 10000, + ARROWINT64, 1, errMsg) < 0 { var chplMsg; try! chplMsg = string.createCopyingBuffer(errMsg, strlen(errMsg)); writeln(chplMsg); @@ -26,7 +57,8 @@ proc testReadWrite(filename: c_string, dsetname: c_string, size: int) { var b: [0..#size] int; - if(c_readColumnByName(filename, c_ptrTo(b), dsetname, size, 0, 10000, c_ptrTo(errMsg)) < 0) { + if(c_readColumnByName(filename, c_ptrTo(b), false, dsetname, size, 0, 10000, + -1, false, c_ptrTo(errMsg)) < 0) { var chplMsg; try! chplMsg = string.createCopyingBuffer(errMsg, strlen(errMsg)); writeln(chplMsg); @@ -41,9 +73,6 @@ proc testReadWrite(filename: c_string, dsetname: c_string, size: int) { } proc testInt32Read() { - extern proc c_readColumnByName(filename, chpl_arr, colNum, numElems, startIdx, batchSize, errMsg): int; - extern proc c_free_string(a); - extern proc strlen(a): int; var errMsg: c_ptr(uint(8)); defer { c_free_string(errMsg); @@ -53,8 +82,8 @@ proc testInt32Read() { var expected: [0..#50] int; for i in 0..#50 do expected[i] = i; - if(c_readColumnByName("resources/int32.parquet".c_str(), c_ptrTo(a), - "array".c_str(), 50, 0, 1, c_ptrTo(errMsg)) < 0) { + if(c_readColumnByName("resources/int32.parquet".c_str(), c_ptrTo(a), false, + "array".c_str(), 50, 0, 1, -1, false, c_ptrTo(errMsg)) < 0) { var chplMsg; try! chplMsg = string.createCopyingBuffer(errMsg, strlen(errMsg)); writeln(chplMsg); @@ -69,9 +98,6 @@ proc testInt32Read() { } proc testGetNumRows(filename: c_string, expectedSize: int) { - extern proc c_getNumRows(chpl_str, err): int; - extern proc c_free_string(a); - extern proc strlen(a): int; var errMsg: c_ptr(uint(8)); defer { c_free_string(errMsg); @@ -90,9 +116,6 @@ proc testGetNumRows(filename: c_string, expectedSize: int) { } proc testGetType(filename: c_string, dsetname: c_string) { - extern proc c_getType(filename, colname, errMsg): c_int; - extern proc c_free_string(a); - extern proc strlen(a): int; var errMsg: c_ptr(uint(8)); defer { c_free_string(errMsg); @@ -114,8 +137,6 @@ proc testGetType(filename: c_string, dsetname: c_string) { } proc testVersionInfo() { - extern proc c_getVersionInfo(): c_string; - extern proc c_free_string(ptr); var cVersionString = c_getVersionInfo(); defer { c_free_string(cVersionString); @@ -134,12 +155,10 @@ proc testVersionInfo() { } proc testGetDsets(filename) { - extern proc c_getDatasetNames(f: c_string, r: c_ptr(c_ptr(c_char)), e: c_ptr(c_ptr(c_char))): int(32); - extern proc c_free_string(ptr); - extern proc strlen(a): int; var cDsetString: c_ptr(c_char); var errMsg: c_ptr(c_char); - var st = c_getDatasetNames(filename, c_ptrTo(cDsetString), c_ptrTo(errMsg)); + var st = c_getDatasetNames(filename, c_ptrTo(cDsetString), false, + c_ptrTo(errMsg)); defer { c_free_string(cDsetString); c_free_string(errMsg); @@ -162,12 +181,6 @@ proc testGetDsets(filename) { } proc testReadStrings(filename, dsetname) { - extern proc c_readColumnByName(filename, chpl_arr, colNum, numElems, startIdx, batchSize, errMsg): int; - extern proc c_getStringColumnNumBytes(filename, colname, offsets, numElems, startIdx, errMsg): int; - extern proc c_getNumRows(chpl_str, err): int; - - extern proc c_free_string(a); - extern proc strlen(a): int; var errMsg: c_ptr(uint(8)); defer { c_free_string(errMsg); @@ -176,7 +189,8 @@ proc testReadStrings(filename, dsetname) { var size = c_getNumRows(filename, c_ptrTo(errMsg)); var offsets: [0..#size] int; - c_getStringColumnNumBytes(filename, dsetname, c_ptrTo(offsets[0]), size, 0, c_ptrTo(errMsg)); + c_getStringColumnNumBytes(filename, dsetname, c_ptrTo(offsets[0]), size, 0, + 256, c_ptrTo(errMsg)); var byteSize = + reduce offsets; if byteSize < 0 { var chplMsg; @@ -186,7 +200,8 @@ proc testReadStrings(filename, dsetname) { var a: [0..#byteSize] uint(8); - if(c_readColumnByName(filename, c_ptrTo(a), dsetname, 3, 0, 1, c_ptrTo(errMsg)) < 0) { + if(c_readColumnByName(filename, c_ptrTo(a), false, dsetname, 3, 0, 1, -1, + false, c_ptrTo(errMsg)) < 0) { var chplMsg; try! chplMsg = string.createCopyingBuffer(errMsg, strlen(errMsg)); writeln(chplMsg); @@ -206,12 +221,9 @@ proc testReadStrings(filename, dsetname) { proc testMultiDset() { const filename = 'resources/multi-col.parquet'.c_str(); - extern proc c_getDatasetNames(f: c_string, r: c_ptr(c_ptr(c_char)), e: c_ptr(c_ptr(c_char))): int(32); - extern proc c_free_string(ptr); - extern proc strlen(a): int; var cDsetString: c_ptr(c_char); var errMsg: c_ptr(c_char); - var st = c_getDatasetNames(filename, c_ptrTo(cDsetString), c_ptrTo(errMsg)); + var st = c_getDatasetNames(filename, c_ptrTo(cDsetString), false, c_ptrTo(errMsg)); defer { c_free_string(cDsetString); c_free_string(errMsg); @@ -233,6 +245,164 @@ proc testMultiDset() { } } +proc testMultiColWriteIntInt() { + + type elemType = int; + + const numCols = 4; + const numElems = 10; + + var colNames = [i in 0..#numCols] ("col"+i:string).buff; + var Arrs: [0..#numCols][0..#numElems] elemType; + + for col in Arrs.domain { + for row in Arrs[col].domain { + Arrs[col][row] = col+row; + } + } + + var ArrPtrs: [0..#numCols] c_ptr(elemType); + + for (ptr, Arr) in zip(ArrPtrs, Arrs) { + ptr = c_ptrTo(Arr); + } + + var ObjTypes = [0..#numCols] 1; // 1 is PDARRAY + var DataTypes = [0..#numCols] ARROWINT64; + + var errStr = "E"*200; + + c_writeMultiColToParquet(filename="testMultiColWrite.parquet":c_string, + column_names=c_ptrTo(colNames), + ptr_arr=c_ptrTo(ArrPtrs):c_ptr(c_ptr(void)), + offset_arr=nil, + objTypes=c_ptrTo(ObjTypes), + datatypes=c_ptrTo(DataTypes), + segArr_sizes=nil, + colnum=numCols, + numelems=numElems, + rowGroupSize=ROWGROUPS, + compression=0, + errMsg=c_ptrTo(errStr.buff)); + + return 0; +} + +proc testMultiColWriteIntBool() { + + const numCols = 2; + const numElems = 10; + + proc createArray(type elemType) { + var Arr: [0..#numElems] elemType; + return Arr; + } + + var colNames = [i in 0..#numCols] ("col"+i:string).buff; + + var ArrInt = createArray(int); + var ArrBool = createArray(bool); + + var ArrPtrs: [0..#numCols] c_ptr(void); + + ArrPtrs[0] = c_ptrTo(ArrInt); + ArrPtrs[1] = c_ptrTo(ArrBool); + + var ObjTypes = [0..#numCols] 1; // 1 is PDARRAY + + var DataTypes = [0..#numCols] ARROWINT64; + + DataTypes[0] = ARROWINT64; + DataTypes[1] = ARROWBOOLEAN; + + var errStr = "E"*200; + + c_writeMultiColToParquet(filename="testMultiColWrite.parquet":c_string, + column_names=c_ptrTo(colNames), + ptr_arr=c_ptrTo(ArrPtrs):c_ptr(c_ptr(void)), + offset_arr=nil, + objTypes=c_ptrTo(ObjTypes), + datatypes=c_ptrTo(DataTypes), + segArr_sizes=nil, + colnum=numCols, + numelems=numElems, + rowGroupSize=ROWGROUPS, + compression=0, + errMsg=c_ptrTo(errStr.buff)); + + return 0; +} + +proc testMultiColWriteIntSegArr() { + + record fakeSegArray { + var totalSize: int; + var SegmentsDom = {1..0}; + var Segments: [SegmentsDom] int; + var Data: [0..