Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion extension
8 changes: 7 additions & 1 deletion scripts/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,13 @@ iC_AttachDatabase
: ATTACH SP StringLiteral (SP AS SP oC_SchemaName)? SP '(' SP? DBTYPE SP oC_SymbolicName (SP? ',' SP? iC_Options)? SP? ')' ;

iC_Option
: oC_SymbolicName (SP? '=' SP? | SP*) oC_Literal | oC_SymbolicName;
: oC_SymbolicName (SP? '=' SP? | SP*) oC_Literal (SP? iC_OptionQualifier)? | oC_SymbolicName;

// An optional parenthesized qualifier attached to a valued COPY option. Currently only
// `IGNORE_ERRORS=true (DUPLICATE_PK_ONLY)` is meaningful: it is rewritten at parse time into the
// internal SKIP_DUPLICATE_PK option (duplicate-primary-key-only ignore mode).
iC_OptionQualifier
: '(' SP? oC_SymbolicName SP? ')' ;

iC_Options
: iC_Option ( SP? ',' SP? iC_Option )* ;
Expand Down
2 changes: 1 addition & 1 deletion scripts/antlr4/hash.md5
Original file line number Diff line number Diff line change
@@ -1 +1 @@
13b11d5b1e7e230715651c647e9877e1
6298a7b244eb61ba5477ccc6eac619f7
8 changes: 7 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ iC_AttachDatabase
: ATTACH SP StringLiteral (SP AS SP oC_SchemaName)? SP '(' SP? DBTYPE SP oC_SymbolicName (SP? ',' SP? iC_Options)? SP? ')' ;

iC_Option
: oC_SymbolicName (SP? '=' SP? | SP*) oC_Literal | oC_SymbolicName;
: oC_SymbolicName (SP? '=' SP? | SP*) oC_Literal (SP? iC_OptionQualifier)? | oC_SymbolicName;

// An optional parenthesized qualifier attached to a valued COPY option. Currently only
// `IGNORE_ERRORS=true (DUPLICATE_PK_ONLY)` is meaningful: it is rewritten at parse time into the
// internal SKIP_DUPLICATE_PK option (duplicate-primary-key-only ignore mode).
iC_OptionQualifier
: '(' SP? oC_SymbolicName SP? ')' ;

