Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,12 @@ ifeq ($(CHPL_CXX),)
CHPL_CXX=$(CXX)
endif

ifdef ARKOUDA_DEVELOPER
ARROW_FLAGS = -g
else
ARROW_FLAGS = -O3
endif

.PHONY: compile-arrow-cpp
compile-arrow-cpp:
make compile-arrow-write
Expand All @@ -383,15 +389,15 @@ compile-arrow-cpp:

.PHONY: compile-arrow-write
compile-arrow-write:
$(CHPL_CXX) -O3 -std=c++17 -c $(ARROW_WRITE_CPP) -o $(ARROW_WRITE_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE)
$(CHPL_CXX) -std=c++17 $(ARROW_FLAGS) -c $(ARROW_WRITE_CPP) -o $(ARROW_WRITE_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE)

.PHONY: compile-arrow-read
compile-arrow-read:
$(CHPL_CXX) -O3 -std=c++17 -c $(ARROW_READ_CPP) -o $(ARROW_READ_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE)
$(CHPL_CXX) -std=c++17 $(ARROW_FLAGS) -c $(ARROW_READ_CPP) -o $(ARROW_READ_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE)

.PHONY: compile-arrow-util
compile-arrow-util:
$(CHPL_CXX) -O3 -std=c++17 -c $(ARROW_UTIL_CPP) -o $(ARROW_UTIL_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE)
$(CHPL_CXX) -std=c++17 $(ARROW_FLAGS) -c $(ARROW_UTIL_CPP) -o $(ARROW_UTIL_O) $(INCLUDE_FLAGS) $(ARROW_SANITIZE)

$(ARROW_UTIL_O): $(ARROW_UTIL_CPP) $(ARROW_UTIL_H)
make compile-arrow-util
Expand Down Expand Up @@ -712,7 +718,7 @@ $(TEST_TARGETS): $(TEST_BINARY_DIR)/$(TEST_BINARY_SIGIL)%: $(TEST_SOURCE_DIR)/%.
$(CHPL) $(TEST_CHPL_FLAGS) -M $(ARKOUDA_SOURCE_DIR) $(ARKOUDA_COMPAT_MODULES) $< -o $@

print-%:
$(info $($*)) @trues
$(info $($*)) @true

size=100
test-python:
Expand Down
27 changes: 21 additions & 6 deletions src/ParquetMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -1517,12 +1517,16 @@ module ParquetMsg {
}
}

