Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 70 additions & 117 deletions src/binary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,98 +288,69 @@ static Oid get_corr_postgres_type(const TypeRef & type)
void ch_binary_insert_state_free(void * c)
{
auto * state = (ch_binary_insert_state *)c;
if (state->columns)
if (state->insert_block)
{
/* try to send empty block that sets proper ClickHouse state */
if (!state->success)
/* Finish the insert to set the proper ClickHouse state */
Client * client = (Client *)state->conn->client;
try
{
try
{
Client * client = (Client *)state->conn->client;
client->Insert(state->table_name, Block());
}
catch (const std::exception & e)
{
// just ignore, next query will fail
elog(NOTICE, "pg_clickhouse: could not send empty packet");
}
client->EndInsert();
}

delete (std::vector<clickhouse::ColumnRef> *)state->columns;
catch (const std::exception & e)
{
// just ignore, next query will fail
elog(NOTICE, "pg_clickhouse: could not finish INSERT: - %s", e.what());
}
delete (Block *)state->insert_block;
}
}

void ch_binary_prepare_insert(void * conn, char * query, ch_binary_insert_state * state)
{
throw std::runtime_error("clickhouse_fdw: XXX ch_binary_prepare_insert not implemented");

// std::vector<clickhouse::ColumnRef> * vec = nullptr;
// Client * client = (Client *)((ch_binary_connection_t *)conn)->client;

// try
// {
// client->PrepareInsert(
// std::string(query) + " VALUES", [&state, &vec](const Block & sample_block) {
// if (sample_block.GetColumnCount() == 0)
// return true;

// vec = new std::vector<clickhouse::ColumnRef>();

// state->len = sample_block.GetColumnCount();

// #if PG_VERSION_NUM < 120000
// state->outdesc = CreateTemplateTupleDesc(state->len, false);
// #else
// state->outdesc = CreateTemplateTupleDesc(state->len);
// #endif

// for (size_t i = 0; i < state->len; i++)
// {
// bool error = false;
// clickhouse::ColumnRef col = sample_block[i];

// auto chtype = col->Type();
// if (chtype->GetCode() == Type::LowCardinality)
// {
// chtype = col->As<ColumnLowCardinality>()->GetNestedType();
// }

// Oid pg_type = get_corr_postgres_type(chtype);

// vec->push_back(clickhouse::CreateColumnByType(col->Type()->GetName()));
// const char * colname = sample_block.GetColumnName(i).c_str();

// /* we can't afford long jumps outside of this function */
// PG_TRY();
// {
// TupleDescInitEntry(
// state->outdesc, (AttrNumber)i + 1, colname, pg_type, -1, 0);
// }
// PG_CATCH();
// {
// error = true;
// }
// PG_END_TRY();

// if (error)
// throw std::runtime_error("could not init tuple descriptor");
// }

// return true;
// });
// }
// catch (const std::exception & e)
// {
// client->ResetConnection();

// if (vec != nullptr)
// delete vec;

// elog(ERROR, "clickhouse_fdw: error while insert preparation - %s", e.what());
// }

// if (vec != nullptr)
// state->columns = (void *)vec;
// Start the INSERT.
Block * block;
Client * client = (Client *)((ch_binary_connection_t *)conn)->client;
try
{
block = new Block(client->BeginInsert(std::string(query) + " VALUES"));
}
catch (const std::exception & e)
{
elog(ERROR, "pg_clickhouse: could not prepare insert - %s", e.what());
}

// Setup the column config (or return if no columns).
state->len = block->GetColumnCount();
if (state->len == 0)
{
delete block;
return;
}
state->outdesc = CreateTemplateTupleDesc(state->len);

// Iterate over the list of columns returned by ClickHouse.
AttrNumber i = 0;
for (Block::Iterator bi(*block); bi.IsValid(); bi.Next())
{
// Determine the Postgres column type.
Oid pg_type = get_corr_postgres_type(bi.Type());
const char * colname = bi.Name().c_str();

PG_TRY();
{
TupleDescInitEntry(state->outdesc, ++i, colname, pg_type, -1, 0);
}
PG_CATCH();
{
// Clean up and re-throw.
client->ResetConnection();
delete block;
PG_RE_THROW();
}
PG_END_TRY();
}

state->insert_block = (ch_insert_block_h *) block;
}

