Skip to content

Commit e84b758

Browse files
authored
Revert EndExecute function code - this would lead to incomplete cached results
1 parent 349cb92 commit e84b758

File tree

8 files changed

+12
-43
lines changed

8 files changed

+12
-43
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/hashicorp/go-version v1.6.0 // indirect
1010
github.com/turbot/go-kit v0.4.0
1111
github.com/turbot/steampipe v0.16.0-rc.7
12-
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.1
12+
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.2
1313
go.opentelemetry.io/otel v1.9.0
1414
google.golang.org/protobuf v1.28.1
1515
)

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -920,8 +920,8 @@ github.com/turbot/go-kit v0.4.0 h1:EdD7Bf2EGAjvHRGQxRiWpDawzZSk3T+eghqbj74qiSc=
920920
github.com/turbot/go-kit v0.4.0/go.mod h1:SBdPRngbEfYubiR81iAVtO43oPkg1+ASr+XxvgbH7/k=
921921
github.com/turbot/steampipe v0.16.0-rc.7 h1:yzx2IMR8xFDZpsAMEPIota336lwpgzIJXW0U6pK3KeY=
922922
github.com/turbot/steampipe v0.16.0-rc.7/go.mod h1:iRCE1vgFaMgf1imtWQ7bi6VSdvc1aLs+xy5my2J1E6I=
923-
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.1 h1:3rLG09QC94NiiMCzXjhbECvOUgSYmcKYZDko9fCXBKM=
924-
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.1/go.mod h1:W6QjUrkeWyDgfnAt1qzckI7YEEY4x8WOevxKsVSWLM4=
923+
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.2 h1:dr680ALqQ0MbGhOcALx1tXS2Q2ujpIxrwFFvFsSI250=
924+
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.2/go.mod h1:W6QjUrkeWyDgfnAt1qzckI7YEEY4x8WOevxKsVSWLM4=
925925
github.com/ugorji/go v0.0.0-20180813092308-00b869d2f4a5/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
926926
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
927927
github.com/ulikunitz/xz v0.5.8 h1:ERv8V6GKqVi23rgu5cj9pVfVzJbOqAY2Ntl88O6c2nQ=

hub/hub.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ func (h *Hub) Abort() {
248248
for _, iter := range h.runningIterators {
249249
// read the scan metadata from the iterator and add to our stack
250250
h.AddScanMetadata(iter)
251-
// abort the iterator
252-
iter.Abort()
251+
// close the iterator
252+
iter.Close()
253253
// remove it from the saved list of iterators
254254
h.RemoveIterator(iter)
255255
}
@@ -641,11 +641,12 @@ func (h *Hub) startScan(iterator *scanIterator, queryContext *proto.QueryContext
641641

642642
table := iterator.table
643643
connectionPlugin := iterator.connectionPlugin
644+
callId := grpc.BuildCallId()
644645

645646
req := &proto.ExecuteRequest{
646647
Table: table,
647648
QueryContext: queryContext,
648-
CallId: iterator.callId,
649+
CallId: callId,
649650
TraceContext: grpc.CreateCarrierFromContext(traceCtx.Ctx),
650651
ExecuteConnectionData: make(map[string]*proto.ExecuteConnectionData),
651652
}
@@ -662,12 +663,12 @@ func (h *Hub) startScan(iterator *scanIterator, queryContext *proto.QueryContext
662663
req.ExecuteConnectionData[connectionName] = data
663664
}
664665

665-
log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, iterator.callId, req.CacheEnabled, iterator)
666+
log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, callId, req.CacheEnabled, iterator)
666667
stream, ctx, cancel, err := connectionPlugin.PluginClient.Execute(req)
667668
// format GRPC errors and ignore not implemented errors for backwards compatibility
668669
err = grpc.HandleGrpcError(err, connectionPlugin.PluginName, "Execute")
669670
if err != nil {
670-
log.Printf("[WARN] startScan: plugin Execute function callId: %s returned error: %v\n", iterator.callId, err)
671+
log.Printf("[WARN] startScan: plugin Execute function callId: %s returned error: %v\n", callId, err)
671672
iterator.setError(err)
672673
return err
673674
}

hub/in_memory_iterator.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,6 @@ func (i *inMemoryIterator) Close() {
6060
i.status = QueryStatusReady
6161
}
6262

63-
func (i *inMemoryIterator) Abort() {
64-
log.Printf("[TRACE] inMemoryIterator Abort() (%p)", i)
65-
i.Close()
66-
}
67-
6863
func (i *inMemoryIterator) CanIterate() bool {
6964
switch i.status {
7065
case QueryStatusError, QueryStatusComplete:

hub/iterator.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ type Iterator interface {
99
ConnectionName() string
1010
// Next returns next row. Nil slice means there is no more rows to scan.
1111
Next() (map[string]interface{}, error)
12-
// Close stops an iteration, ensures the plugin writes outstanding data to the cache and frees any resources.
12+
// Close stops an iteration and frees any resources.
1313
Close()
14-
// Abort stops an iteration,and frees any resources.
15-
Abort()
1614
Status() queryStatus
1715
Error() error
1816
CanIterate() bool

hub/legacy_group_iterator.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,6 @@ func (i *legacyGroupIterator) Close() {
176176
i.traceCtx.Span.End()
177177
}
178178

179-
func (i *legacyGroupIterator) Abort() {
180-
// for legacy iterator just call close
181-
i.Close()
182-
}
183179
func (i *legacyGroupIterator) CanIterate() bool {
184180
switch i.Status() {
185181
case QueryStatusError, QueryStatusComplete:

hub/legacy_scan_iterator.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,7 @@ func (i *legacyScanIterator) Close() {
151151
}
152152

153153
i.closeSpan()
154-
}
155154

156-
func (i *legacyScanIterator) Abort() {
157-
// for legacy iterator just call Close which has same behaviour as abort
158-
i.Close()
159155
}
160156

161157
// CanIterate returns true if this iterator has results available to iterate

hub/scan_iterator.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"github.com/turbot/steampipe-plugin-sdk/v4/grpc"
87
"log"
98
"time"
109

@@ -45,8 +44,8 @@ type scanIterator struct {
4544
connectionPlugin *steampipeconfig.ConnectionPlugin
4645
cancel context.CancelFunc
4746
traceCtx *telemetry.TraceCtx
48-
startTime time.Time
49-
callId string
47+
48+
startTime time.Time
5049
}
5150

5251
func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugin, connectionName, table string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, traceCtx *telemetry.TraceCtx) *scanIterator {
@@ -63,7 +62,6 @@ func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugi
6362
connectionPlugin: connectionPlugin,
6463
traceCtx: traceCtx,
6564
startTime: time.Now(),
66-
callId: grpc.BuildCallId(),
6765
}
6866
}
6967

@@ -150,21 +148,6 @@ func (i *scanIterator) Start(stream proto.WrapperPlugin_ExecuteClient, ctx conte
150148
}
151149

152150
func (i *scanIterator) Close() {
153-
// first, send a message to our plugin terminating execution
154-
// (to ensure that after the context is cancelled, data is written to the cache)
155-
req := &proto.EndExecuteRequest{
156-
CallId: i.callId,
157-
}
158-
err := i.connectionPlugin.PluginClient.EndExecute(req)
159-
if err != nil {
160-
log.Printf("[WARN] EndExecute failed: %s", err.Error())
161-
}
162-
163-
// now call abort to shut down the iterator and cancel the context
164-
i.Abort()
165-
}
166-
167-
func (i *scanIterator) Abort() {
168151
// call the context cancellation function
169152
i.cancel()
170153

0 commit comments

Comments
 (0)