Skip to content

Commit ab86f82

Browse files
authored
Removes support for plugins which do not have multi connection ability. Closes #332
1 parent 279bc10 commit ab86f82

File tree

4 files changed

+10
-687
lines changed

4 files changed

+10
-687
lines changed

hub/connection_factory.go

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@ import (
44
"fmt"
55
"github.com/turbot/steampipe/pkg/utils"
66
"log"
7-
"runtime/debug"
87
"strings"
98
"sync"
109

1110
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
12-
"github.com/turbot/steampipe-plugin-sdk/v5/plugin"
1311
"github.com/turbot/steampipe/pkg/steampipeconfig"
1412
)
1513

@@ -42,14 +40,6 @@ func (f *connectionFactory) parsePluginKey(key string) (pluginFQN, connectionNam
4240
func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
4341
log.Printf("[TRACE] connectionFactory get plugin: %s connection %s", pluginFQN, connectionName)
4442

45-
// if we this is a legacy aggregate connection, return error
46-
// (it is invalid to try to 'get' a legacy aggregator connection directly)
47-
if f.hub.IsLegacyAggregatorConnection(connectionName) {
48-
log.Printf("[WARN] connectionFactory get %s %s called for aggregator connection - invalid (we must iterate through the child connections explicitly)", pluginFQN, connectionName)
49-
debug.PrintStack()
50-
return nil, fmt.Errorf("cannot create a connectionPlugin for a legacy aggregator connection")
51-
}
52-
5343
f.connectionLock.Lock()
5444
defer f.connectionLock.Unlock()
5545

@@ -156,39 +146,7 @@ func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*proto.
156146
return c.ConnectionMap[connectionName].Schema, nil
157147
}
158148

