Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/duckdb/extension/core_functions/scalar/date/date_diff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ struct DateDiff {
//! Week difference between two dates: the distance in days divided by Interval::DAYS_PER_WEEK.
//! TA and TB are the argument types (both are read through a `days` member), TR is the result type.
// NOTE(review): two return statements appear back to back below, so the second is unreachable as written.
// This text looks like a rendered diff whose +/- markers were lost: the file header reports
// "4 additions & 2 deletions", which fits the first return being the removed line and the second its
// replacement. Confirm against the actual file before relying on either form.
template <class TA, class TB, class TR>
static inline TR Operation(TA startdate, TB enddate) {
// Weeks do not count Monday crossings, just distance
// Subtracts the `days` members in whatever type they have, then divides
return (enddate.days - startdate.days) / Interval::DAYS_PER_WEEK;
// Converts each `days` member to TR first, so the subtraction itself is carried out in TR
return (TR(enddate.days) - TR(startdate.days)) / Interval::DAYS_PER_WEEK;
}
};

Expand All @@ -116,7 +116,9 @@ struct DateDiff {
//! Difference between two dates expressed in microseconds:
//! Date::EpochMicroseconds(enddate) minus Date::EpochMicroseconds(startdate).
// NOTE(review): the first return below makes the three statements after it unreachable as written.
// This text looks like a rendered diff whose +/- markers were lost: the file header reports
// "4 additions & 2 deletions", which fits the plain subtraction being the removed line and the
// SubtractOperatorOverflowCheck version being its replacement. Confirm against the actual file.
struct MicrosecondsOperator {
template <class TA, class TB, class TR>
static inline TR Operation(TA startdate, TB enddate) {
// Direct subtraction of the two epoch values
return Date::EpochMicroseconds(enddate) - Date::EpochMicroseconds(startdate);
// Same difference (end - start), routed through SubtractOperatorOverflowCheck on int64_t operands
// (presumably reports overflow instead of wrapping - verify against that helper's definition)
const auto start = Date::EpochMicroseconds(startdate);
const auto end = Date::EpochMicroseconds(enddate);
return SubtractOperatorOverflowCheck::Operation<int64_t, int64_t, int64_t>(end, start);
}
};

Expand Down
3 changes: 2 additions & 1 deletion src/duckdb/extension/json/json_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ static const DefaultMacro JSON_MACROS[] = {
"json_group_object",
{"n", "v", nullptr},
{{nullptr, nullptr}},
"CAST('{' || string_agg(to_json(n::VARCHAR) || ':' || CASE WHEN v IS NULL THEN 'null'::JSON ELSE to_json(v) END, "
"CAST('{' || string_agg(CASE WHEN n IS NULL THEN error('json_group_object key cannot be NULL') ELSE "
"to_json(n::VARCHAR) END || ':' || CASE WHEN v IS NULL THEN 'null'::JSON ELSE to_json(v) END, "
"',') || '}' AS JSON)"},
{DEFAULT_SCHEMA,
"json_group_structure",
Expand Down
6 changes: 3 additions & 3 deletions src/duckdb/extension/json/json_functions/json_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ static unique_ptr<FunctionData> ArrayToJSONBind(ClientContext &context, ScalarFu
if (arguments[0]->HasParameter()) {
throw ParameterNotResolvedException();
}
if (arg_id != LogicalTypeId::LIST && arg_id != LogicalTypeId::SQLNULL) {
throw BinderException("array_to_json() argument type must be LIST");
if (arg_id != LogicalTypeId::LIST && arg_id != LogicalTypeId::ARRAY && arg_id != LogicalTypeId::SQLNULL) {
throw BinderException("array_to_json() argument type must be LIST or ARRAY");
}
return JSONCreateBindParams(bound_function, arguments, false);
}
Expand Down Expand Up @@ -259,7 +259,7 @@ static void AddKeyValuePairs(yyjson_mut_doc *doc, yyjson_mut_val *objs[], Vector
for (idx_t i = 0; i < count; i++) {
auto key_idx = key_data.sel->get_index(i);
if (!key_data.validity.RowIsValid(key_idx)) {
continue;
throw InvalidInputException("JSON key cannot be NULL");
}
auto key = CreateJSONValue<string_t, string_t>::Operation(doc, keys[key_idx]);
yyjson_mut_obj_add(objs[i], key, vals[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static LogicalType StructureToTypeObject(yyjson_val *obj, ClientContext &context
yyjson_val *key, *val;
yyjson_obj_foreach(obj, idx, max, key, val) {
val = yyjson_obj_iter_get_val(key);
auto key_str = unsafe_yyjson_get_str(key);
string key_str(unsafe_yyjson_get_str(key), unsafe_yyjson_get_len(key));
if (names.find(key_str) != names.end()) {
JSONCommon::ThrowValFormatError("Duplicate keys in object in JSON structure: %s", val);
}
Expand Down
4 changes: 3 additions & 1 deletion src/duckdb/extension/json/json_multi_file_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ bool JSONMultiFileInfo::ParseCopyOption(ClientContext &context, const string &ke
} else {
JSONCheckSingleParameter(key, values);
options.auto_detect = BooleanValue::Get(values.back().DefaultCastAs(LogicalTypeId::BOOLEAN));
options.format = JSONFormat::NEWLINE_DELIMITED;
if (options.format == JSONFormat::AUTO_DETECT) {
options.format = JSONFormat::NEWLINE_DELIMITED;
}
}
return true;
}
Expand Down
17 changes: 11 additions & 6 deletions src/duckdb/extension/json/json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,16 +658,21 @@ bool JSONReader::ParseJSON(JSONReaderScanState &scan_state, char *const json_sta
err.pos = json_size;
AddParseError(scan_state, scan_state.lines_or_objects_in_buffer, err, "Try auto-detecting the JSON format");
return false;
} else if (!options.ignore_errors && read_size < json_size) {
}
if (read_size < json_size) {
idx_t off = read_size;
idx_t rem = json_size;
SkipWhitespace(json_start, off, rem);
if (off != rem) { // Between end of document and boundary should be whitespace only
err.code = YYJSON_READ_ERROR_UNEXPECTED_CONTENT;
err.msg = "unexpected content after document";
err.pos = read_size;
AddParseError(scan_state, scan_state.lines_or_objects_in_buffer, err, "Try auto-detecting the JSON format");
return false;
if (!options.ignore_errors) {
err.code = YYJSON_READ_ERROR_UNEXPECTED_CONTENT;
err.msg = "unexpected content after document";
err.pos = read_size;
AddParseError(scan_state, scan_state.lines_or_objects_in_buffer, err,
"Try auto-detecting the JSON format");
return false;
}
doc = nullptr;
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/duckdb/extension/parquet/include/decode_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ class ParquetDecodeUtils {
static void BitUnpack(ByteBuffer &src, bitpacking_width_t &bitpack_pos, T *dst, idx_t count,
const bitpacking_width_t width) {
CheckWidth(width);
if (width > sizeof(T) * BITPACK_DLEN) {
throw IOException("The width (%d) of the bitpacked data exceeds the maximum width (%d) for "
"the target type, the file might be corrupted.",
width, sizeof(T) * BITPACK_DLEN);
}
const auto mask = BITPACK_MASKS[width];
src.available(count * width / BITPACK_DLEN); // check if buffer has enough space available once
if (bitpack_pos == 0 && count >= BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE) {
Expand Down Expand Up @@ -88,6 +93,12 @@ class ParquetDecodeUtils {
template <class T>
static void BitUnpackAlignedInternal(ByteBuffer &src, T *dst, const idx_t count, const bitpacking_width_t width) {
D_ASSERT(count % BitpackingPrimitives::BITPACKING_ALGORITHM_GROUP_SIZE == 0);
if (width > sizeof(T) * BITPACK_DLEN) {
throw IOException("The width (%d) of the bitpacked data exceeds the maximum width (%d) for "
"the target type, the file might be corrupted.",
width, sizeof(T) * BITPACK_DLEN);
}

if (cast_pointer_to_uint64(src.ptr) % sizeof(T) == 0) {
// Fast path: aligned
BitpackingPrimitives::UnPackBuffer<T>(data_ptr_cast(dst), src.ptr, count, width);
Expand Down
6 changes: 5 additions & 1 deletion src/duckdb/extension/parquet/include/parquet_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ struct ParquetReaderScanState {

//! (optional) pointer to the PhysicalOperator for logging
optional_ptr<const PhysicalOperator> op;

//! Number of row groups actually scanned (i.e. not pruned by filters) by this scan state.
//! Accumulates across all files processed by the owning (per-thread) local state.
idx_t row_groups_scanned = 0;
};

struct ParquetColumnDefinition {
Expand Down Expand Up @@ -156,7 +160,7 @@ class ParquetReader : public BaseFileReader {
unique_ptr<ParquetColumnSchema> root_schema;
shared_ptr<EncryptionUtil> encryption_util;
//! How many rows have been read from this file
atomic<idx_t> rows_read;
atomic<idx_t> rows_read {0};

public:
string GetReaderType() const override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ struct VariantMetadata {

public:
VariantMetadataHeader header;
const_data_ptr_t offsets;
const_data_ptr_t bytes;

//! Total byte length of the metadata region.
idx_t total_size = 0;

//! The json object keys have to be null-terminated
//! But we don't receive them null-terminated
Expand Down Expand Up @@ -135,15 +136,18 @@ class VariantBinaryDecoder {
VariantBinaryDecoder() = delete;

public:
static VariantValue Decode(const VariantMetadata &metadata, const_data_ptr_t data);
static VariantValue Decode(const VariantMetadata &metadata, const_data_ptr_t data, idx_t data_offset,
idx_t data_size);

public:
static VariantValue PrimitiveTypeDecode(const VariantValueMetadata &value_metadata, const_data_ptr_t data);
static VariantValue ShortStringDecode(const VariantValueMetadata &value_metadata, const_data_ptr_t data);
static VariantValue PrimitiveTypeDecode(const VariantValueMetadata &value_metadata, const_data_ptr_t data,
idx_t data_offset, idx_t data_size);
static VariantValue ShortStringDecode(const VariantValueMetadata &value_metadata, const_data_ptr_t data,
idx_t data_offset, idx_t data_size);
static VariantValue ObjectDecode(const VariantMetadata &metadata, const VariantValueMetadata &value_metadata,
const_data_ptr_t data);
const_data_ptr_t data, idx_t data_offset, idx_t data_size);
static VariantValue ArrayDecode(const VariantMetadata &metadata, const VariantValueMetadata &value_metadata,
const_data_ptr_t data);
const_data_ptr_t data, idx_t data_offset, idx_t data_size);
};

} // namespace duckdb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "duckdb/common/types/variant_value.hpp"
#include "duckdb/function/scalar_function.hpp"
#include "reader/variant/variant_binary_decoder.hpp"

namespace duckdb {
Expand All @@ -11,12 +12,16 @@ class VariantShreddedConversion {

public:
static vector<VariantValue> Convert(Vector &metadata, Vector &group, idx_t offset, idx_t length, idx_t total_size);
static void ConvertBinaryToVariant(Vector &metadata_and_value, idx_t offset, idx_t length, idx_t total_size,
Vector &result);
static vector<VariantValue> ConvertShreddedLeaf(Vector &metadata, Vector &value, Vector &typed_value, idx_t offset,
idx_t length, idx_t total_size);
static vector<VariantValue> ConvertShreddedArray(Vector &metadata, Vector &value, Vector &typed_value, idx_t offset,
idx_t length, idx_t total_size);
static vector<VariantValue> ConvertShreddedObject(Vector &metadata, Vector &value, Vector &typed_value,
idx_t offset, idx_t length, idx_t total_size);
//! Inverse of GetTransformFunction: decode a binary Variant value (metadata followed by value) into a VARIANT.
static ScalarFunction GetBytesToVariantFunction();
};

} // namespace duckdb
5 changes: 5 additions & 0 deletions src/duckdb/extension/parquet/parquet_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "zstd_file_system.hpp"
#include "writer/primitive_column_writer.hpp"
#include "writer/variant_column_writer.hpp"
#include "reader/variant_column_reader.hpp"

#include <fstream>
#include <iostream>
Expand Down Expand Up @@ -56,6 +57,7 @@
#include "duckdb/logging/log_manager.hpp"
#include "duckdb/main/settings.hpp"
#include "parquet_multi_file_info.hpp"
#include "reader/variant/variant_shredded_conversion.hpp"

namespace duckdb {

Expand Down Expand Up @@ -879,6 +881,9 @@ static void LoadInternal(ExtensionLoader &loader) {
// variant_to_parquet_variant
loader.RegisterFunction(VariantColumnWriter::GetTransformFunction());

// bytes_to_variant
loader.RegisterFunction(VariantShreddedConversion::GetBytesToVariantFunction());

CopyFunction function("parquet");
function.copy_to_select = ParquetWriteSelect;
function.copy_to_bind = ParquetWriteBind;
Expand Down
26 changes: 25 additions & 1 deletion src/duckdb/extension/parquet/parquet_multi_file_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@ struct ParquetReadBindData : public TableFunctionData {

//! Global state of the Parquet read, derived from GlobalTableFunctionState.
//! Holds the scan position (row group index and batch index), a running count of row groups handed out
//! for scanning, and an optional pointer to the physical operator performing the scan.
struct ParquetReadGlobalState : public GlobalTableFunctionState {
explicit ParquetReadGlobalState(optional_ptr<const PhysicalOperator> op_p)
// NOTE(review): two member-initializer lists follow, which is not valid C++ as written. This text looks
// like a rendered diff whose +/- markers were lost: the first list is presumably the removed line and the
// second (which also zero-initializes total_row_groups_to_scan) its replacement - confirm against the file.
: row_group_index(0), batch_index(0), op(op_p) {
: row_group_index(0), batch_index(0), total_row_groups_to_scan(0), op(op_p) {
}
//! Index of row group within file currently up for scanning
idx_t row_group_index;
//! Batch index of the next row group to be scanned
idx_t batch_index;
//! Total number of row groups dispatched for scanning across all files.
//! Updated under the MultiFileGlobalState lock as row groups are handed out.
idx_t total_row_groups_to_scan;
//! (Optional) pointer to physical operator performing the scan
optional_ptr<const PhysicalOperator> op;
};
Expand Down Expand Up @@ -366,6 +369,24 @@ static vector<PartitionStatistics> ParquetGetPartitionStats(ClientContext &conte
return result;
}

//! Metrics callback of the Parquet scan table function.
//! Writes the row-group counters into `metrics`, restricted to the metric types present in
//! `requested_metrics`. The client context and bind data arguments are not used.
static void ParquetGetMetrics(ClientContext &, const FunctionData *, GlobalTableFunctionState &global_state_p,
                              LocalTableFunctionState &local_state_p, const profiler_settings_t &requested_metrics,
                              profiler_metrics_t &metrics) {
    // Unwrap the multi-file states to reach the Parquet-specific states they hold
    auto &multi_file_global = global_state_p.Cast<MultiFileGlobalState>();
    auto &multi_file_local = local_state_p.Cast<MultiFileLocalState>();
    auto &parquet_global = multi_file_global.global_state->Cast<ParquetReadGlobalState>();
    auto &parquet_local = multi_file_local.local_state->Cast<ParquetReadLocalState>();

    // True when the caller asked for the given metric
    const auto is_requested = [&requested_metrics](MetricType metric) {
        return requested_metrics.find(metric) != requested_metrics.end();
    };

    if (is_requested(MetricType::OPERATOR_ROW_GROUPS_SCANNED)) {
        // per-thread count of row groups actually read; summed across threads by the profiler
        metrics[MetricType::OPERATOR_ROW_GROUPS_SCANNED] =
            Value::UBIGINT(parquet_local.scan_state.row_groups_scanned);
    }
    if (is_requested(MetricType::OPERATOR_TOTAL_ROW_GROUPS_TO_SCAN)) {
        // shared total across all files; reported identically by every thread
        metrics[MetricType::OPERATOR_TOTAL_ROW_GROUPS_TO_SCAN] =
            Value::UBIGINT(parquet_global.total_row_groups_to_scan);
    }
}

TableFunctionSet ParquetScanFunction::GetFunctionSet() {
MultiFileFunction<ParquetMultiFileInfo> table_function("parquet_scan");
table_function.named_parameters["binary_as_string"] = LogicalType::BOOLEAN;
Expand All @@ -383,6 +404,7 @@ TableFunctionSet ParquetScanFunction::GetFunctionSet() {
table_function.get_row_id_columns = ParquetGetRowIdColumns;
table_function.pushdown_expression = ParquetScanPushdownExpression;
table_function.get_partition_stats = ParquetGetPartitionStats;
table_function.get_metrics = ParquetGetMetrics;
table_function.filter_pushdown = true;
table_function.filter_prune = true;
table_function.late_materialization = true;
Expand Down Expand Up @@ -687,6 +709,8 @@ bool ParquetReader::TryInitializeScan(ClientContext &context, GlobalTableFunctio
// The current reader has rowgroups left to be scanned
lstate.group_indexes = {gstate.row_group_index};
gstate.row_group_index++;
// Count this row group towards the total to be scanned (called under the MultiFileGlobalState lock)
gstate.total_row_groups_to_scan++;
return true;
}

Expand Down
23 changes: 17 additions & 6 deletions src/duckdb/extension/parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,8 +795,14 @@ MultiFileColumnDefinition ParquetReader::ParseColumnDefinition(const FileMetaDat
result.identifier = Value::INTEGER(parent_column_schema.field_id);
}
}
for (auto &child : element.children) {
result.children.push_back(ParseColumnDefinition(file_meta_data, child));
// A GEOMETRY column is a leaf at the logical level - it only wraps an inner BLOB child internally so that the
// reader can validate/transform the WKB. Exposing that child here would make the column definition diverge from
// the (childless) global GEOMETRY column, breaking trivial column mapping and disabling row group pruning for
// spatial predicates. Treat it as a leaf.
if (element.schema_type != ParquetColumnSchemaType::GEOMETRY) {
for (auto &child : element.children) {
result.children.push_back(ParseColumnDefinition(file_meta_data, child));
}
}
return result;
}
Expand Down Expand Up @@ -894,11 +900,11 @@ ParquetReader::ParquetReader(ClientContext &context_p, OpenFileInfo file_p, Parq
metadata = LoadMetadata(context_p, allocator, *file_handle, parquet_options.encryption_config,
encryption_util, footer_size);
} else {
metadata = ObjectCache::GetObjectCache(context_p).Get<ParquetFileMetadataCache>(file.path);
metadata = ObjectCache::GetObjectCache(context_p).GetWithTypePrefix<ParquetFileMetadataCache>(file.path);
if (!metadata || !metadata->IsValid(*file_handle)) {
metadata = LoadMetadata(context_p, allocator, *file_handle, parquet_options.encryption_config,
encryption_util, footer_size);
ObjectCache::GetObjectCache(context_p).Put(file.path, metadata);
ObjectCache::GetObjectCache(context_p).PutWithTypePrefix<ParquetFileMetadataCache>(file.path, metadata);
}
}
} else {
Expand All @@ -915,7 +921,7 @@ bool ParquetReader::MetadataCacheEnabled(ClientContext &context) {

shared_ptr<ParquetFileMetadataCache> ParquetReader::GetMetadataCacheEntry(ClientContext &context,
const OpenFileInfo &file) {
return ObjectCache::GetObjectCache(context).Get<ParquetFileMetadataCache>(file.path);
return ObjectCache::GetObjectCache(context).GetWithTypePrefix<ParquetFileMetadataCache>(file.path);
}

ParquetUnionData::~ParquetUnionData() {
Expand Down Expand Up @@ -1421,9 +1427,14 @@ AsyncResult ParquetReader::Scan(ClientContext &context, ParquetReaderScanState &
}

auto &group = GetGroup(state);
const bool row_group_pruned = state.offset_in_group == (idx_t)group.num_rows;
if (!row_group_pruned) {
// the row group survived filter pruning and will actually be read
state.row_groups_scanned++;
}
if (state.op) {
DUCKDB_LOG(context, PhysicalOperatorLogType, *state.op, "ParquetReader",
state.offset_in_group == (idx_t)group.num_rows ? "SkipRowGroup" : "ReadRowGroup",
row_group_pruned ? "SkipRowGroup" : "ReadRowGroup",
{{"file", file.path}, {"row_group_id", to_string(state.group_idx_list[state.current_group])}});
}

Expand Down
Loading
Loading