static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, bool isnull)
Expand Down Expand Up @@ -507,14 +478,7 @@ static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, boo
col->As<ColumnEnum16>()->Append(s);
break;
case Type::Code::LowCardinality: {
// XXX Figure out proper value to create and pass to
// Append.
throw std::runtime_error(
"clickhouse_fdw: XXX unsupported column type "
+ col->Type()->GetName()
);
// auto item = ItemView{Type::String, std::string_view(s)};
// col->As<ColumnLowCardinality>()->Append(item);
col->AsStrict<ColumnLowCardinalityT<ColumnString>>()->Append(s);
break;
}
default:
Expand Down Expand Up @@ -570,25 +534,19 @@ static void column_append(clickhouse::ColumnRef col, Datum val, Oid valtype, boo
break;
}
case ANYARRAYOID: {
// auto arr = (ch_binary_array_t *)DatumGetPointer(val);

switch (col->Type()->GetCode())
{
case Type::Array: {
// XXX Figure out proper value to create and pass to
// Append.
throw std::runtime_error(
"clickhouse_fdw: XXX unsupported column type "
+ col->Type()->GetName()
auto arrcol = col->AsStrict<ColumnArray>();
auto items = CreateColumnByType(
arrcol->GetType().As<clickhouse::ArrayType>()->GetItemType()->GetName()
);
// auto arrcol = col->As<ColumnArray>();

// arrcol->OffsetsIncrease(arr->len);
// for (size_t i = 0; i < arr->len; i++)
// column_append(
// arrcol->Nested(), arr->datums[i], arr->item_type, arr->nulls[i]);
auto arr = (ch_binary_array_t *)DatumGetPointer(val);
for (size_t i = 0; i < arr->len; i++)
column_append(items, arr->datums[i], arr->item_type, arr->nulls[i]);

// break;
arrcol->AppendAsColumn(items);
break;
}
default:
throw std::runtime_error(
Expand All @@ -610,8 +568,8 @@ void ch_binary_column_append_data(ch_binary_insert_state * state, size_t colidx)
{
try
{
auto columns = *(std::vector<clickhouse::ColumnRef> *)state->columns;
auto col = columns[colidx];
auto block = (Block *)state->insert_block;
auto col = (*block)[colidx];

Datum val = state->values[colidx];
Oid valtype = TupleDescAttr(state->outdesc, colidx)->atttypid;
Expand All @@ -629,16 +587,11 @@ void ch_binary_insert_columns(ch_binary_insert_state * state)
{
try
{
Block block;
auto columns = *(std::vector<clickhouse::ColumnRef> *)state->columns;
for (int i = 0; i < state->outdesc->natts; ++i)
{
Form_pg_attribute att = TupleDescAttr(state->outdesc, i);
block.AppendColumn(NameStr(att->attname), columns[i]);
}

Client * client = (Client *)state->conn->client;
client->Insert(state->table_name, block);
auto block = (Block *)state->insert_block;
block->RefreshRowCount();
client->SendInsertBlock(*block);
block->Clear();
}
catch (const std::exception & e)
{
Expand Down
3 changes: 2 additions & 1 deletion src/include/binary.hh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ extern "C" {
#endif

typedef struct ch_binary_connection_t ch_binary_connection_t;
typedef struct ch_insert_block_h ch_insert_block_h;
typedef struct ch_binary_response_t
{
void *values;
Expand Down Expand Up @@ -48,7 +49,7 @@ typedef struct {
MemoryContextCallback callback;

TupleDesc outdesc;
void *columns; /* std::vector */
ch_insert_block_h *insert_block; /* clickhouse::Block */
size_t len;
void *conversion_states;
char *table_name;
Expand Down
1 change: 0 additions & 1 deletion src/pglink.c
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,6 @@ binary_insert_tuple(void *istate, TupleTableSlot *slot)
else
{
ch_binary_insert_columns(state);
state->success = true;
}
}

Expand Down
Loading
Loading