Skip to content

Commit abd7e9b

Browse files
committed
Revert EndExecute
1 parent 6b05d48 commit abd7e9b

File tree

6 files changed

+9
-40
lines changed

6 files changed

+9
-40
lines changed

hub/hub.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ func (h *Hub) Abort() {
248248
for _, iter := range h.runningIterators {
249249
// read the scan metadata from the iterator and add to our stack
250250
h.AddScanMetadata(iter)
251-
// abort the iterator
252-
iter.Abort()
251+
// close the iterator
252+
iter.Close()
253253
// remove it from the saved list of iterators
254254
h.RemoveIterator(iter)
255255
}
@@ -633,11 +633,12 @@ func (h *Hub) startScan(iterator *scanIterator, queryContext *proto.QueryContext
633633

634634
table := iterator.table
635635
connectionPlugin := iterator.connectionPlugin
636+
callId := grpc.BuildCallId()
636637

637638
req := &proto.ExecuteRequest{
638639
Table: table,
639640
QueryContext: queryContext,
640-
CallId: iterator.callId,
641+
CallId: callId,
641642
TraceContext: grpc.CreateCarrierFromContext(traceCtx.Ctx),
642643
ExecuteConnectionData: make(map[string]*proto.ExecuteConnectionData),
643644
}
@@ -654,12 +655,12 @@ func (h *Hub) startScan(iterator *scanIterator, queryContext *proto.QueryContext
654655
req.ExecuteConnectionData[connectionName] = data
655656
}
656657

657-
log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, iterator.callId, req.CacheEnabled, iterator)
658+
log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, callId, req.CacheEnabled, iterator)
658659
stream, ctx, cancel, err := connectionPlugin.PluginClient.Execute(req)
659660
// format GRPC errors and ignore not implemented errors for backwards compatibility
660661
err = grpc.HandleGrpcError(err, connectionPlugin.PluginName, "Execute")
661662
if err != nil {
662-
log.Printf("[WARN] startScan: plugin Execute function callId: %s returned error: %v\n", iterator.callId, err)
663+
log.Printf("[WARN] startScan: plugin Execute function callId: %s returned error: %v\n", callId, err)
663664
iterator.setError(err)
664665
return err
665666
}

hub/in_memory_iterator.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,6 @@ func (i *inMemoryIterator) Close() {
6060
i.status = QueryStatusReady
6161
}
6262

63-
func (i *inMemoryIterator) Abort() {
64-
log.Printf("[TRACE] inMemoryIterator Abort() (%p)", i)
65-
i.Close()
66-
}
67-
6863
func (i *inMemoryIterator) CanIterate() bool {
6964
switch i.status {
7065
case QueryStatusError, QueryStatusComplete:

hub/iterator.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,8 @@ type Iterator interface {
99
ConnectionName() string
1010
// Next returns next row. Nil slice means there is no more rows to scan.
1111
Next() (map[string]interface{}, error)
12-
// Close stops an iteration, ensures the plugin writes outstanding ddata to the cache and frees any resources.
12+
// Close stops an iteration and frees any resources.
1313
Close()
14-
// Abort stops an iteration,and frees any resources.
15-
Abort()
1614
Status() queryStatus
1715
Error() error
1816
CanIterate() bool

hub/legacy_group_iterator.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,6 @@ func (i *legacyGroupIterator) Close() {
176176
i.traceCtx.Span.End()
177177
}
178178

179-
func (i *legacyGroupIterator) Abort() {
180-
// for legacy iterator just call close
181-
i.Close()
182-
}
183179
func (i *legacyGroupIterator) CanIterate() bool {
184180
switch i.Status() {
185181
case QueryStatusError, QueryStatusComplete:

hub/legacy_scan_iterator.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,7 @@ func (i *legacyScanIterator) Close() {
151151
}
152152

153153
i.closeSpan()
154-
}
155154

156-
func (i *legacyScanIterator) Abort() {
157-
// for legacy iterator just call Close which ha ssame behaviour as abort
158-
i.Close()
159155
}
160156

161157
// CanIterate returns true if this iterator has results available to iterate

hub/scan_iterator.go

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"github.com/turbot/steampipe-plugin-sdk/v4/grpc"
87
"log"
98
"time"
109

@@ -45,8 +44,8 @@ type scanIterator struct {
4544
connectionPlugin *steampipeconfig.ConnectionPlugin
4645
cancel context.CancelFunc
4746
traceCtx *telemetry.TraceCtx
48-
startTime time.Time
49-
callId string
47+
48+
startTime time.Time
5049
}
5150

5251
func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugin, connectionName, table string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, traceCtx *telemetry.TraceCtx) *scanIterator {
@@ -63,7 +62,6 @@ func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugi
6362
connectionPlugin: connectionPlugin,
6463
traceCtx: traceCtx,
6564
startTime: time.Now(),
66-
callId: grpc.BuildCallId(),
6765
}
6866
}
6967

@@ -150,21 +148,6 @@ func (i *scanIterator) Start(stream proto.WrapperPlugin_ExecuteClient, ctx conte
150148
}
151149

152150
func (i *scanIterator) Close() {
153-
// first, send a message to our plugin terminating execution
154-
// (to ensure that after the context is cancelled, data is written to the cache)
155-
req := &proto.EndExecuteRequest{
156-
CallId: i.callId,
157-
}
158-
err := i.connectionPlugin.PluginClient.EndExecute(req)
159-
if err != nil {
160-
log.Printf("[WARN] EndExecute failed: %s", err.Error())
161-
}
162-
163-
// now call abort to shut down the iterator and cancel the context
164-
i.Abort()
165-
}
166-
167-
func (i *scanIterator) Abort() {
168151
// call the context cancellation function
169152
i.cancel()
170153

0 commit comments

Comments
 (0)