proc writeMultiColParquet(filename: string, col_names: [] string,
ncols: int, sym_names: [] string, col_objTypes: [] string, targetLocales: [] locale,
compression: int, st: borrowed SymTab): bool throws {
proc writeMultiColParquet(filename: string, col_names: [] string, ncols: int,
sym_names: [] string, col_objTypes: [] string,
targetLocales: [] locale, compression: int,
st: borrowed SymTab): bool throws {

extern proc c_writeMultiColToParquet(filename, column_names, ptr_arr, offset_arr, objTypes,
datatypes, segArr_sizes, colnum, numelems, rowGroupSize, compression, errMsg): int;
extern proc c_writeMultiColToParquet(filename, column_names, ptr_arr,
offset_arr, objTypes, datatypes,
segArr_sizes, colnum, numelems,
rowGroupSize, compression,
errMsg): int;

var prefix: string;
var extension: string;
Expand Down Expand Up @@ -1919,7 +1923,18 @@ module ParquetMsg {
);
}

var result: int = c_writeMultiColToParquet(fname.localize().c_str(), c_ptrTo(c_names), c_ptrTo(ptrList), c_ptrTo(segmentPtr), c_ptrTo(objTypes), c_ptrTo(datatypes), c_ptrTo(segarray_sizes), ncols, numelems, ROWGROUPS, compression, c_ptrTo(pqErr.errMsg));
var result: int = c_writeMultiColToParquet(fname.localize().c_str(),
c_ptrTo(c_names),
c_ptrTo(ptrList),
c_ptrTo(segmentPtr),
c_ptrTo(objTypes),
c_ptrTo(datatypes),
c_ptrTo(segarray_sizes),
ncols,
numelems,
ROWGROUPS,
compression,
c_ptrTo(pqErr.errMsg));
if result == ARROWERROR {
pqErr.parquetError(getLineNumber(), getRoutineName(), getModuleName());
}
Expand Down
4 changes: 3 additions & 1 deletion tests/server/COMPOPTS
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ TEST_FLAGS=$(cd $ARKOUDA_HOME && make -s print-TEST_CHPL_FLAGS)
# -M dir
SRC_DIR=$(cd $ARKOUDA_HOME && make -s print-ARKOUDA_SOURCE_DIR)

PARQUET_FLAG=-I$SRC_DIR/parquet

# Compat modules
COMPAT_MODULES=$(cd $ARKOUDA_HOME && make -s print-ARKOUDA_COMPAT_MODULES)

Expand All @@ -27,4 +29,4 @@ if [[ ! -f $SRC_DIR/ArrowFunctions.o ]]; then
make -s -C ${ARKOUDA_HOME} compile-arrow-cpp > /dev/null 2> /dev/null
fi

echo "${TEST_FLAGS} -M ${SRC_DIR} ${COMPAT_MODULES} ${CONFIG_OPTS} ${TEST_OPTS}"
echo "${TEST_FLAGS} -M ${SRC_DIR} ${PARQUET_FLAG} ${COMPAT_MODULES} ${CONFIG_OPTS} ${TEST_OPTS}"
19 changes: 11 additions & 8 deletions tests/server/TestBase.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ record CommDiagSummary {
}

record Diags {
var T: Timer;
var T: stopwatch;
var elapsedTime: real;
var gatherDiags = dfltGatherDiags;
var D: [LocaleSpace] commDiagnostics;

proc init() {
this.complete();
init this;
resetCommDiagnostics();
D = getCommDiagnostics();
}
Expand All @@ -47,7 +47,8 @@ record Diags {
T.start();
}

proc stop(param name="", printTime=printTimes, printDiag=printDiags, printDiagSum=printDiagsSum) {
proc stop(param name="", printTime=printTimes, printDiag=printDiags,
printDiagSum=printDiagsSum) {
T.stop();
if gatherDiags then stopCommDiagnostics();

Expand All @@ -57,7 +58,8 @@ record Diags {
T.clear();
if gatherDiags then resetCommDiagnostics();

if !gatherDiags && (printDiag || printDiagSum) then warning("gatherDiags was not enabled");
if !gatherDiags && (printDiag || printDiagSum) then
warning("gatherDiags was not enabled");
param s = if name != "" then name + ": " else "";
if printTime then writef("%s%.2drs\n", s, this.elapsed());
if printDiag then writef('%s%s\n', s, this.comm():string);
Expand Down Expand Up @@ -86,9 +88,8 @@ record Diags {
// Message helpers

proc parseName(s: string): string {
const low = [1..1].domain.low;
var fields = s.split();
return fields[low+1];
return fields[1];
}

proc parseTwoNames(s: string): (string, string) {
Expand All @@ -115,9 +116,11 @@ proc writeSegString(msg: string, ss: SegString) {
}


proc nameForRandintMsg(len: int, dtype:DType, aMin: int, aMax: int, st: borrowed SymTab) {
proc nameForRandintMsg(len: int, dtype:DType, aMin: int, aMax: int,
st: borrowed SymTab) {
use RandMsg;
const payload = try! "%i %s %i %i None".format(len, dtype2str(dtype), aMin, aMax);
const payload = try! "%i %s %i %i None".format(len, dtype2str(dtype), aMin,
aMax);
writeReq(payload);
const repMsg = randintMsg(cmd='randint', payload=payload, st).msg;
writeRep(repMsg);
Expand Down
78 changes: 78 additions & 0 deletions tests/server/UnitTestParquet.chpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import FileSystem;

use TestBase;
use ParquetMsg;

// Row-group size handed to the Parquet writer in the tests below.
// For now, this is the same as Parquet.chpl, maybe we should not make it
// private there?
private config const ROWGROUPS = 512*1024*1024 / numBytes(int); // number of int64 values that fit in 512 MiB
// Number of elements in the test arrays; as a config const it can be
// overridden on the command line.
config const n = 100;

/*
  Round-trip test for a single int64 column: fill an array of `n` elements
  with the value 2, write it to Parquet with `writeDistArrayToParquet`, read
  it back with `readFilesByName`, and assert that every element matches.
  Any files reported by the writer are removed when the procedure exits.
*/
proc testWriteRead() {
  var st = new owned SymTab();

  var arrName = st.nextName();
  var a = st.addEntry(arrName, n, int);

  a.a = 2;

  var (ok, filenames, sizes) = writeDistArrayToParquet(a.a, "test.parquet",
                                                       "col", "int64",
                                                       ROWGROUPS, compression=0,
                                                       mode=0);

  // Register the cleanup as soon as the files exist. A `defer` placed as the
  // last statement of the procedure only takes effect once everything before
  // it has already completed, so it would not cover an early exit from the
  // read-back code below.
  defer {
    for filename in filenames {
      if FileSystem.exists(filename) {
        FileSystem.remove(filename);
      }
    }
  }

  var readEntry = createSymEntry(n, int);
  readFilesByName(readEntry.a, filenames, sizes, "col", "int64");
  var valName = st.nextName();
  st.addEntry(valName, readEntry);

  // Element-wise comparison folded with logical AND.
  assert(&& reduce (readEntry.a == a.a));
}

/*
  Write a two-column Parquet file through `writeMultiColParquet`: one plain
  array column ("arrCol") and one segmented-array column ("segArrCol") that is
  described by a segments array and a values array registered in the symbol
  table. Reading the data back is not implemented yet.
*/
proc testWriteReadMultiCol() {
  var st = new owned SymTab();

  // Plain column: four elements, all set to 2.
  var (plainName, plainEntry) = createArray(int, 4, st);
  plainEntry.a = 2;

  // Segmented column: four segment offsets plus `n` values, all set to 3.
  var (segsName, segsEntry) = createArray(int, 4, st);
  var (valsName, valsEntry) = createArray(int, n, st);
  segsEntry.a = [0, 10, 20, 30];
  valsEntry.a = 3;

  // The segmented column is passed as a description naming both entries.
  var segArrJson = "segments: %s, values: %s".format(segsName, valsName);

  const colNames = ["arrCol", "segArrCol"];
  const symNames = [plainName, segArrJson];
  const objTypes = ["pdarray", "segarray"];

  writeMultiColParquet(filename="test.parquet",
                       col_names=colNames,
                       ncols=2,
                       sym_names=symNames,
                       col_objTypes=objTypes,
                       targetLocales=Locales,
                       compression=0,
                       st=st.borrow());

  // TODO read it back
}

/*
  Add a new array entry to the symbol table `st` under a freshly generated
  name.

  :arg t: element type of the entry to create
  :arg size: number of elements in the entry
  :arg st: symbol table providing `nextName` and `addEntry`
  :returns: the tuple `(name, entry)`
*/
proc createArray(type t, size, st) {
  var arrName = st.nextName();
  // Forward the requested element type; it used to be hard-coded to `int`,
  // which silently ignored the `t` argument.
  var arrEntry = st.addEntry(arrName, size, t);

  return (arrName, arrEntry);
}

// Test driver: runs the single-column round trip first, then the
// multi-column write.
proc main() {
testWriteRead();
testWriteReadMultiCol();
}
Empty file.
Loading
Loading