77 "log"
88 "time"
99
10- "go.opentelemetry.io/otel/attribute"
11-
1210 "github.com/golang/protobuf/ptypes"
1311 "github.com/turbot/go-kit/helpers"
1412 "github.com/turbot/steampipe-plugin-sdk/v3/grpc/proto"
@@ -34,7 +32,7 @@ type scanIterator struct {
3432 status queryStatus
3533 err error
3634 rows chan * proto.Row
37- scanMetadata * proto.QueryMetadata
35+ scanMetadata map [ string ] * proto.QueryMetadata
3836 columns []string
3937 pluginRowStream proto.WrapperPlugin_ExecuteClient
4038 rel * types.Relation
@@ -54,6 +52,7 @@ func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugi
5452 return & scanIterator {
5553 status : QueryStatusReady ,
5654 rows : make (chan * proto.Row , rowBufferSize ),
55+ scanMetadata : make (map [string ]* proto.QueryMetadata ),
5756 hub : hub ,
5857 columns : columns ,
5958 qualMap : qualMap ,
@@ -126,13 +125,14 @@ func (i *scanIterator) Next() (map[string]interface{}, error) {
126125
127126func (i * scanIterator ) closeSpan () {
128127 // if we have scan metadata, add to span
129- if i .scanMetadata != nil {
130- i .traceCtx .Span .SetAttributes (
131- attribute .Int64 ("hydrate_calls" , i .scanMetadata .HydrateCalls ),
132- attribute .Int64 ("rows_fetched" , i .scanMetadata .RowsFetched ),
133- attribute .Bool ("cache_hit" , i .scanMetadata .CacheHit ),
134- )
135- }
128+ // TODO SUM ALL metadata
129+ //if i.scanMetadata != nil {
130+ // i.traceCtx.Span.SetAttributes(
131+ // attribute.Int64("hydrate_calls", i.scanMetadata.HydrateCalls),
132+ // attribute.Int64("rows_fetched", i.scanMetadata.RowsFetched),
133+ // attribute.Bool("cache_hit", i.scanMetadata.CacheHit),
134+ // )
135+ //}
136136
137137 i .traceCtx .Span .End ()
138138}
@@ -173,22 +173,25 @@ func (i *scanIterator) CanIterate() bool {
173173}
174174
175175func (i * scanIterator ) GetScanMetadata () []ScanMetadata {
176- // TODO how will this work for aggregate connections??? just use first connection?
177- // scan metadata will only be populated for plugins using latest sdk
178- if i .scanMetadata == nil {
179- return nil
176+
177+ res := make ([]ScanMetadata , len (i .scanMetadata ))
178+ idx := 0
179+ for connection , m := range i .scanMetadata {
180+ log .Printf ("[WARN] GetScanMetadata connection %s, %v" , connection , m )
181+ res [idx ] = ScanMetadata {
182+ Table : i .table ,
183+ CacheHit : m .CacheHit ,
184+ RowsFetched : m .RowsFetched ,
185+ HydrateCalls : m .HydrateCalls ,
186+ Columns : i .columns ,
187+ Quals : i .qualMap ,
188+ //Limit: i.limit,
189+ StartTime : i .startTime ,
190+ Duration : time .Since (i .startTime ),
191+ }
192+ idx ++
180193 }
181- return []ScanMetadata {{
182- Table : i .table ,
183- CacheHit : i .scanMetadata .CacheHit ,
184- RowsFetched : i .scanMetadata .RowsFetched ,
185- HydrateCalls : i .scanMetadata .HydrateCalls ,
186- Columns : i .columns ,
187- Quals : i .qualMap ,
188- //Limit: i.limit,
189- StartTime : i .startTime ,
190- Duration : time .Since (i .startTime ),
191- }}
194+ return res
192195}
193196
194197func (i * scanIterator ) GetTraceContext () * telemetry.TraceCtx {
@@ -275,8 +278,8 @@ func (i *scanIterator) readPluginResult(ctx context.Context) bool {
275278 // stop reading
276279 continueReading = false
277280 } else {
278- // update the scan metadata (this will overwrite any existing from the previous row)
279- i .scanMetadata = rowResult .Metadata
281+ // update the scan metadata for this connection (this will overwrite any existing from the previous row)
282+ i .scanMetadata [ rowResult . Connection ] = rowResult .Metadata
280283
281284 // so we have a row
282285 i .rows <- rowResult .Row
0 commit comments