feat(cbo): add ANALYZE TABLE persistent column statistics #630
LuciferYang wants to merge 1 commit into
ACTION NEEDED The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error please inspect the "PR Title Check" action.
7cc85c8 to 1f1e46f
// after the column became all-NULL), so the reader can't serve a stale value under the fresh
// manifest version. (Columns not analyzed at all are cleared by staleKeys.)
//
// The histogram is intentionally dropped (when spark.sql.statistics.histogram.enabled is on,
Maybe it's better to skip computing the histogram up front, rather than computing it and then skipping persistence of the result.
Makes sense, building it just to throw it away is wasted work. LanceNativeColumnStats now turns spark.sql.statistics.histogram.enabled off around the aggregation (and restores it after), so Spark never computes a histogram in the first place. Dropped the post-serialization strip too. The test that sets histogram.enabled=true still passes, now because nothing gets produced rather than produced-then-dropped.
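
For readers following the thread, here is a minimal sketch of the "turn the flag off around the aggregation, then restore it" pattern this reply describes. It is not the PR's `LanceNativeColumnStats` code: the class name `ScopedConfOverride` is hypothetical, and a plain `Map` stands in for the Spark session conf so the example stays self-contained.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical helper; a Map stands in for the session configuration.
final class ScopedConfOverride {
    static final String HISTOGRAM_KEY = "spark.sql.statistics.histogram.enabled";

    // Sets key=value, runs body, then restores the previous state even if body throws.
    static <T> T withOverride(Map<String, String> conf, String key, String value, Supplier<T> body) {
        boolean hadKey = conf.containsKey(key);
        String previous = conf.get(key);
        conf.put(key, value);
        try {
            return body.get();
        } finally {
            if (hadKey) {
                conf.put(key, previous); // put back the caller's explicit setting
            } else {
                conf.remove(key);        // key was unset before, so unset it again
            }
        }
    }
}
```

Restoring in a `finally` block matters here because a failed aggregation would otherwise leave the user's session with histograms silently disabled.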
}

// Step 4: complete-marker LAST — only valid stats payloads have lance.stats.complete=true.
changes += TableChange.setProperty(LanceStatsKeys.COMPLETE, "true")
Lance applies alterTable property changes atomically, so a separate valid-stats marker is over-design.
Agreed. The whole change set goes in via a single updateConfig, so there's no half-written window for the marker to guard. I removed lance.stats.complete end to end: the two writes, the read-side gate, and the constant. Validity now keys off version + computedAtVersion + schemaHash, which already covered everything the flag did.
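
As an illustration of a validity check keyed off version + computedAtVersion + schemaHash, a rough sketch follows. Only `lance.stats.version` (with value `1`) appears in this PR's text. The other two property names, the class name `StatsValidity`, and the choice that allow-stale relaxes only the version comparison (not the schema check) are assumptions made for the example.

```java
import java.util.Map;

// Hypothetical read-side gate; not the PR's actual implementation.
final class StatsValidity {
    static final String VERSION_KEY = "lance.stats.version";               // named in the PR
    static final String COMPUTED_AT_KEY = "lance.stats.computedAtVersion"; // assumed key name
    static final String SCHEMA_HASH_KEY = "lance.stats.schemaHash";        // assumed key name

    static boolean isUsable(Map<String, String> props, long manifestVersion,
                            String currentSchemaHash, boolean allowStale) {
        // Unknown or missing wire-format version: ignore the payload.
        if (!"1".equals(props.get(VERSION_KEY))) {
            return false;
        }
        // Schema drift (rename/retype/reorder) invalidates the stats.
        String storedHash = props.get(SCHEMA_HASH_KEY);
        if (storedHash == null || !storedHash.equals(currentSchemaHash)) {
            return false;
        }
        String computedAt = props.get(COMPUTED_AT_KEY);
        if (computedAt == null) {
            return false;
        }
        try {
            // Exact-version match unless the caller opted in to stale stats.
            return allowStale || Long.parseLong(computedAt) == manifestVersion;
        } catch (NumberFormatException e) {
            return false; // unparseable tag: fail safe to "no stats"
        }
    }
}
```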
 */
public static final String COLUMN_PREFIX = "lance.stats.column.";

public static final String COLUMN_SUFFIX_VERSION = ".version";
It seems all the suffix variants are only used in test cases.
Yeah. The read and write paths both go through Spark's CatalogColumnStat.toMap/fromMap, so prod never references the suffixes directly. Moved them into a test-only LanceStatsTestKeys and left LanceStatsKeys with just the keys production actually reads or writes.
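
To make the key layout concrete, the sketch below groups flat `lance.stats.column.<name>.<suffix>` properties back into one map per column. The class `ColumnStatKeys` is hypothetical, and splitting at the last dot assumes suffixes never contain a dot. The production path goes through `CatalogColumnStat.fromMap` instead, so this is only an illustration of the naming scheme.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the per-column key layout.
final class ColumnStatKeys {
    static final String COLUMN_PREFIX = "lance.stats.column.";

    // Returns columnName -> (suffix -> value) for every well-formed column key.
    static Map<String, Map<String, String>> groupByColumn(Map<String, String> props) {
        Map<String, Map<String, String>> grouped = new TreeMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            String key = entry.getKey();
            if (!key.startsWith(COLUMN_PREFIX)) {
                continue; // envelope keys and unrelated table properties
            }
            String rest = key.substring(COLUMN_PREFIX.length());
            int dot = rest.lastIndexOf('.'); // assumption: the suffix has no dots
            if (dot <= 0 || dot == rest.length() - 1) {
                continue; // malformed key: skip rather than fail
            }
            String column = rest.substring(0, dot);
            String suffix = rest.substring(dot + 1);
            grouped.computeIfAbsent(column, c -> new TreeMap<>()).put(suffix, entry.getValue());
        }
        return grouped;
    }
}
```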
…'s own infra

Add `ANALYZE TABLE <t> COMPUTE STATISTICS [FOR [ALL] COLUMNS]` for Lance tables. Per-column stats (min/max/nullCount/HLL-approx distinctCount/avgLen/maxLen) are persisted into the Lance manifest TBLPROPERTIES under `lance.stats.*`, tagged with the manifest version, and surfaced to Spark's CBO on subsequent scans via DSv2 `Statistics.columnStats()`, with no per-scan re-aggregation.

Reuses Spark's own infrastructure end-to-end:
- Computation: Spark's `CommandUtils.computeColumnStats` (single aggregation job), via a reflective bridge (LanceNativeColumnStats) because the session parameter is `o.a.s.sql.SparkSession` on 3.x but `o.a.s.sql.classic.SparkSession` on 4.x; the lookup fails loud if Spark's internal signature drifts.
- Wire format: Spark's `CatalogColumnStat.toMap` / `fromMap` / `toPlanStat` (LanceColumnStatCodec on the read side). min/max come back in the internal catalyst representation that `DataSourceV2Relation.transformV2Stats` copies verbatim into a catalyst ColumnStat (no CatalystTypeConverters round-trip). NDV is HLL-approximate; string/binary get no min/max (matching native ANALYZE); avgLen/maxLen are surfaced for join-size estimation.

Parsing: no custom ANALYZE grammar. Spark parses ANALYZE natively into AnalyzeColumn / AnalyzeTable; an injected resolution rule (LanceAnalyzeTableResolution) rewrites those into LanceAnalyzeTable when, and only when, the target resolves to a LanceDataset, pre-empting CheckAnalysis's NOT_SUPPORTED_COMMAND_FOR_V2_TABLE rejection (which fires after the resolution batches). Non-Lance ANALYZE is left untouched for Spark, so ANALYZE on a non-Lance table in a Lance-extension session is no longer hijacked. NOSCAN and partition-scoped ANALYZE intentionally fall through to Spark's V2 rejection.

Safety / correctness:
- Atomicity: the entire change list commits via one `catalog.alterTable` → single `Dataset.updateConfig`; the read path consumes stats only when `lance.stats.complete=true`.
- Staleness: stats are tagged with the expected post-write manifest version; the read path's exact-version check rejects stale stats (fall back to no stats), with an opt-in allow-stale config.
- Schema drift: SHA-256 fingerprint of (name, dataType, nullable) guards against rename/retype/reorder; mismatch falls back to no stats.
- TimestampNTZ columns are skipped (their min/max would crash Spark's CBO FilterEstimation); poisoned/unparseable per-column payloads fail-safe to no stats for that column.

Config (SparkConf / per-scan): `spark.lance.cbo.column.stats.enabled`, `.max.columns`, `.allow.stale`. Master switch off = safe rollback.

Verified: lint + checkstyle clean; full Spark 3.5 suite (866 tests) and full Spark 4.0 suite (888 tests) green; Spark 3.4 / 4.1 compile.
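
The schema-drift guard in the commit message can be illustrated with a small order-sensitive SHA-256 fingerprint over (name, dataType, nullable). This is a sketch under assumptions, not the code in `LanceStatsKeys`: the class `SchemaFingerprint`, its nested `Field` type, and the length-prefixed byte encoding are all hypothetical choices made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical fingerprint helper; the PR's real encoding may differ.
final class SchemaFingerprint {
    static final class Field {
        final String name;
        final String dataType;
        final boolean nullable;

        Field(String name, String dataType, boolean nullable) {
            this.name = name;
            this.dataType = dataType;
            this.nullable = nullable;
        }
    }

    // Hashes fields in declaration order, so rename, retype, and reorder all change the result.
    static String hash(List<Field> fields) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
        for (Field f : fields) {
            feed(md, f.name);
            feed(md, f.dataType);
            md.update((byte) (f.nullable ? 1 : 0));
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : md.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    // Length-prefix each string so ("ab", "c") and ("a", "bc") cannot collide.
    private static void feed(MessageDigest md, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        md.update(ByteBuffer.allocate(4).putInt(bytes.length).array());
        md.update(bytes);
    }
}
```

A reader would compare the stored fingerprint with one recomputed from the current schema and fall back to no stats on any mismatch.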
1f1e46f to 47ab288
Hi @LuciferYang, there are some discussions about column statistics in the Lance format. See lance-format/lance#4540.
Thanks @fangbo, this is the context I was missing. Agreed that column statistics really belong in the format itself. Per-fragment collection consolidated into a mergeable, eventually default-on store is a much better long-term home than recomputing in Spark and parking the result in TBLPROPERTIES. Since @HaochengLIU's native work (lance #5639, #5967) is already underway and covers the same ground, I'd rather not ship an interim Spark-side mechanism that we'd only have to migrate off later. So I'll put this PR on hold and wait for the native column-statistics API to land, then wire lance-spark's CBO up to it. Happy to help review/test on the native side in the meantime, and to align on the fields Spark's CBO needs (min/max/nullCount/distinctCount/avgLen/maxLen) so they're covered when it lands.
Summary
Adds `ANALYZE TABLE … COMPUTE STATISTICS [FOR ALL COLUMNS | FOR COLUMNS …]` for Lance tables, persisting per-column statistics into the Lance manifest so Spark's CBO can consume them on subsequent scans: O(1) per column, with no per-scan re-aggregation.

The implementation reuses Spark's own statistics infrastructure rather than hand-rolling aggregation or a serialization format, and adds no new SQL grammar (Spark already parses `ANALYZE`; an injected resolution rule rewrites it for Lance tables).

Full design, wire format, correctness guarantees, configuration, and known limitations are in #629; this PR implements that design.
Closes #629
Fixes part of the CBO roadmap (persistent column statistics).
What's in here
- `ANALYZE` delegates to `CommandUtils.computeColumnStats` (one aggregation job: min / max / nullCount / HLL-approx distinctCount / avgLen / maxLen) and persists each column with Spark's `CatalogColumnStat.toMap`. The read path decodes via `CatalogColumnStat.fromMap` / `toPlanStat` and feeds the CBO directly.
- `LanceAnalyzeTableResolution` rewrites Spark's native `AnalyzeColumn` / `AnalyzeTable` → `LanceAnalyzeTable` when the target is a Lance dataset, pre-empting the V2-table rejection thrown in `DataSourceV2Strategy`. Non-Lance `ANALYZE` is untouched.
- (`lance.stats.version=1`): an envelope plus `lance.stats.column.<name>.<suffix>` per-column keys in Spark's `CatalogColumnStat` map form, in manifest `TBLPROPERTIES`.
- `complete=true`; SHA-256 schema-drift guard; `computedAtVersion` exact-equality invalidation; a `readVersion+1` read-side staleness guard. Fail-safe everywhere (any decode problem → live aggregation).
- `spark.lance.cbo.column.stats.enabled` (master kill-switch, default `true`) and `spark.lance.cbo.column.stats.allow.stale` (default `false`).
- `FOR ALL COLUMNS` / rejected under `FOR COLUMNS`.

Key classes
- `read/LanceStatsKeys.java`: wire-format spec, key constants, schema hash.
- `v2/LanceNativeColumnStats.scala`: reflective bridge to `CommandUtils.computeColumnStats` (3.x/4.x split).
- `v2/LanceColumnStatCodec.scala`: read-side `CatalogColumnStat` → V2 `ColumnStatistics` bridge (fail-safe).
- `v2/LanceAnalyzeTableResolution.scala`: analyzer rule rewriting native ANALYZE for Lance tables.
- `v2/LanceAnalyzeTableExec.scala`: ANALYZE executor (writer, single-commit ordering, orphan-key GC).
- `read/LanceScanBuilder.java`: `loadPersistedColumnStats` fast path.

Test plan
- `LanceStatsKeysTest`, `LanceAnalyzeTableSchemaHashTest`, `LoadPersistedColumnStatsTest`, `LanceSparkReadOptionsSerializationTest`.
- `BaseAnalyzeTableTest` (run per Spark version via `AnalyzeTableTest`): `FOR ALL COLUMNS` / `FOR COLUMNS`, decimal / empty / all-null / re-analyze / partial, stats-reach-scan, TimestampNTZ-skip, case-insensitivity, NOSCAN-falls-through-to-Spark, and kill-switch paths.
- `AnalyzeTableTest`). Spark 4.0 / 4.1 reuse the 3.5 test sources via `build-helper`; 3.4 compiles. The full four-version + integration matrix runs in CI.
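
The fail-safe behaviour listed under "What's in here" (any decode problem means the persisted value is not used) can be sketched as a decoder that returns an empty `Optional` instead of throwing. The class `FailSafeColumnStat` is hypothetical and handles only two fields; `nullCount` and `distinctCount` are the suffix names Spark's `CatalogColumnStat` map form uses, and requiring both to be present is a simplification for the example. The PR's read path uses `CatalogColumnStat.fromMap`, not this code.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical two-field decoder showing the fail-safe shape only.
final class FailSafeColumnStat {
    final long nullCount;
    final long distinctCount;

    private FailSafeColumnStat(long nullCount, long distinctCount) {
        this.nullCount = nullCount;
        this.distinctCount = distinctCount;
    }

    // Returns empty for missing, negative, or unparseable fields; never throws on bad input.
    static Optional<FailSafeColumnStat> decode(Map<String, String> fields) {
        String rawNulls = fields.get("nullCount");
        String rawDistinct = fields.get("distinctCount");
        if (rawNulls == null || rawDistinct == null) {
            return Optional.empty();
        }
        try {
            long nulls = Long.parseLong(rawNulls);
            long distinct = Long.parseLong(rawDistinct);
            if (nulls < 0 || distinct < 0) {
                return Optional.empty(); // counts cannot be negative: treat as poisoned
            }
            return Optional.of(new FailSafeColumnStat(nulls, distinct));
        } catch (NumberFormatException e) {
            return Optional.empty(); // poisoned payload: caller falls back
        }
    }
}
```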