Skip to content

Commit c920409

Browse files
committed
Sinlge plugin for multiple connection working, but high memory usage - still multiple GRPC connections for aggregator
1 parent 2a5ea84 commit c920409

File tree

3 files changed

+122
-98
lines changed

3 files changed

+122
-98
lines changed

hub/connection_map.go

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package hub
22

33
import (
44
"fmt"
5+
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
6+
"github.com/turbot/steampipe/pluginmanager"
57
"log"
68
"runtime/debug"
79
"strings"
@@ -17,23 +19,31 @@ const keySeparator = `\\`
1719
// connectionFactory is responsible for creating and storing connectionPlugins
1820
type connectionFactory struct {
1921
connectionPlugins map[string]*steampipeconfig.ConnectionPlugin
20-
hub *Hub
21-
connectionLock sync.Mutex
22+
// map of loaded multi-connection plugins, keyed by plugin FQN
23+
multiConnectionPlugins map[string]bool
24+
hub *Hub
25+
connectionLock sync.Mutex
2226
}
2327

2428
func newConnectionFactory(hub *Hub) *connectionFactory {
2529
return &connectionFactory{
26-
connectionPlugins: make(map[string]*steampipeconfig.ConnectionPlugin),
27-
hub: hub,
30+
connectionPlugins: make(map[string]*steampipeconfig.ConnectionPlugin),
31+
multiConnectionPlugins: make(map[string]bool),
32+
hub: hub,
2833
}
2934
}
3035

3136
// build a map key for the plugin
3237
func (f *connectionFactory) getPluginKey(pluginFQN, connectionName string) string {
38+
// if the plugin supports multi connections, just use FQN
39+
if f.multiConnectionPlugins[pluginFQN] {
40+
return pluginFQN
41+
}
42+
// otherwise for legacy plugins include conneciton name in key
3343
return fmt.Sprintf("%s%s%s", pluginFQN, keySeparator, connectionName)
3444
}
3545

36-
// extract the plugin FQN and conneciton name from a map key
46+
// extract the plugin FQN and connection name from a map key
3747
func (f *connectionFactory) parsePluginKey(key string) (pluginFQN, connectionName string) {
3848
split := strings.Split(key, keySeparator)
3949
pluginFQN = split[0]
@@ -79,18 +89,41 @@ func (f *connectionFactory) createConnectionPlugin(pluginFQN string, connectionN
7989
f.connectionLock.Lock()
8090
defer f.connectionLock.Unlock()
8191
log.Printf("[TRACE] connectionFactory.createConnectionPlugin lazy loading connection %s", connectionName)
82-
c, err := f.hub.createConnectionPlugin(pluginFQN, connectionName)
83-
if err != nil {
84-
return nil, err
92+
93+
// load the config for this connection
94+
connection, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
95+
if !ok {
96+
log.Printf("[WARN] no config found for connection %s", connectionName)
97+
return nil, fmt.Errorf("no config found for connection %s", connectionName)
98+
}
99+
100+
log.Printf("[TRACE] createConnectionPlugin plugin %s, connection %s, config: %s\n", pluginmanager.PluginFQNToSchemaName(pluginFQN), connectionName, connection.Config)
101+
102+
connectionPlugins, res := steampipeconfig.CreateConnectionPlugins([]*modconfig.Connection{connection})
103+
if res.Error != nil {
104+
return nil, res.Error
105+
}
106+
if connectionPlugins[connection.Name] == nil {
107+
if len(res.Warnings) > 0 {
108+
return nil, fmt.Errorf("%s", strings.Join(res.Warnings, ","))
109+
}
110+
return nil, fmt.Errorf("CreateConnectionPlugins did not return error but '%s' not found in connection map", connection.Name)
85111
}
86112

113+
c := connectionPlugins[connection.Name]
114+
115+
// if this plugin supports multiple connections, add to multiConnectionPlugins map
116+
if c.SupportedOperations.MultipleConnections {
117+
f.multiConnectionPlugins[c.PluginName] = true
118+
}
87119
// add to map
88-
f.add(c)
120+
f.add(c, connection.Name)
121+
89122
return c, nil
90123
}
91124

92-
func (f *connectionFactory) add(connection *steampipeconfig.ConnectionPlugin) {
93-
key := f.getPluginKey(connection.PluginName, connection.ConnectionName)
125+
func (f *connectionFactory) add(connection *steampipeconfig.ConnectionPlugin, connectionName string) {
126+
key := f.getPluginKey(connection.PluginName, connectionName)
94127
f.connectionPlugins[key] = connection
95128
}
96129

@@ -102,29 +135,44 @@ func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*proto.
102135
return nil, err
103136
}
104137
if c != nil {
105-
log.Printf("[TRACE] already loaded")
106-
return c.Schema, nil
138+
log.Printf("[TRACE] already loaded %s %s: ", pluginFQN, connectionName)
139+
for k := range c.ConnectionMap {
140+
log.Printf("[TRACE] %s", k)
141+
}
142+
log.Printf("[TRACE] %v", c.ConnectionMap[connectionName].Schema)
143+
144+
return c.ConnectionMap[connectionName].Schema, nil
107145
}
108146

147+
// optimisation - find other plugins with the same schema
148+
// NOTE: this is only relevant for legacy plugins which do not support multiple connections
109149
log.Printf("[TRACE] searching for other connections using same plugin")
110150
for _, c := range f.connectionPlugins {
111151
if c.PluginName == pluginFQN {
152+
// this plugin CANNOT suport multiple connections, otherwise f.get woul dhave returned it
153+
if c.SupportedOperations.MultipleConnections {
154+
return nil, fmt.Errorf("unexpected error: plugin %s supports multi connections but was not returned for connection %s", connectionName)
155+
}
156+
157+
// so we know this connection plugin has a single connection
158+
connectionData := c.ConnectionMap[connectionName]
112159
// so we have found another connection with this plugin
113160
log.Printf("[TRACE] found another connection with this plugin")
114-
// if the schema mode is dynamic we cannot resuse the schema
115-
if c.Schema.Mode == plugin.SchemaModeDynamic {
161+
162+
// if the schema mode is dynamic we cannot reuse the schema
163+
if connectionData.Schema.Mode == plugin.SchemaModeDynamic {
116164
log.Printf("[TRACE] dynamic schema - cannot reuse")
117165
break
118166
}
119167
log.Printf("[TRACE] returning schema")
120-
return c.Schema, nil
168+
return connectionData.Schema, nil
121169
}
122170
}
123-
// otherwise create the connection, but DO NOT set connection config n(this will have been done by the CLI)
171+
// otherwise create the connection
124172
log.Printf("[TRACE] creating connection plugin to get schema")
125173
c, err = f.createConnectionPlugin(pluginFQN, connectionName)
126174
if err != nil {
127175
return nil, err
128176
}
129-
return c.Schema, nil
177+
return c.ConnectionMap[connectionName].Schema, nil
130178
}

hub/hub.go

Lines changed: 24 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/turbot/steampipe/pkg/filepaths"
2121
"github.com/turbot/steampipe/pkg/steampipeconfig"
2222
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
23-
"github.com/turbot/steampipe/pluginmanager"
2423
"go.opentelemetry.io/otel/attribute"
2524
"go.opentelemetry.io/otel/metric/global"
2625
"go.opentelemetry.io/otel/metric/instrument"
@@ -33,8 +32,7 @@ const (
3332

3433
// Hub is a structure representing plugin hub
3534
type Hub struct {
36-
connections *connectionFactory
37-
steampipeConfig *steampipeconfig.SteampipeConfig
35+
connections *connectionFactory
3836
runningIterators []Iterator
3937

4038
// if the cache is enabled/disabled by a metacommand, this will be non null
@@ -296,7 +294,7 @@ func (h *Hub) Scan(columns []string, quals *proto.Quals, limit int64, opts types
296294
// create a span for this scan
297295
scanTraceCtx := h.traceContextForScan(table, columns, limit, qualMap, connectionName)
298296

299-
connectionConfig, _ := h.steampipeConfig.Connections[connectionName]
297+
connectionConfig, _ := steampipeconfig.GlobalConfig.Connections[connectionName]
300298

301299
var iterator Iterator
302300
// if this is an aggregate connection, create a group iterator
@@ -328,8 +326,8 @@ func (h *Hub) LoadConnectionConfig() (bool, error) {
328326
return false, err
329327
}
330328

331-
configChanged := h.steampipeConfig == connectionConfig
332-
h.steampipeConfig = connectionConfig
329+
configChanged := steampipeconfig.GlobalConfig == connectionConfig
330+
steampipeconfig.GlobalConfig = connectionConfig
333331

334332
return configChanged, nil
335333
}
@@ -410,7 +408,7 @@ func (h *Hub) GetPathKeys(opts types.Options) ([]types.PathKey, error) {
410408
if err != nil {
411409
return nil, err
412410
}
413-
schema := connectionPlugin.Schema.Schema[table]
411+
schema := connectionPlugin.ConnectionMap[connectionName].Schema.Schema[table]
414412
var allColumns = make([]string, len(schema.Columns))
415413
for i, c := range schema.Columns {
416414
allColumns[i] = c.Name
@@ -484,7 +482,7 @@ func (h *Hub) startScanForConnection(connectionName string, table string, qualMa
484482
// determine whether to include the limit, based on the quals
485483
// we ONLY pushgdown the limit is all quals have corresponding key columns,
486484
// and if the qual operator is supported by the key column
487-
if limit != -1 && !h.shouldPushdownLimit(table, qualMap, connectionPlugin) {
485+
if limit != -1 && !h.shouldPushdownLimit(table, qualMap, connectionName, connectionPlugin) {
488486
limit = -1
489487
}
490488

@@ -499,7 +497,7 @@ func (h *Hub) startScanForConnection(connectionName string, table string, qualMa
499497
// cache not enabled - create a scan iterator
500498
log.Printf("[TRACE] startScanForConnection creating a new scan iterator")
501499
queryContext := proto.NewQueryContext(columns, qualMap, limit)
502-
iterator := newScanIterator(h, connectionPlugin, table, qualMap, columns, limit, scanTraceCtx)
500+
iterator := newScanIterator(h, connectionPlugin, connectionName, table, qualMap, columns, limit, scanTraceCtx)
503501

504502
if err := h.startScan(iterator, queryContext, scanTraceCtx); err != nil {
505503
return nil, err
@@ -511,9 +509,9 @@ func (h *Hub) startScanForConnection(connectionName string, table string, qualMa
511509
// determine whether to include the limit, based on the quals
512510
// we ONLY pushdown the limit is all quals have corresponding key columns,
513511
// and if the qual operator is supported by the key column
514-
func (h *Hub) shouldPushdownLimit(table string, qualMap map[string]*proto.Quals, connectionPlugin *steampipeconfig.ConnectionPlugin) bool {
512+
func (h *Hub) shouldPushdownLimit(table string, qualMap map[string]*proto.Quals, connectionName string, connectionPlugin *steampipeconfig.ConnectionPlugin) bool {
515513
// build a map of all key columns
516-
tableSchema, ok := connectionPlugin.Schema.Schema[table]
514+
tableSchema, ok := connectionPlugin.ConnectionMap[connectionName].Schema.Schema[table]
517515
if !ok {
518516
// any errors, just default to NOT pushing down the limit
519517
return false
@@ -561,24 +559,24 @@ func (h *Hub) startScan(iterator *scanIterator, queryContext *proto.QueryContext
561559
h.throttle()
562560

563561
table := iterator.table
564-
c := iterator.connection
565-
566-
callId := grpc.BuildCallId(c.ConnectionName)
562+
connectionPlugin := iterator.connectionPlugin
563+
connectionName := iterator.connectionName
564+
callId := grpc.BuildCallId(connectionName)
567565

568566
req := &proto.ExecuteRequest{
569567
Table: table,
570568
QueryContext: queryContext,
571-
Connection: c.ConnectionName,
572-
CacheEnabled: h.cacheEnabled(c),
573-
CacheTtl: int64(h.cacheTTL(c.ConnectionName).Seconds()),
569+
Connection: connectionName,
570+
CacheEnabled: h.cacheEnabled(connectionName),
571+
CacheTtl: int64(h.cacheTTL(connectionName).Seconds()),
574572
CallId: callId,
575573
TraceContext: grpc.CreateCarrierFromContext(traceCtx.Ctx),
576574
}
577575

578576
log.Printf("[INFO] StartScan for table: %s, callId %s, cache enabled: %v, iterator %p", table, callId, req.CacheEnabled, iterator)
579-
stream, ctx, cancel, err := c.PluginClient.Execute(req)
577+
stream, ctx, cancel, err := connectionPlugin.PluginClient.Execute(req)
580578
// format GRPC errors and ignore not implemented errors for backwards compatibility
581-
err = grpc.HandleGrpcError(err, c.PluginName, "Execute")
579+
err = grpc.HandleGrpcError(err, connectionPlugin.PluginName, "Execute")
582580
if err != nil {
583581
log.Printf("[WARN] startScan: plugin Execute function callId: %s returned error: %v\n", callId, err)
584582
iterator.setError(err)
@@ -595,7 +593,7 @@ func (h *Hub) getConnectionPlugin(connectionName string) (*steampipeconfig.Conne
595593
log.Printf("[TRACE] hub.getConnectionPlugin for connection '%s`", connectionName)
596594

597595
// get the plugin FQN
598-
connectionConfig, ok := h.steampipeConfig.Connections[connectionName]
596+
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
599597
if !ok {
600598
return nil, fmt.Errorf("no connection config loaded for connection '%s'", connectionName)
601599
}
@@ -610,49 +608,25 @@ func (h *Hub) getConnectionPlugin(connectionName string) (*steampipeconfig.Conne
610608
return c, nil
611609
}
612610

613-
// load the given plugin connection into the connection map and return the schema
614-
func (h *Hub) createConnectionPlugin(pluginFQN, connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
615-
// load the config for this connection
616-
connection, ok := h.steampipeConfig.Connections[connectionName]
617-
if !ok {
618-
log.Printf("[WARN] no config found for connection %s", connectionName)
619-
return nil, fmt.Errorf("no config found for connection %s", connectionName)
620-
}
621-
622-
log.Printf("[TRACE] createConnectionPlugin plugin %s, connection %s, config: %s\n", pluginmanager.PluginFQNToSchemaName(pluginFQN), connectionName, connection.Config)
623-
624-
connectionPlugins, res := steampipeconfig.CreateConnectionPlugins([]*modconfig.Connection{connection})
625-
if res.Error != nil {
626-
return nil, res.Error
627-
}
628-
if connectionPlugins[connection.Name] == nil {
629-
if len(res.Warnings) > 0 {
630-
return nil, fmt.Errorf("%s", strings.Join(res.Warnings, ","))
631-
}
632-
return nil, fmt.Errorf("unknown failure")
633-
}
634-
return connectionPlugins[connection.Name], nil
635-
}
636-
637-
func (h *Hub) cacheEnabled(connection *steampipeconfig.ConnectionPlugin) bool {
611+
func (h *Hub) cacheEnabled(connectionName string) bool {
638612
if h.overrideCacheEnabled != nil {
639613
res := *h.overrideCacheEnabled
640614
log.Printf("[TRACE] cacheEnabled overrideCacheEnabled %v", *h.overrideCacheEnabled)
641615
return res
642616
}
643617
// ask the steampipe config for resolved plugin options - this will use default values where needed
644-
connectionOptions := h.steampipeConfig.GetConnectionOptions(connection.ConnectionName)
618+
connectionOptions := steampipeconfig.GlobalConfig.GetConnectionOptions(connectionName)
645619

646620
// the config loading code should ALWAYS populate the connection options, using defaults if needed
647621
if connectionOptions.Cache == nil {
648-
panic(fmt.Sprintf("No cache options found for connection %s", connection))
622+
panic(fmt.Sprintf("No cache options found for connection %s", connectionName))
649623
}
650624
return *connectionOptions.Cache
651625
}
652626

653627
func (h *Hub) cacheTTL(connectionName string) time.Duration {
654628
// ask the steampipe config for resolved plugin options - this will use default values where needed
655-
connectionOptions := h.steampipeConfig.GetConnectionOptions(connectionName)
629+
connectionOptions := steampipeconfig.GlobalConfig.GetConnectionOptions(connectionName)
656630

657631
// the config loading code shouls ALWAYS populate the connection options, using defaults if needed
658632
if connectionOptions.CacheTTL == nil {
@@ -671,7 +645,7 @@ func (h *Hub) cacheTTL(connectionName string) time.Duration {
671645

672646
// IsAggregatorConnection returns whether the connection with the given name is of type "aggregate"
673647
func (h *Hub) IsAggregatorConnection(connectionName string) bool {
674-
connectionConfig, ok := h.steampipeConfig.Connections[connectionName]
648+
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
675649
return ok && connectionConfig.Type == modconfig.ConnectionTypeAggregator
676650
}
677651

@@ -680,7 +654,7 @@ func (h *Hub) GetAggregateConnectionChild(connectionName string) string {
680654
if !h.IsAggregatorConnection(connectionName) {
681655
panic(fmt.Sprintf("GetAggregateConnectionChild called for connection %s which is not an aggregate", connectionName))
682656
}
683-
aggregateConnection := h.steampipeConfig.Connections[connectionName]
657+
aggregateConnection := steampipeconfig.GlobalConfig.Connections[connectionName]
684658
// get first child
685659
return aggregateConnection.FirstChild().Name
686660
}

0 commit comments

Comments
 (0)