iC_Options
: iC_Option ( SP? ',' SP? iC_Option )* ;
Expand Down
11 changes: 6 additions & 5 deletions src/binder/bind/copy/bind_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,12 @@ BoundCopyFromInfo Binder::bindCopyNodeFromInfo(std::string tableName,
const auto ignoreErrors = getBoolCopyOption(parsingOptions,
CopyConstants::IGNORE_ERRORS_OPTION_NAME, CopyConstants::DEFAULT_IGNORE_ERRORS);
if (skipDuplicatePK && ignoreErrors) {
throw BinderException("SKIP_DUPLICATE_PK cannot be used together with IGNORE_ERRORS.");
throw BinderException("IGNORE_ERRORS=true (DUPLICATE_PK_ONLY) cannot be used together with "
"IGNORE_ERRORS=true.");
}
if (skipDuplicatePK && source->type != ScanSourceType::FILE) {
throw BinderException(
"SKIP_DUPLICATE_PK is only supported for COPY FROM files into node tables.");
throw BinderException("IGNORE_ERRORS=true (DUPLICATE_PK_ONLY) is only supported for COPY "
"FROM files into node tables.");
}
auto boundSource =
bindScanSource(source, parsingOptions, expectedColumnNames, expectedColumnTypes);
Expand Down Expand Up @@ -218,8 +219,8 @@ BoundCopyFromInfo Binder::bindCopyRelFromInfo(std::string tableName,
const NodeTableCatalogEntry* toTable) {
if (getBoolCopyOption(parsingOptions, CopyConstants::SKIP_DUPLICATE_PK_OPTION_NAME,
CopyConstants::DEFAULT_SKIP_DUPLICATE_PK)) {
throw BinderException(
"SKIP_DUPLICATE_PK is only supported for COPY FROM files into node tables.");
throw BinderException("IGNORE_ERRORS=true (DUPLICATE_PK_ONLY) is only supported for COPY "
"FROM files into node tables.");
}
auto boundSource =
bindScanSource(source, parsingOptions, expectedColumnNames, expectedColumnTypes);
Expand Down
6 changes: 3 additions & 3 deletions src/include/binder/copy/bound_copy_from.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ class BoundCopyFrom final : public BoundStatement {
public:
explicit BoundCopyFrom(BoundCopyFromInfo info)
: BoundStatement{statementType_,
info.getSkipDuplicatePKOption() ?
createSkipDuplicatePKResult() :
info.tableType == common::TableType::NODE ?
createNodeCopyResult() :
BoundStatementResult::createSingleStringColumnResult()},
info{std::move(info)} {}

const BoundCopyFromInfo* getInfo() const { return &info; }

private:
static BoundStatementResult createSkipDuplicatePKResult() {
static BoundStatementResult createNodeCopyResult() {
auto result = BoundStatementResult::createSingleStringColumnResult();
auto skippedCount = std::make_shared<LiteralExpression>(common::Value{int64_t(0)},
"skipped_duplicate_pk_count");
Expand Down
4 changes: 4 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,11 @@ struct CopyConstants {
static constexpr uint64_t PARALLEL_BLOCK_SIZE = INITIAL_BUFFER_SIZE / 2;

static constexpr const char* IGNORE_ERRORS_OPTION_NAME = "IGNORE_ERRORS";
// Internal name of the duplicate-primary-key skip option. The user-facing COPY syntax is
// `IGNORE_ERRORS=true (DUPLICATE_PK_ONLY)`, which `Transformer::transformOptions` rewrites into
// this option key so the existing duplicate-PK skip path stays intact.
static constexpr const char* SKIP_DUPLICATE_PK_OPTION_NAME = "SKIP_DUPLICATE_PK";
static constexpr const char* DUPLICATE_PK_ONLY_QUALIFIER_NAME = "DUPLICATE_PK_ONLY";

static constexpr const char* FROM_OPTION_NAME = "FROM";
static constexpr const char* TO_OPTION_NAME = "TO";
Expand Down
33 changes: 32 additions & 1 deletion src/parser/transform/transform_copy.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#include "common/assert.h"
#include "common/constants.h"
#include "common/exception/binder.h"
#include "common/string_utils.h"
#include "parser/copy.h"
#include "parser/expression/parsed_literal_expression.h"
#include "parser/scan_source.h"
#include "parser/transformer.h"
#include <format>

using namespace lbug::common;

Expand Down Expand Up @@ -95,7 +99,34 @@ options_t Transformer::transformOptions(CypherParser::IC_OptionsContext& ctx) {
// Check if the literal exists, otherwise set the value to true by default
if (loadOption->oC_Literal()) {
// If there is a literal, transform it and use it as the value
options.emplace(optionName, transformLiteral(*loadOption->oC_Literal()));
std::string valueStr = loadOption->oC_Literal()->getText();
StringUtils::toUpper(valueStr);
if (loadOption->iC_OptionQualifier()) {
auto qualifier =
transformSymbolicName(*loadOption->iC_OptionQualifier()->oC_SymbolicName());
auto upperOptionName = optionName;
StringUtils::toUpper(upperOptionName);
StringUtils::toUpper(qualifier);
// Only `IGNORE_ERRORS=true (DUPLICATE_PK_ONLY)` is supported: rewrite it into the
// internal SKIP_DUPLICATE_PK option so the existing duplicate-PK skip path kicks
// in. Any other option/value/qualifier combination is rejected here.
if (upperOptionName != CopyConstants::IGNORE_ERRORS_OPTION_NAME ||
qualifier != CopyConstants::DUPLICATE_PK_ONLY_QUALIFIER_NAME) {
throw common::BinderException(
std::format("Option qualifier ({}) is only supported as "
"IGNORE_ERRORS=true (DUPLICATE_PK_ONLY).",
qualifier));
}
if (valueStr != "TRUE" && valueStr != "1") {
throw common::BinderException(
"IGNORE_ERRORS option qualifier "
"(DUPLICATE_PK_ONLY) is only supported with IGNORE_ERRORS=true.");
}
options.emplace(CopyConstants::SKIP_DUPLICATE_PK_OPTION_NAME,
std::make_unique<ParsedLiteralExpression>(Value(true), "true"));
} else {
options.emplace(optionName, transformLiteral(*loadOption->oC_Literal()));
}
} else {
// If no literal is provided, set the default value to true
options.emplace(optionName,
Expand Down
7 changes: 3 additions & 4 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNodeFrom(
const auto copyFromInfo = copyFrom.getInfo();
const auto outFSchema = copyFrom.getSchema();
auto prevOperator = mapOperator(copyFrom.getChild(0).get());
auto fTable =
copyFromInfo->getSkipDuplicatePKOption() ?
FactorizedTableUtils::getNodeCopyResultFTable(MemoryManager::Get(*clientContext)) :
FactorizedTableUtils::getSingleStringColumnFTable(MemoryManager::Get(*clientContext));
// A node COPY always returns the three-column result schema (result,
// skipped_duplicate_pk_count, skipped_duplicate_pks) regardless of the active ignore mode.
auto fTable = FactorizedTableUtils::getNodeCopyResultFTable(MemoryManager::Get(*clientContext));

auto sharedState = std::make_shared<NodeBatchInsertSharedState>(fTable);
sharedState->skipDuplicatePK = copyFromInfo->getSkipDuplicatePKOption();
Expand Down
53 changes: 24 additions & 29 deletions src/processor/operator/persistent/node_batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ void NodeBatchInsertSharedState::initPKIndex(const ExecutionContext* context) {
if (nodeTable->tryGetPrimaryKeyIndex() != nullptr) {
if (skipDuplicatePK) {
throw RuntimeException(
"SKIP_DUPLICATE_PK is only supported for node tables with a primary-key "
"hash index.");
"IGNORE_ERRORS=true (DUPLICATE_PK_ONLY) is only supported for node tables "
"with a primary-key hash index.");
}
globalIndexBuilder.reset();
noIndexPKValidator.reset();
Expand All @@ -127,8 +127,8 @@ void NodeBatchInsertSharedState::initPKIndex(const ExecutionContext* context) {
}
if (skipDuplicatePK) {
throw RuntimeException(
"SKIP_DUPLICATE_PK is only supported for node tables with a primary-key hash "
"index.");
"IGNORE_ERRORS=true (DUPLICATE_PK_ONLY) is only supported for node tables with "
"a primary-key hash index.");
}
globalIndexBuilder.reset();
noIndexPKValidator = createNoIndexPKValidator(pkType);
Expand Down Expand Up @@ -455,6 +455,9 @@ void NodeBatchInsert::finalizeInternal(ExecutionContext* context) {
const auto* nodeInfo = info->ptrCast<NodeBatchInsertInfo>();
const auto* nodeSharedState =
dynamic_cast_checked<NodeBatchInsertSharedState*>(sharedState.get());

int64_t skippedDuplicatePKCount = 0;
std::vector<std::string> skippedDuplicatePKs;
if (nodeInfo->skipDuplicatePK) {
std::lock_guard lck{nodeSharedState->duplicatePKSkipResult->mtx};
// Duplicate-PK rows are counted in getNumRows() because they are appended before index
Expand All @@ -465,36 +468,28 @@ void NodeBatchInsert::finalizeInternal(ExecutionContext* context) {
DASSERT(
sharedState->getNumRows() >= sharedState->getNumErroredRows() +
nodeSharedState->duplicatePKSkipResult->skippedCount);
auto outputMsg = std::format("{} tuples have been copied to the {} table.",
sharedState->getNumRows() - sharedState->getNumErroredRows() -
nodeSharedState->duplicatePKSkipResult->skippedCount,
info->tableName);
FactorizedTableUtils::appendNodeCopyResultToTable(sharedState->fTable.get(), outputMsg,
nodeSharedState->duplicatePKSkipResult->skippedCount,
nodeSharedState->duplicatePKSkipResult->pks, MemoryManager::Get(*clientContext));
} else {
auto outputMsg = std::format("{} tuples have been copied to the {} table.",
sharedState->getNumRows() - sharedState->getNumErroredRows(), info->tableName);
FactorizedTableUtils::appendStringToTable(sharedState->fTable.get(), outputMsg,
MemoryManager::Get(*clientContext));
skippedDuplicatePKCount = nodeSharedState->duplicatePKSkipResult->skippedCount;
skippedDuplicatePKs = nodeSharedState->duplicatePKSkipResult->pks;
}

auto copiedCount =
sharedState->getNumRows() - sharedState->getNumErroredRows() - skippedDuplicatePKCount;
const auto warningCount =
WarningContext::Get(*clientContext)->getWarningCount(context->queryID);
std::string outputMsg =
std::format("{} tuples have been copied to the {} table.", copiedCount, info->tableName);
if (warningCount > 0) {
auto warningMsg =
std::format("{} warnings encountered during copy. Use 'CALL "
"show_warnings() RETURN *' to view the actual warnings. Query ID: {}",
warningCount, context->queryID);
if (nodeInfo->skipDuplicatePK) {
FactorizedTableUtils::appendNodeCopyResultToTable(sharedState->fTable.get(), warningMsg,
0 /* skippedDuplicatePKCount */, {} /* skippedDuplicatePKs */,
MemoryManager::Get(*clientContext));
} else {
FactorizedTableUtils::appendStringToTable(sharedState->fTable.get(), warningMsg,
MemoryManager::Get(*clientContext));
}
// Fold the warning summary into the single result row so the user still sees how many
// warnings were collected during the COPY. Individual warnings remain queryable via
// `CALL show_warnings() RETURN *`.
outputMsg = std::format(
"{} tuples have been copied to the {} table. {} warnings encountered during copy. "
"Use 'CALL show_warnings() RETURN *' to view the actual warnings. Query ID: {}",
copiedCount, info->tableName, warningCount, context->queryID);
}
// Contract: a node COPY always returns exactly one row with three columns
// (result, skipped_duplicate_pk_count, skipped_duplicate_pks).
FactorizedTableUtils::appendNodeCopyResultToTable(sharedState->fTable.get(), outputMsg,
skippedDuplicatePKCount, skippedDuplicatePKs, MemoryManager::Get(*clientContext));
}

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,14 +695,18 @@ static std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,
const TableFuncBindInput* input) {
auto scanInput = dynamic_cast_checked<ExtraScanTableFuncBindInput*>(input->extraInput.get());
const auto& options = scanInput->fileScanInfo.options;
// The user-facing COPY options are `IGNORE_ERRORS=true` and `IGNORE_ERRORS=true
// (DUPLICATE_PK_ONLY)`; the parser rewrites the latter into the internal SKIP_DUPLICATE_PK
// option key. The option map may therefore contain only the IGNORE_ERRORS and/or
// SKIP_DUPLICATE_PK keys; any other key is rejected below.
if (options.size() > 2 ||
(options.size() == 1 && !options.contains(CopyConstants::IGNORE_ERRORS_OPTION_NAME) &&
!options.contains(CopyConstants::SKIP_DUPLICATE_PK_OPTION_NAME)) ||
(options.size() == 2 &&
(!options.contains(CopyConstants::IGNORE_ERRORS_OPTION_NAME) ||
!options.contains(CopyConstants::SKIP_DUPLICATE_PK_OPTION_NAME)))) {
throw BinderException{"Copy from Parquet cannot have options other than IGNORE_ERRORS and "
"SKIP_DUPLICATE_PK."};
throw BinderException{
"Copy from Parquet cannot have options other than IGNORE_ERRORS (which "
"may be qualified as IGNORE_ERRORS=true (DUPLICATE_PK_ONLY))."};
}
std::vector<std::string> detectedColumnNames;
std::vector<LogicalType> detectedColumnTypes;
Expand Down
31 changes: 28 additions & 3 deletions test/api/api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,16 @@ TEST_F(ApiTest, SingleQueryHasNextQueryResult) {
TEST_F(ApiTest, CopySkipDuplicatePKPreservesResultColumnNames) {
ASSERT_TRUE(conn->query("CREATE NODE TABLE copy_user(ID STRING, name STRING, PRIMARY KEY(ID));")
->isSuccess());
auto result = conn->query(std::format("COPY copy_user FROM \"{}\" (SKIP_DUPLICATE_PK=true);",
TestHelper::appendLbugRootPath("dataset/copy-fault-tests/duplicate-ids/vOrg.csv")));
auto result = conn->query(
std::format("COPY copy_user FROM \"{}\" (IGNORE_ERRORS=true (DUPLICATE_PK_ONLY));",
TestHelper::appendLbugRootPath("dataset/copy-fault-tests/duplicate-ids/vOrg.csv")));
ASSERT_TRUE(result->isSuccess());

// Contract: a node COPY always returns exactly one row with three columns
// (result, skipped_duplicate_pk_count, skipped_duplicate_pks), regardless of ignore mode.
ASSERT_EQ(result->getNumColumns(), 3);
const auto columnNames = result->getColumnNames();
ASSERT_EQ(columnNames.size(), 3);
EXPECT_EQ(columnNames.size(), 3);
EXPECT_EQ(columnNames[0], "result");
EXPECT_EQ(columnNames[1], "skipped_duplicate_pk_count");
EXPECT_EQ(columnNames[2], "skipped_duplicate_pks");
Expand All @@ -217,6 +221,27 @@ TEST_F(ApiTest, CopySkipDuplicatePKPreservesResultColumnNames) {
auto tuple = result->getNext();
EXPECT_EQ(tuple->getValue(1)->getValue<int64_t>(), 1);
EXPECT_EQ(tuple->getValue(2)->toString(), "[10]");
// Exactly one row is returned.
EXPECT_FALSE(result->hasNext());

// The same 1-row / 3-column contract holds when the same duplicate-PK CSV is copied into a
// second, freshly created table: the duplicate PK is reported in skipped_duplicate_pk_count /
// skipped_duplicate_pks instead of failing the COPY.
ASSERT_TRUE(conn->query("CREATE NODE TABLE copy_user2(ID STRING, name STRING, PRIMARY "
"KEY(ID));")
->isSuccess());
auto result2 = conn->query(
std::format("COPY copy_user2 FROM \"{}\" (IGNORE_ERRORS=true (DUPLICATE_PK_ONLY));",
TestHelper::appendLbugRootPath("dataset/copy-fault-tests/duplicate-ids/vOrg.csv")));
ASSERT_TRUE(result2->isSuccess());
ASSERT_EQ(result2->getNumColumns(), 3);
ASSERT_TRUE(result2->hasNext());
auto tuple2 = result2->getNext();
EXPECT_EQ(tuple2->getValue(0)->toString(),
std::format("{} tuples have been copied to the copy_user2 table.", 3));
EXPECT_EQ(tuple2->getValue(1)->getValue<int64_t>(), 1);
EXPECT_EQ(tuple2->getValue(2)->toString(), "[10]");
// Exactly one row is returned.
EXPECT_FALSE(result2->hasNext());
}

TEST_F(ApiTest, Prepare) {
Expand Down
4 changes: 2 additions & 2 deletions test/test_files/copy/copy_node_parquet.test
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
-LOG CopyWithOptionsErrorTest
-STATEMENT COPY tableOfTypes FROM "${LBUG_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/types_50k*.parquet" (HEADER=true);
---- error
Binder exception: Copy from Parquet cannot have options other than IGNORE_ERRORS and SKIP_DUPLICATE_PK.
Binder exception: Copy from Parquet cannot have options other than IGNORE_ERRORS (which may be qualified as IGNORE_ERRORS=true (DUPLICATE_PK_ONLY)).

-CASE IgnoreErrorsTest
-STATEMENT COPY person FROM "${LBUG_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/invalid_pk_col.parquet" (IGNORE_ERRORS=true);
Expand All @@ -75,6 +75,6 @@ Found duplicated primary key value 2, which violates the uniqueness constraint o
5

-CASE SkipDuplicatePKStillErrorsOnNullPKTest
-STATEMENT COPY person FROM "${LBUG_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/invalid_pk_col.parquet" (SKIP_DUPLICATE_PK=true);
-STATEMENT COPY person FROM "${LBUG_ROOT_DIRECTORY}/dataset/copy-test/node/parquet/invalid_pk_col.parquet" (IGNORE_ERRORS=true (DUPLICATE_PK_ONLY));
---- error
Copy exception: Found NULL, which violates the non-null constraint of the primary key column.
2 changes: 1 addition & 1 deletion test/test_files/copy/copy_with_skip_lines.test
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
---- ok
-STATEMENT COPY person FROM '${LBUG_ROOT_DIRECTORY}/dataset/tinysnb/vPerson.csv' (skip=3);
---- 1
2 tuples have been copied to the person table.
2 tuples have been copied to the person table.|0|[]
-STATEMENT MATCH (p:person) return p.*;
---- 2
5|Dan|2|False|True|20|4.800000|1950-07-23|2031-11-30 12:25:30|10 years 5 months 13:00:00.000024|[1,9]|[Wolfeschlegelstein,Daniel]|[[7,4],[8,8],[9]]|[76,88,99,89]|1.300000|a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a14
Expand Down
2 changes: 1 addition & 1 deletion test/test_files/ddl/ddl_empty.test
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Table art_copy_person has been created.
Index art_copy_person_pk has been created.
-STATEMENT COPY art_copy_person FROM "${LBUG_ROOT_DIRECTORY}/test/test_files/ddl/art_index_person.csv";
---- 1
4 tuples have been copied to the art_copy_person table.
4 tuples have been copied to the art_copy_person table.|0|[]
-STATEMENT MATCH (p:art_copy_person) WHERE p.ID = 2 RETURN p.name;
---- 1
Grace
Expand Down
Loading
Loading