159-
// optimisation - find other plugins with the same schema
160-
// NOTE: this is only relevant for legacy plugins which do not support multiple connections
161-
log.Printf("[TRACE] searching for other connections using same plugin")
162-
for _, c := range f.connectionPlugins {
163-
if c.PluginName == pluginFQN {
164-
// if this plugin support multiple connections but does not have the schema for this connection
165-
// there must have been an issue setting connection config
166-
// the CLI should not have called importForeignSchema for this connection - so just return an error
167-
if c.SupportedOperations.MultipleConnections {
168-
return nil, fmt.Errorf("plugin %s is not returning schema for connection %s - check logs for a connection initialisation error", pluginFQN, connectionName)
169-
}
170-
171-
log.Printf("[TRACE] found connectionPlugin with same pluginFQN: %s, conneciton map: %v ", c.PluginName, c.ConnectionMap)
172-
// so we know this connection plugin should have a single connection
173-
if len(c.ConnectionMap) > 1 {
174-
return nil, fmt.Errorf("unexpected error: plugin %s does not support multi connections but has %d connections", pluginFQN, len(c.ConnectionMap))
175-
}
176-
// get the first and only connection data
177-
for _, connectionData := range c.ConnectionMap {
178-
// so we have found another connection with this plugin
179-
log.Printf("[TRACE] found another connection with this plugin: %v", c.ConnectionMap)
180-
181-
// if the schema mode is dynamic we cannot reuse the schema
182-
if connectionData.Schema.Mode == plugin.SchemaModeDynamic {
183-
log.Printf("[TRACE] dynamic schema - cannot reuse")
184-
break
185-
}
186-
log.Printf("[TRACE] returning schema")
187-
return connectionData.Schema, nil
188-
}
189-
}
190-
}
191-
// otherwise create the connection
149+
// create the connection
192150
log.Printf("[TRACE] creating connection plugin to get schema")
193151
c, err = f.createConnectionPlugin(pluginFQN, connectionName)
194152
if err != nil {

hub/hub.go

Lines changed: 9 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,6 @@ func (h *Hub) AddScanMetadata(iter Iterator) {
181181
if _, ok := iter.(*inMemoryIterator); ok {
182182
return
183183
}
184-
// if this is a group iterator, recurse into AddScanMetadata for each underlying iterator
185-
if g, ok := iter.(*legacyGroupIterator); ok {
186-
for _, i := range g.Iterators {
187-
h.AddScanMetadata(i)
188-
}
189-
return
190-
}
191184

192185
log.Printf("[TRACE] AddScanMetadata for iterator %p (%s)", iter, iter.ConnectionName())
193186
// get the id of the last metadata item we currently have
@@ -200,7 +193,11 @@ func (h *Hub) AddScanMetadata(iter Iterator) {
200193
ctx := iter.GetTraceContext().Ctx
201194

202195
connectionName := iter.ConnectionName()
203-
connectionPlugin, _ := h.getConnectionPlugin(connectionName)
196+
connectionPlugin, err := h.getConnectionPlugin(connectionName)
197+
if err != nil {
198+
log.Printf("[TRACE] AddScanMetadata for iterator %p (%s) failed - error getting connectionPlugin: %s", iter, iter.ConnectionName(), err.Error())
199+
return
200+
}
204201

205202
// get list of scan metadata from iterator (may be more than 1 for group_iterator)
206203
scanMetadata := iter.GetScanMetadata()
@@ -270,13 +267,6 @@ func (h *Hub) GetSchema(remoteSchema string, localSchema string) (*proto.Schema,
270267
connectionName := localSchema
271268
log.Printf("[TRACE] getSchema remoteSchema: %s, name %s\n", remoteSchema, connectionName)
272269

273-
// if this is an aggregate connection, get the name of the first child connection
274-
// - we will use this to retrieve the schema
275-
if h.IsLegacyAggregatorConnection(connectionName) {
276-
connectionName = h.GetAggregateConnectionChild(connectionName)
277-
log.Printf("[TRACE] getSchema %s is an aggregator - getting schema for first child %s\n", localSchema, connectionName)
278-
}
279-
280270
return h.connections.getSchema(pluginFQN, connectionName)
281271
}
282272

@@ -294,21 +284,13 @@ func (h *Hub) GetIterator(columns []string, quals *proto.Quals, unhandledRestric
294284

295285
// create a span for this scan
296286
scanTraceCtx := h.traceContextForScan(table, columns, limit, qualMap, connectionName)
297-
298-
var iterator Iterator
299-
// if this is a legacy aggregator connection, create a group iterator
300-
if h.IsLegacyAggregatorConnection(connectionName) {
301-
iterator, err = newLegacyGroupIterator(connectionName, table, qualMap, unhandledRestrictions, columns, limit, h, scanTraceCtx)
302-
log.Printf("[TRACE] Hub GetIterator() created aggregate iterator (%p)", iterator)
303-
} else {
304-
iterator, err = h.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, scanTraceCtx)
305-
log.Printf("[TRACE] Hub GetIterator() created iterator (%p)", iterator)
306-
}
287+
iterator, err := h.startScanForConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, scanTraceCtx)
307288

308289
if err != nil {
309290
log.Printf("[TRACE] Hub GetIterator() failed :( %s", err)
310291
return nil, err
311292
}
293+
log.Printf("[TRACE] Hub GetIterator() created iterator (%p)", iterator)
312294

313295
return iterator, nil
314296
}
@@ -394,12 +376,6 @@ func (h *Hub) GetPathKeys(opts types.Options) ([]types.PathKey, error) {
394376

395377
log.Printf("[TRACE] hub.GetPathKeys for connection '%s`, table `%s`", connectionName, table)
396378

397-
// if this is an aggregate connection, get the first child connection
398-
if h.IsLegacyAggregatorConnection(connectionName) {
399-
connectionName = h.GetAggregateConnectionChild(connectionName)
400-
log.Printf("[TRACE] connection is an aggregate - using child connection: %s", connectionName)
401-
}
402-
403379
// get the schema for this connection
404380
connectionPlugin, err := h.getConnectionPlugin(connectionName)
405381
if err != nil {
@@ -484,12 +460,9 @@ func (h *Hub) startScanForConnection(connectionName string, table string, qualMa
484460
// get connection plugin for this connection
485461
connectionPlugin, err := h.getConnectionPlugin(connectionName)
486462
if err != nil {
463+
log.Printf("[TRACE] getConnectionPlugin failed: %s", err.Error())
487464
return nil, err
488465
}
489-
// if this is a legacy plugin, create legacy iterator
490-
if !connectionPlugin.SupportedOperations.MultipleConnections {
491-
return h.startScanForLegacyConnection(connectionName, table, qualMap, unhandledRestrictions, columns, limit, scanTraceCtx)
492-
}
493466

494467
// ok so this is a multi connection plugin, build list of connections,
495468
// if this connection is NOT an aggregator, only execute for the named connection
@@ -562,47 +535,6 @@ func (h *Hub) buildConnectionLimitMap(table string, qualMap map[string]*proto.Qu
562535
return connectionLimitMap, nil
563536
}
564537

565-
func (h *Hub) startScanForLegacyConnection(connectionName string, table string, qualMap map[string]*proto.Quals, unhandledRestrictions int, columns []string, limit int64, scanTraceCtx *telemetry.TraceCtx) (_ Iterator, err error) {
566-
// if this is an aggregate connection, create a group iterator
567-
if h.IsLegacyAggregatorConnection(connectionName) {
568-
return newLegacyGroupIterator(connectionName, table, qualMap, unhandledRestrictions, columns, limit, h, scanTraceCtx)
569-
}
570-
571-
connectionPlugin, err := h.getConnectionPlugin(connectionName)
572-
if err != nil {
573-
return nil, err
574-
}
575-
576-
connectionSchema, err := connectionPlugin.GetSchema(connectionName)
577-
if err != nil {
578-
return nil, err
579-
}
580-
// determine whether to include the limit, based on the quals
581-
// we ONLY pushdown the limit if all quals have corresponding key columns,
582-
// and if the qual operator is supported by the key column
583-
if limit != -1 && !h.shouldPushdownLimit(table, qualMap, unhandledRestrictions, connectionSchema) {
584-
limit = -1
585-
}
586-
587-
if len(qualMap) > 0 {
588-
log.Printf("[INFO] connection '%s', table '%s', quals %s", connectionName, table, grpc.QualMapToString(qualMap, true))
589-
} else {
590-
log.Println("[INFO] --------")
591-
log.Println("[INFO] no quals")
592-
log.Println("[INFO] --------")
593-
}
594-
595-
log.Printf("[TRACE] startScanForConnection creating a new scan iterator")
596-
queryContext := proto.NewQueryContext(columns, qualMap, limit)
597-
iterator := newLegacyScanIterator(h, connectionPlugin, connectionName, table, qualMap, columns, limit, scanTraceCtx)
598-
599-
if err := h.startLegacyScan(iterator, queryContext, scanTraceCtx); err != nil {
600-
return nil, err
601-
}
602-
603-
return iterator, nil
604-
}
605-
606538
// determine whether to include the limit, based on the quals
607539
// we ONLY pushdown the limit if all quals have corresponding key columns,
608540
// and if the qual operator is supported by the key column
@@ -709,39 +641,6 @@ func (h *Hub) StartScan(i Iterator) error {
709641
return nil
710642
}
711643

712-
// split startScan into a separate function to allow iterator to restart the scan
713-
func (h *Hub) startLegacyScan(iterator *legacyScanIterator, queryContext *proto.QueryContext, traceCtx *telemetry.TraceCtx) error {
714-
// ensure we do not call execute too frequently
715-
h.throttle()
716-
717-
table := iterator.table
718-
connectionPlugin := iterator.connectionPlugin
719-
connectionName := iterator.connectionName
720-
callId := grpc.BuildCallId()
721-
722-
req := &proto.ExecuteRequest{
723-
Table: table,
724-
QueryContext: queryContext,
725-
Connection: connectionName,
726-
CacheEnabled: h.cacheEnabled(connectionName),
727-
CacheTtl: int64(h.cacheTTL(connectionName).Seconds()),
728-
CallId: callId,
729-
TraceContext: grpc.CreateCarrierFromContext(traceCtx.Ctx),
730-
}
731-
732-
log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, callId, req.CacheEnabled, iterator)
733-
stream, ctx, cancel, err := connectionPlugin.PluginClient.Execute(req)
734-
// format GRPC errors and ignore not implemented errors for backwards compatibility
735-
err = grpc.HandleGrpcError(err, connectionPlugin.PluginName, "Execute")
736-
if err != nil {
737-
log.Printf("[WARN] startScan: plugin Execute function callId: %s returned error: %v\n", callId, err)
738-
iterator.setError(err)
739-
return err
740-
}
741-
iterator.Start(stream, ctx, cancel)
742-
return nil
743-
}
744-
745644
// getConnectionPlugin returns the connectionPlugin for the provided connection
746645
// it also makes sure that the plugin is up and running.
747646
// if the plugin is not running, it attempts to restart the plugin - errors if unable
@@ -759,6 +658,7 @@ func (h *Hub) getConnectionPlugin(connectionName string) (*steampipeconfig.Conne
759658
// ask connection map to get or create this connection
760659
c, err := h.connections.getOrCreate(pluginFQN, connectionName)
761660
if err != nil {
661+
log.Printf("[TRACE] getConnectionPlugin getConnectionPlugin failed: %s", err.Error())
762662
return nil, err
763663
}
764664

@@ -803,47 +703,6 @@ func (h *Hub) cacheTTL(connectionName string) time.Duration {
803703
return ttl
804704
}
805705

806-
// IsLegacyAggregatorConnection returns whether the connection with the given name is
807-
// using a legacy plugin and has type "aggregator"
808-
func (h *Hub) IsLegacyAggregatorConnection(connectionName string) (res bool) {
809-
// is the connection an aggregator?
810-
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
811-
if !ok || connectionConfig.Type != modconfig.ConnectionTypeAggregator {
812-
if !ok {
813-
log.Printf("[WARN] IsLegacyAggregatorConnection: connection %s not found", connectionName)
814-
} else {
815-
log.Printf("[TRACE] connectionConfig.Type is NOT legacy 'aggregator'")
816-
}
817-
return false
818-
}
819-
820-
// ok so it _is_ an aggregator - we need to find out if it is a legacy plugin - only way to do that is to
821-
// instantiate the connection plugin for the first child connection
822-
// NOTE we know there will be at least one child or else the connection will fail validation
823-
for childConnectionName := range connectionConfig.Connections {
824-
connectionPlugin, _ := h.getConnectionPlugin(childConnectionName)
825-
res = connectionPlugin != nil && !connectionPlugin.SupportedOperations.MultipleConnections
826-
log.Printf("[TRACE] IsLegacyAggregatorConnection returning %v", res)
827-
break
828-
}
829-
return res
830-
}
831-
832-
// GetAggregateConnectionChild returns the name of first child connection of the aggregate connection with the given name
833-
func (h *Hub) GetAggregateConnectionChild(connectionName string) string {
834-
if !h.IsLegacyAggregatorConnection(connectionName) {
835-
panic(fmt.Sprintf("GetAggregateConnectionChild called for connection %s which is not an aggregate", connectionName))
836-
}
837-
aggregateConnection := steampipeconfig.GlobalConfig.Connections[connectionName]
838-
// get first child
839-
for connectionName := range aggregateConnection.Connections {
840-
return connectionName
841-
}
842-
843-
// not expected
844-
return ""
845-
}
846-
847706
func (h *Hub) ApplySetting(key string, value string) error {
848707
log.Printf("[TRACE] ApplySetting [%s => %s]", key, value)
849708
return h.cacheSettings.Apply(key, value)

0 commit comments

Comments
 (0)