From 5051df624349b3b4ebc47a9d31da3b4db9048d38 Mon Sep 17 00:00:00 2001 From: zer0stars <74260741+zer0stars@users.noreply.github.com> Date: Fri, 20 Mar 2026 11:00:00 -0600 Subject: [PATCH] Fix empty results for sub-second timestamp queries The clickhouse-go driver formats positional time.Time bind params as toDateTime() which truncates to second precision. This caused a mismatch with DateTime64(6) columns and the toStartOfInterval origin (which used fromUnixTimestamp64Micro with full precision), resulting in zero rows returned when callers passed timestamps with milliseconds. Replace all timestamp ? bind params with inline fromUnixTimestamp64Micro() literals to preserve microsecond precision end-to-end. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/service/ch/ch.go | 4 +- internal/service/ch/ignition_detector.go | 14 ++--- internal/service/ch/queries.go | 71 ++++++++++++++---------- internal/service/ch/segments_utils.go | 14 ++--- 4 files changed, 59 insertions(+), 44 deletions(-) diff --git a/internal/service/ch/ch.go b/internal/service/ch/ch.go index f0fd4d5..1bc5639 100644 --- a/internal/service/ch/ch.go +++ b/internal/service/ch/ch.go @@ -319,8 +319,8 @@ func (s *Service) GetEvents(ctx context.Context, subject string, from, to time.T qm.Select(vss.EventNameCol, vss.EventSourceCol, vss.EventTimestampCol, vss.EventDurationNsCol, vss.EventMetadataCol, vss.EventTagsCol), qm.From(vss.EventTableName), qm.Where(eventSubjectWhere, subject), - qm.Where(vss.EventTimestampCol+" >= ?", from), - qm.Where(vss.EventTimestampCol+" < ?", to), + qm.Where(vss.EventTimestampCol + " >= " + dateTime64Micro(from)), + qm.Where(vss.EventTimestampCol + " < " + dateTime64Micro(to)), qm.OrderBy(vss.EventTimestampCol + " DESC"), } mods = appendEventFilterMods(mods, filter) diff --git a/internal/service/ch/ignition_detector.go b/internal/service/ch/ignition_detector.go index 025f865..04a7371 100644 --- a/internal/service/ch/ignition_detector.go +++ b/internal/service/ch/ignition_detector.go @@ -85,14 +85,14 @@ func (d *IgnitionDetector) getStateChangesQueryWithLookback(subject string, from // - All state changes in range [from, to) // // PREWHERE on subject filters before FINAL merge, significantly reducing work - query := ` + query := fmt.Sprintf(` SELECT timestamp, new_state, prev_state FROM ( SELECT timestamp, new_state, prev_state FROM signal_state_changes FINAL PREWHERE subject = ? WHERE signal_name = 'isIgnitionOn' - AND timestamp >= ? - AND timestamp < ? + AND timestamp >= %s + AND timestamp < %s AND prev_state != new_state ORDER BY timestamp DESC LIMIT 1 @@ -104,13 +104,13 @@ SELECT timestamp, new_state, prev_state FROM ( FROM signal_state_changes FINAL PREWHERE subject = ? WHERE signal_name = 'isIgnitionOn' - AND timestamp >= ? - AND timestamp < ? + AND timestamp >= %s + AND timestamp < %s AND prev_state != new_state ) -ORDER BY timestamp` +ORDER BY timestamp`, dateTime64Micro(lookbackLimit), dateTime64Micro(from), dateTime64Micro(from), dateTime64Micro(to)) - args := []any{subject, lookbackLimit, from, subject, from, to} + args := []any{subject, subject} return query, args } diff --git a/internal/service/ch/queries.go b/internal/service/ch/queries.go index 1500293..077e1b6 100644 --- a/internal/service/ch/queries.go +++ b/internal/service/ch/queries.go @@ -25,8 +25,6 @@ const ( subjectWhere = vss.SubjectCol + " = ?" eventSubjectWhere = vss.EventSubjectCol + " = ?" nameIn = vss.NameCol + " IN ?" - timestampFrom = vss.TimestampCol + " >= ?" - timestampTo = vss.TimestampCol + " < ?" sourceWhere = vss.SourceCol + " = ?" groupAsc = IntervalGroup + " ASC" signalTypeCol = "signal_type" @@ -117,6 +115,24 @@ func newQuery(mods ...qm.QueryMod) (string, []any) { return queries.BuildQuery(q) } +// dateTime64Micro formats a time.Time as a ClickHouse DateTime64(6) literal. +// This bypasses the clickhouse-go driver's positional bind formatting which +// truncates time.Time to second-precision toDateTime(), losing sub-second data +// and causing empty results when the column is DateTime64(6, 'UTC'). +func dateTime64Micro(t time.Time) string { + return fmt.Sprintf("fromUnixTimestamp64Micro(%d)", t.UnixMicro()) +} + +// whereTimestampFrom returns a WHERE clause for timestamp >= t with microsecond precision. +func whereTimestampFrom(t time.Time) qm.QueryMod { + return qm.Where(vss.TimestampCol + " >= " + dateTime64Micro(t)) +} + +// whereTimestampTo returns a WHERE clause for timestamp < t with microsecond precision. +func whereTimestampTo(t time.Time) qm.QueryMod { + return qm.Where(vss.TimestampCol + " < " + dateTime64Micro(t)) +} + func withSource(source string) qm.QueryMod { did, err := cloudevent.DecodeEthrDID(source) if err == nil { @@ -510,8 +526,8 @@ func getAggQuery(subject string, aggArgs *model.AggregatedSignalArgs) (string, [ selectStringAggs(aggArgs.StringArgs), selectLocationAggs(aggArgs.LocationArgs), qm.Where(subjectWhere, subject), - qm.Where(timestampFrom, aggArgs.FromTS), - qm.Where(timestampTo, aggArgs.ToTS), + whereTimestampFrom(aggArgs.FromTS), + whereTimestampTo(aggArgs.ToTS), qm.From(vss.TableName), qm.InnerJoin(valueTable), qm.GroupBy(IntervalGroup), @@ -537,13 +553,11 @@ func getBatchAggQuery(subject string, ranges []TimeRange, globalFrom, globalTo t return "", nil, errors.New("no aggregations for batch agg") } valueTable := buildBatchAggValueTable(floatArgs, locationArgs) - multiIf := buildSegmentIndexMultiIf(vss.TimestampCol, len(ranges)) - args := make([]any, 0, 2*len(ranges)+3) - for _, r := range ranges { - args = append(args, r.From, r.To) - } - args = append(args, subject, globalFrom, globalTo) - inner := buildBatchAggInner(valueTable, multiIf) + multiIf := buildSegmentIndexMultiIf(vss.TimestampCol, ranges) + // Only subject remains as a bind param; all timestamps are inlined + // as DateTime64(6) literals to preserve sub-second precision. + args := []any{subject} + inner := buildBatchAggInner(valueTable, multiIf, globalFrom, globalTo) outer := buildBatchAggOuter(inner, floatArgs, locationArgs) return outer, args, nil } @@ -559,10 +573,12 @@ func buildBatchAggValueTable(floatArgs []model.FloatSignalArgs, locationArgs []m return fmt.Sprintf("VALUES('%s', %s) as %s ON %s.%s = %s.%s", valueTableDef, strings.Join(valuesArgs, ", "), aggTableName, vss.TableName, vss.NameCol, aggTableName, vss.NameCol) } -func buildBatchAggInner(valueTable, multiIf string) string { +func buildBatchAggInner(valueTable, multiIf string, globalFrom, globalTo time.Time) string { selectList := multiIf + ", " + signalTypeCol + ", " + signalIndexCol + ", " + vss.TimestampCol + ", " + vss.ValueNumberCol + ", " + vss.ValueStringCol + ", " + vss.ValueLocationCol return "SELECT " + selectList + " FROM " + vss.TableName + " INNER JOIN " + valueTable + - " WHERE " + subjectWhere + " AND " + vss.TimestampCol + " >= ? AND " + vss.TimestampCol + " < ?" + " WHERE " + subjectWhere + + " AND " + vss.TimestampCol + " >= " + dateTime64Micro(globalFrom) + + " AND " + vss.TimestampCol + " < " + dateTime64Micro(globalTo) } func buildBatchAggOuter(inner string, floatArgs []model.FloatSignalArgs, locationArgs []model.LocationSignalArgs) string { @@ -678,14 +694,16 @@ func aggTableEntry(ft FieldType, index int, name string) string { return fmt.Sprintf("(%d, %d, '%s')", ft, index, escaped) } -// buildSegmentIndexMultiIf returns "multiIf( (tsCol >= ? AND tsCol < ?), 0, ..., -1) AS seg_idx" for n ranges. -func buildSegmentIndexMultiIf(timestampCol string, nRanges int) string { - if nRanges == 0 { +// buildSegmentIndexMultiIf returns "multiIf( (tsCol >= dt64 AND tsCol < dt64), 0, ..., -1) AS seg_idx" +// for the given time ranges. Timestamps are formatted inline as DateTime64(6) literals to +// preserve sub-second precision (the clickhouse-go driver truncates positional ? to seconds). +func buildSegmentIndexMultiIf(timestampCol string, ranges []TimeRange) string { + if len(ranges) == 0 { return "toInt32(-1) AS seg_idx" } - cond := "(" + timestampCol + " >= ? AND " + timestampCol + " < ?)" - parts := make([]string, 0, nRanges) - for i := 0; i < nRanges; i++ { + parts := make([]string, 0, len(ranges)) + for i, r := range ranges { + cond := fmt.Sprintf("(%s >= %s AND %s < %s)", timestampCol, dateTime64Micro(r.From), timestampCol, dateTime64Micro(r.To)) parts = append(parts, cond+", "+fmt.Sprintf("%d", i)) } return "multiIf(" + strings.Join(parts, ", ") + ", -1) AS seg_idx" @@ -771,8 +789,8 @@ func getEventCountsQuery(subject string, from, to time.Time, eventNames []string qm.Select("count(*) AS count"), qm.From(vss.EventTableName), qm.Where(eventSubjectWhere, subject), - qm.Where(vss.EventTimestampCol+" >= ?", from), - qm.Where(vss.EventTimestampCol+" < ?", to), + qm.Where(vss.EventTimestampCol + " >= " + dateTime64Micro(from)), + qm.Where(vss.EventTimestampCol + " < " + dateTime64Micro(to)), qm.GroupBy(vss.EventNameCol), } if len(eventNames) > 0 { @@ -793,8 +811,8 @@ func getEventCountsForRangesQuery(subject string, ranges []TimeRange, eventNames if len(ranges) == 0 { return eventCountsForRangesEmptyQuery(), nil } - multiIf := buildSegmentIndexMultiIf(vss.EventTimestampCol, len(ranges)) - args := buildEventCountsForRangesArgs(ranges, subject, eventNames) + multiIf := buildSegmentIndexMultiIf(vss.EventTimestampCol, ranges) + args := buildEventCountsForRangesArgs(subject, eventNames) innerSelect := buildEventCountsForRangesInner(multiIf, eventNames) stmt := "SELECT seg_idx, name, count(*) AS count FROM (" + innerSelect + ") WHERE seg_idx >= 0 GROUP BY seg_idx, name" return stmt, args @@ -804,11 +822,8 @@ func eventCountsForRangesEmptyQuery() string { return "SELECT toInt32(-1) AS seg_idx, '' AS name, toUInt64(0) AS count FROM " + vss.EventTableName + " WHERE 0" } -func buildEventCountsForRangesArgs(ranges []TimeRange, subject string, eventNames []string) []any { - args := make([]any, 0, 2*len(ranges)+1+len(eventNames)) - for _, r := range ranges { - args = append(args, r.From, r.To) - } +func buildEventCountsForRangesArgs(subject string, eventNames []string) []any { + args := make([]any, 0, 1+len(eventNames)) args = append(args, subject) for _, n := range eventNames { args = append(args, n) diff --git a/internal/service/ch/segments_utils.go b/internal/service/ch/segments_utils.go index 73e5b54..1c35a3e 100644 --- a/internal/service/ch/segments_utils.go +++ b/internal/service/ch/segments_utils.go @@ -36,7 +36,7 @@ func getWindowedSignalCounts( signalThreshold int, distinctSignalThreshold int, ) (_ []ActiveWindow, retErr error) { - query := ` + query := fmt.Sprintf(` SELECT toStartOfInterval(timestamp, INTERVAL ? second) AS window_start, toStartOfInterval(timestamp, INTERVAL ? second) + INTERVAL ? second AS window_end, @@ -44,13 +44,13 @@ SELECT uniq(name) AS distinct_signal_count FROM signal FINAL PREWHERE subject = ? -WHERE timestamp >= ? - AND timestamp < ? +WHERE timestamp >= %s + AND timestamp < %s GROUP BY window_start HAVING signal_count >= ? AND distinct_signal_count >= ? -ORDER BY window_start` +ORDER BY window_start`, dateTime64Micro(from), dateTime64Micro(to)) - rows, err := conn.Query(ctx, query, windowSizeSeconds, windowSizeSeconds, windowSizeSeconds, subject, from, to, signalThreshold, distinctSignalThreshold) + rows, err := conn.Query(ctx, query, windowSizeSeconds, windowSizeSeconds, windowSizeSeconds, subject, signalThreshold, distinctSignalThreshold) if err != nil { return nil, fmt.Errorf("failed to query windowed signal counts: %w", err) } @@ -127,9 +127,9 @@ func getLevelSamples(ctx context.Context, conn clickhouse.Conn, subject string, query := "SELECT " + vss.TimestampCol + ", " + vss.ValueNumberCol + " FROM " + vss.TableName + " FINAL" + " PREWHERE " + vss.SubjectCol + " = ?" + - " WHERE " + vss.NameCol + " = ? AND " + vss.TimestampCol + " >= ? AND " + vss.TimestampCol + " < ?" + + " WHERE " + vss.NameCol + " = ? AND " + vss.TimestampCol + " >= " + dateTime64Micro(from) + " AND " + vss.TimestampCol + " < " + dateTime64Micro(to) + " ORDER BY " + vss.TimestampCol - rows, err := conn.Query(ctx, query, subject, name, from, to) + rows, err := conn.Query(ctx, query, subject, name) if err != nil { return nil, err }