Skip to content

Commit 6b05d48

Browse files
committed
Use Go 1.19
Acc support for EndExecute to ensure data is cached if Postgres terminates a scan early (e.g. for non-pushed down limit)
1 parent 99a6574 commit 6b05d48

File tree

10 files changed

+51
-24
lines changed

10 files changed

+51
-24
lines changed

fdw.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,6 @@ func goFdwEndForeignScan(node *C.ForeignScanState) {
333333
if s != nil && pluginHub != nil {
334334
log.Printf("[TRACE] goFdwEndForeignScan, iterator: %p", s.Iter)
335335
pluginHub.EndScan(s.Iter, int64(s.State.limit))
336-
337336
}
338337
ClearExecState(node.fdw_state)
339338
node.fdw_state = nil

fdw/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ PG_CFLAGS = -I${SERVER_LIB} -I${INTERNAL_LIB} -g
3333
include $(PGXS)
3434

3535
go: ../fdw.go
36-
go build -o steampipe_postgres_fdw.a -buildmode=c-archive ../*.go
36+
go1.19rc1 build -o steampipe_postgres_fdw.a -buildmode=c-archive ../*.go
3737

3838
inst:
3939
mkdir -p ../build-${PLATFORM}

go.mod

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
module github.com/turbot/steampipe-postgres-fdw
22

3-
go 1.18
3+
go 1.19
44

5-
//replace github.com/turbot/steampipe-plugin-sdk/v4 => /Users/kai/Dev/github/turbot/steampipe-plugin-sdk
6-
//replace github.com/turbot/steampipe => /Users/kai/Dev/github/turbot/steampipe
5+
replace github.com/turbot/steampipe-plugin-sdk/v4 => /Users/kai/Dev/github/turbot/steampipe-plugin-sdk
6+
7+
replace github.com/turbot/steampipe => /Users/kai/Dev/github/turbot/steampipe
78

89
require (
910
github.com/dgraph-io/ristretto v0.1.0 // indirect
@@ -112,7 +113,7 @@ require (
112113
github.com/sethvargo/go-retry v0.1.0 // indirect
113114
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
114115
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
115-
github.com/sirupsen/logrus v1.8.1 // indirect
116+
github.com/sirupsen/logrus v1.9.0 // indirect
116117
github.com/spf13/afero v1.8.2 // indirect
117118
github.com/spf13/cast v1.5.0 // indirect
118119
github.com/spf13/jwalterweatherman v1.1.0 // indirect
@@ -145,7 +146,7 @@ require (
145146
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
146147
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect
147148
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
148-
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
149+
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
149150
golang.org/x/text v0.3.7 // indirect
150151
google.golang.org/appengine v1.6.7 // indirect
151152
google.golang.org/genproto v0.0.0-20220720214146-176da50484ac // indirect

go.sum

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -867,8 +867,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
867867
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
868868
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
869869
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
870-
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
871-
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
870+
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
871+
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
872872
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
873873
github.com/smartystreets/assertions v1.13.0 h1:Dx1kYM01xsSqKPno3aqLnrwac2LetPvN23diwyr69Qs=
874874
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
@@ -935,10 +935,6 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
935935
github.com/tombuildsstuff/giovanni v0.15.1/go.mod h1:0TZugJPEtqzPlMpuJHYfXY6Dq2uLPrXf98D2XQSxNbA=
936936
github.com/turbot/go-kit v0.4.0 h1:EdD7Bf2EGAjvHRGQxRiWpDawzZSk3T+eghqbj74qiSc=
937937
github.com/turbot/go-kit v0.4.0/go.mod h1:SBdPRngbEfYubiR81iAVtO43oPkg1+ASr+XxvgbH7/k=
938-
github.com/turbot/steampipe v1.7.0-rc.0.0.20220722125407-74cf746365a4 h1:GwuhRfCpzalNNhqbAbhx1WASNHfTfZhbEhBAyWB7N/w=
939-
github.com/turbot/steampipe v1.7.0-rc.0.0.20220722125407-74cf746365a4/go.mod h1:IOmpgdlP0mhgftCUgqFbtG6rPdOAxtyRSz68tMuPo/s=
940-
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.0-alpha.2 h1:Lq+b9zbrYEHRwQ2lH+hJCfEyVLNRGrhPTKJpOnslInw=
941-
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.0-alpha.2/go.mod h1:v6bYQWg9z+3IhTpMG7kldX3b774T4eFRxU5llMam9X8=
942938
github.com/ugorji/go v0.0.0-20180813092308-00b869d2f4a5/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
943939
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
944940
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
@@ -1260,8 +1256,8 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
12601256
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
12611257
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
12621258
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
1263-
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
1264-
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
1259+
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
1260+
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
12651261
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
12661262
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
12671263
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

hub/hub.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ func (h *Hub) Abort() {
248248
for _, iter := range h.runningIterators {
249249
// read the scan metadata from the iterator and add to our stack
250250
h.AddScanMetadata(iter)
251-
// close the iterator
252-
iter.Close()
251+
// abort the iterator
252+
iter.Abort()
253253
// remove it from the saved list of iterators
254254
h.RemoveIterator(iter)
255255
}
@@ -633,12 +633,11 @@ func (h *Hub) startScan(iterator *scanIterator, queryContext *proto.QueryContext
633633

634634
table := iterator.table
635635
connectionPlugin := iterator.connectionPlugin
636-
callId := grpc.BuildCallId()
637636

638637
req := &proto.ExecuteRequest{
639638
Table: table,
640639
QueryContext: queryContext,
641-
CallId: callId,
640+
CallId: iterator.callId,
642641
TraceContext: grpc.CreateCarrierFromContext(traceCtx.Ctx),
643642
ExecuteConnectionData: make(map[string]*proto.ExecuteConnectionData),
644643
}
@@ -655,12 +654,12 @@ func (h *Hub) startScan(iterator *scanIterator, queryContext *proto.QueryContext
655654
req.ExecuteConnectionData[connectionName] = data
656655
}
657656

658-
log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, callId, req.CacheEnabled, iterator)
657+
log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, iterator.callId, req.CacheEnabled, iterator)
659658
stream, ctx, cancel, err := connectionPlugin.PluginClient.Execute(req)
660659
// format GRPC errors and ignore not implemented errors for backwards compatibility
661660
err = grpc.HandleGrpcError(err, connectionPlugin.PluginName, "Execute")
662661
if err != nil {
663-
log.Printf("[WARN] startScan: plugin Execute function callId: %s returned error: %v\n", callId, err)
662+
log.Printf("[WARN] startScan: plugin Execute function callId: %s returned error: %v\n", iterator.callId, err)
664663
iterator.setError(err)
665664
return err
666665
}

hub/in_memory_iterator.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ func (i *inMemoryIterator) Close() {
6060
i.status = QueryStatusReady
6161
}
6262

63+
func (i *inMemoryIterator) Abort() {
64+
log.Printf("[TRACE] inMemoryIterator Abort() (%p)", i)
65+
i.Close()
66+
}
67+
6368
func (i *inMemoryIterator) CanIterate() bool {
6469
switch i.status {
6570
case QueryStatusError, QueryStatusComplete:

hub/iterator.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ type Iterator interface {
99
ConnectionName() string
1010
// Next returns next row. Nil slice means there is no more rows to scan.
1111
Next() (map[string]interface{}, error)
12-
// Close stops an iteration and frees any resources.
12+
// Close stops an iteration, ensures the plugin writes outstanding ddata to the cache and frees any resources.
1313
Close()
14+
// Abort stops an iteration,and frees any resources.
15+
Abort()
1416
Status() queryStatus
1517
Error() error
1618
CanIterate() bool

hub/legacy_group_iterator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ func (i *legacyGroupIterator) Close() {
176176
i.traceCtx.Span.End()
177177
}
178178

179+
func (i *legacyGroupIterator) Abort() {
180+
// for legacy iterator just call close
181+
i.Close()
182+
}
179183
func (i *legacyGroupIterator) CanIterate() bool {
180184
switch i.Status() {
181185
case QueryStatusError, QueryStatusComplete:

hub/legacy_scan_iterator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,11 @@ func (i *legacyScanIterator) Close() {
151151
}
152152

153153
i.closeSpan()
154+
}
154155

156+
func (i *legacyScanIterator) Abort() {
157+
// for legacy iterator just call Close which ha ssame behaviour as abort
158+
i.Close()
155159
}
156160

157161
// CanIterate returns true if this iterator has results available to iterate

hub/scan_iterator.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"github.com/turbot/steampipe-plugin-sdk/v4/grpc"
78
"log"
89
"time"
910

@@ -44,8 +45,8 @@ type scanIterator struct {
4445
connectionPlugin *steampipeconfig.ConnectionPlugin
4546
cancel context.CancelFunc
4647
traceCtx *telemetry.TraceCtx
47-
48-
startTime time.Time
48+
startTime time.Time
49+
callId string
4950
}
5051

5152
func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugin, connectionName, table string, connectionLimitMap map[string]int64, qualMap map[string]*proto.Quals, columns []string, traceCtx *telemetry.TraceCtx) *scanIterator {
@@ -62,6 +63,7 @@ func newScanIterator(hub *Hub, connectionPlugin *steampipeconfig.ConnectionPlugi
6263
connectionPlugin: connectionPlugin,
6364
traceCtx: traceCtx,
6465
startTime: time.Now(),
66+
callId: grpc.BuildCallId(),
6567
}
6668
}
6769

@@ -148,6 +150,21 @@ func (i *scanIterator) Start(stream proto.WrapperPlugin_ExecuteClient, ctx conte
148150
}
149151

150152
func (i *scanIterator) Close() {
153+
// first, send a message to our plugin terminating execution
154+
// (to ensure that after the context is cancelled, data is written to the cache)
155+
req := &proto.EndExecuteRequest{
156+
CallId: i.callId,
157+
}
158+
err := i.connectionPlugin.PluginClient.EndExecute(req)
159+
if err != nil {
160+
log.Printf("[WARN] EndExecute failed: %s", err.Error())
161+
}
162+
163+
// now call abort to shut down the iterator and cancel the context
164+
i.Abort()
165+
}
166+
167+
func (i *scanIterator) Abort() {
151168
// call the context cancellation function
152169
i.cancel()
153170

0 commit comments

Comments
 (0)