Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/service/ch/ch.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ func (s *Service) GetEvents(ctx context.Context, subject string, from, to time.T
qm.Select(vss.EventNameCol, vss.EventSourceCol, vss.EventTimestampCol, vss.EventDurationNsCol, vss.EventMetadataCol, vss.EventTagsCol),
qm.From(vss.EventTableName),
qm.Where(eventSubjectWhere, subject),
qm.Where(vss.EventTimestampCol+" >= ?", from),
qm.Where(vss.EventTimestampCol+" < ?", to),
qm.Where(vss.EventTimestampCol + " >= " + dateTime64Micro(from)),
qm.Where(vss.EventTimestampCol + " < " + dateTime64Micro(to)),
qm.OrderBy(vss.EventTimestampCol + " DESC"),
}
mods = appendEventFilterMods(mods, filter)
Expand Down
14 changes: 7 additions & 7 deletions internal/service/ch/ignition_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ func (d *IgnitionDetector) getStateChangesQueryWithLookback(subject string, from
// - All state changes in range [from, to)
//
// PREWHERE on subject filters before FINAL merge, significantly reducing work
query := `
query := fmt.Sprintf(`
SELECT timestamp, new_state, prev_state FROM (
SELECT timestamp, new_state, prev_state
FROM signal_state_changes FINAL
PREWHERE subject = ?
WHERE signal_name = 'isIgnitionOn'
AND timestamp >= ?
AND timestamp < ?
AND timestamp >= %s
AND timestamp < %s
AND prev_state != new_state
ORDER BY timestamp DESC
LIMIT 1
Expand All @@ -104,13 +104,13 @@ SELECT timestamp, new_state, prev_state FROM (
FROM signal_state_changes FINAL
PREWHERE subject = ?
WHERE signal_name = 'isIgnitionOn'
AND timestamp >= ?
AND timestamp < ?
AND timestamp >= %s
AND timestamp < %s
AND prev_state != new_state
)
ORDER BY timestamp`
ORDER BY timestamp`, dateTime64Micro(lookbackLimit), dateTime64Micro(from), dateTime64Micro(from), dateTime64Micro(to))

args := []any{subject, lookbackLimit, from, subject, from, to}
args := []any{subject, subject}

return query, args
}
Expand Down
71 changes: 43 additions & 28 deletions internal/service/ch/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const (
subjectWhere = vss.SubjectCol + " = ?"
eventSubjectWhere = vss.EventSubjectCol + " = ?"
nameIn = vss.NameCol + " IN ?"
timestampFrom = vss.TimestampCol + " >= ?"
timestampTo = vss.TimestampCol + " < ?"
sourceWhere = vss.SourceCol + " = ?"
groupAsc = IntervalGroup + " ASC"
signalTypeCol = "signal_type"
Expand Down Expand Up @@ -117,6 +115,24 @@ func newQuery(mods ...qm.QueryMod) (string, []any) {
return queries.BuildQuery(q)
}

// dateTime64Micro formats a time.Time as a ClickHouse DateTime64(6) literal.
// This bypasses the clickhouse-go driver's positional bind formatting which
// truncates time.Time to second-precision toDateTime(), losing sub-second data
// and causing empty results when the column is DateTime64(6, 'UTC').
func dateTime64Micro(t time.Time) string {
return fmt.Sprintf("fromUnixTimestamp64Micro(%d)", t.UnixMicro())
}

// whereTimestampFrom returns a WHERE clause for timestamp >= t with microsecond precision.
func whereTimestampFrom(t time.Time) qm.QueryMod {
return qm.Where(vss.TimestampCol + " >= " + dateTime64Micro(t))
}

// whereTimestampTo returns a WHERE clause for timestamp < t with microsecond precision.
func whereTimestampTo(t time.Time) qm.QueryMod {
return qm.Where(vss.TimestampCol + " < " + dateTime64Micro(t))
}

func withSource(source string) qm.QueryMod {
did, err := cloudevent.DecodeEthrDID(source)
if err == nil {
Expand Down Expand Up @@ -510,8 +526,8 @@ func getAggQuery(subject string, aggArgs *model.AggregatedSignalArgs) (string, [
selectStringAggs(aggArgs.StringArgs),
selectLocationAggs(aggArgs.LocationArgs),
qm.Where(subjectWhere, subject),
qm.Where(timestampFrom, aggArgs.FromTS),
qm.Where(timestampTo, aggArgs.ToTS),
whereTimestampFrom(aggArgs.FromTS),
whereTimestampTo(aggArgs.ToTS),
qm.From(vss.TableName),
qm.InnerJoin(valueTable),
qm.GroupBy(IntervalGroup),
Expand All @@ -537,13 +553,11 @@ func getBatchAggQuery(subject string, ranges []TimeRange, globalFrom, globalTo t
return "", nil, errors.New("no aggregations for batch agg")
}
valueTable := buildBatchAggValueTable(floatArgs, locationArgs)
multiIf := buildSegmentIndexMultiIf(vss.TimestampCol, len(ranges))
args := make([]any, 0, 2*len(ranges)+3)
for _, r := range ranges {
args = append(args, r.From, r.To)
}
args = append(args, subject, globalFrom, globalTo)
inner := buildBatchAggInner(valueTable, multiIf)
multiIf := buildSegmentIndexMultiIf(vss.TimestampCol, ranges)
// Only subject remains as a bind param; all timestamps are inlined
// as DateTime64(6) literals to preserve sub-second precision.
args := []any{subject}
inner := buildBatchAggInner(valueTable, multiIf, globalFrom, globalTo)
outer := buildBatchAggOuter(inner, floatArgs, locationArgs)
return outer, args, nil
}
Expand All @@ -559,10 +573,12 @@ func buildBatchAggValueTable(floatArgs []model.FloatSignalArgs, locationArgs []m
return fmt.Sprintf("VALUES('%s', %s) as %s ON %s.%s = %s.%s", valueTableDef, strings.Join(valuesArgs, ", "), aggTableName, vss.TableName, vss.NameCol, aggTableName, vss.NameCol)
}

func buildBatchAggInner(valueTable, multiIf string) string {
func buildBatchAggInner(valueTable, multiIf string, globalFrom, globalTo time.Time) string {
selectList := multiIf + ", " + signalTypeCol + ", " + signalIndexCol + ", " + vss.TimestampCol + ", " + vss.ValueNumberCol + ", " + vss.ValueStringCol + ", " + vss.ValueLocationCol
return "SELECT " + selectList + " FROM " + vss.TableName + " INNER JOIN " + valueTable +
" WHERE " + subjectWhere + " AND " + vss.TimestampCol + " >= ? AND " + vss.TimestampCol + " < ?"
" WHERE " + subjectWhere +
" AND " + vss.TimestampCol + " >= " + dateTime64Micro(globalFrom) +
" AND " + vss.TimestampCol + " < " + dateTime64Micro(globalTo)
}

func buildBatchAggOuter(inner string, floatArgs []model.FloatSignalArgs, locationArgs []model.LocationSignalArgs) string {
Expand Down Expand Up @@ -678,14 +694,16 @@ func aggTableEntry(ft FieldType, index int, name string) string {
return fmt.Sprintf("(%d, %d, '%s')", ft, index, escaped)
}

// buildSegmentIndexMultiIf returns "multiIf( (tsCol >= ? AND tsCol < ?), 0, ..., -1) AS seg_idx" for n ranges.
func buildSegmentIndexMultiIf(timestampCol string, nRanges int) string {
if nRanges == 0 {
// buildSegmentIndexMultiIf returns "multiIf( (tsCol >= dt64 AND tsCol < dt64), 0, ..., -1) AS seg_idx"
// for the given time ranges. Timestamps are formatted inline as DateTime64(6) literals to
// preserve sub-second precision (the clickhouse-go driver truncates positional ? to seconds).
func buildSegmentIndexMultiIf(timestampCol string, ranges []TimeRange) string {
if len(ranges) == 0 {
return "toInt32(-1) AS seg_idx"
}
cond := "(" + timestampCol + " >= ? AND " + timestampCol + " < ?)"
parts := make([]string, 0, nRanges)
for i := 0; i < nRanges; i++ {
parts := make([]string, 0, len(ranges))
for i, r := range ranges {
cond := fmt.Sprintf("(%s >= %s AND %s < %s)", timestampCol, dateTime64Micro(r.From), timestampCol, dateTime64Micro(r.To))
parts = append(parts, cond+", "+fmt.Sprintf("%d", i))
}
return "multiIf(" + strings.Join(parts, ", ") + ", -1) AS seg_idx"
Expand Down Expand Up @@ -771,8 +789,8 @@ func getEventCountsQuery(subject string, from, to time.Time, eventNames []string
qm.Select("count(*) AS count"),
qm.From(vss.EventTableName),
qm.Where(eventSubjectWhere, subject),
qm.Where(vss.EventTimestampCol+" >= ?", from),
qm.Where(vss.EventTimestampCol+" < ?", to),
qm.Where(vss.EventTimestampCol + " >= " + dateTime64Micro(from)),
qm.Where(vss.EventTimestampCol + " < " + dateTime64Micro(to)),
qm.GroupBy(vss.EventNameCol),
}
if len(eventNames) > 0 {
Expand All @@ -793,8 +811,8 @@ func getEventCountsForRangesQuery(subject string, ranges []TimeRange, eventNames
if len(ranges) == 0 {
return eventCountsForRangesEmptyQuery(), nil
}
multiIf := buildSegmentIndexMultiIf(vss.EventTimestampCol, len(ranges))
args := buildEventCountsForRangesArgs(ranges, subject, eventNames)
multiIf := buildSegmentIndexMultiIf(vss.EventTimestampCol, ranges)
args := buildEventCountsForRangesArgs(subject, eventNames)
innerSelect := buildEventCountsForRangesInner(multiIf, eventNames)
stmt := "SELECT seg_idx, name, count(*) AS count FROM (" + innerSelect + ") WHERE seg_idx >= 0 GROUP BY seg_idx, name"
return stmt, args
Expand All @@ -804,11 +822,8 @@ func eventCountsForRangesEmptyQuery() string {
return "SELECT toInt32(-1) AS seg_idx, '' AS name, toUInt64(0) AS count FROM " + vss.EventTableName + " WHERE 0"
}

func buildEventCountsForRangesArgs(ranges []TimeRange, subject string, eventNames []string) []any {
args := make([]any, 0, 2*len(ranges)+1+len(eventNames))
for _, r := range ranges {
args = append(args, r.From, r.To)
}
func buildEventCountsForRangesArgs(subject string, eventNames []string) []any {
args := make([]any, 0, 1+len(eventNames))
args = append(args, subject)
for _, n := range eventNames {
args = append(args, n)
Expand Down
14 changes: 7 additions & 7 deletions internal/service/ch/segments_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ func getWindowedSignalCounts(
signalThreshold int,
distinctSignalThreshold int,
) (_ []ActiveWindow, retErr error) {
query := `
query := fmt.Sprintf(`
SELECT
toStartOfInterval(timestamp, INTERVAL ? second) AS window_start,
toStartOfInterval(timestamp, INTERVAL ? second) + INTERVAL ? second AS window_end,
count() AS signal_count,
uniq(name) AS distinct_signal_count
FROM signal FINAL
PREWHERE subject = ?
WHERE timestamp >= ?
AND timestamp < ?
WHERE timestamp >= %s
AND timestamp < %s
GROUP BY window_start
HAVING signal_count >= ? AND distinct_signal_count >= ?
ORDER BY window_start`
ORDER BY window_start`, dateTime64Micro(from), dateTime64Micro(to))

rows, err := conn.Query(ctx, query, windowSizeSeconds, windowSizeSeconds, windowSizeSeconds, subject, from, to, signalThreshold, distinctSignalThreshold)
rows, err := conn.Query(ctx, query, windowSizeSeconds, windowSizeSeconds, windowSizeSeconds, subject, signalThreshold, distinctSignalThreshold)
if err != nil {
return nil, fmt.Errorf("failed to query windowed signal counts: %w", err)
}
Expand Down Expand Up @@ -127,9 +127,9 @@ func getLevelSamples(ctx context.Context, conn clickhouse.Conn, subject string,
query := "SELECT " + vss.TimestampCol + ", " + vss.ValueNumberCol +
" FROM " + vss.TableName + " FINAL" +
" PREWHERE " + vss.SubjectCol + " = ?" +
" WHERE " + vss.NameCol + " = ? AND " + vss.TimestampCol + " >= ? AND " + vss.TimestampCol + " < ?" +
" WHERE " + vss.NameCol + " = ? AND " + vss.TimestampCol + " >= " + dateTime64Micro(from) + " AND " + vss.TimestampCol + " < " + dateTime64Micro(to) +
" ORDER BY " + vss.TimestampCol
rows, err := conn.Query(ctx, query, subject, name, from, to)
rows, err := conn.Query(ctx, query, subject, name)
if err != nil {
return nil, err
}
Expand Down
Loading