Skip to content

Commit 316a5e6

Browse files
committed
fixing legacy plugin support
1 parent 6176ab5 commit 316a5e6

File tree

4 files changed

+65
-51
lines changed

4 files changed

+65
-51
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ require (
5050
github.com/deislabs/oras v0.8.1 // indirect
5151
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
5252
github.com/dustin/go-humanize v1.0.0 // indirect
53-
github.com/eko/gocache/v3 v3.1.0 // indirect
53+
github.com/eko/gocache/v3 v3.1.1 // indirect
5454
github.com/fatih/color v1.13.0 // indirect
5555
github.com/fsnotify/fsnotify v1.5.4 // indirect
5656
github.com/gertd/go-pluralize v0.2.1 // indirect

hub/connection_map.go

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -56,37 +56,45 @@ func (f *connectionFactory) parsePluginKey(key string) (pluginFQN, connectionNam
5656
// if a connection plugin for the plugin and connection, return it. If it does not, create it, store in map and return it
5757
// NOTE: there is special case logic got aggregate connections
5858
func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
59-
log.Printf("[TRACE] connectionFactory get %s %s", pluginFQN, connectionName)
59+
log.Printf("[TRACE] connectionFactory get plugin: %s connection %s", pluginFQN, connectionName)
60+
61+
// if we this is a legacy aggregate connection, return error
62+
// (it is invalid to try to 'get' a legacy aggregator connection directly)
63+
if f.hub.IsLegacyAggregatorConnection(connectionName) {
64+
log.Printf("[WARN] connectionFactory get %s %s called for aggregator connection - invalid (we must iterate through the child connections explicitly)", pluginFQN, connectionName)
65+
debug.PrintStack()
66+
return nil, fmt.Errorf("cannot create a connectionPlugin for a legacy aggregator connection")
67+
}
68+
6069
f.connectionLock.Lock()
6170
defer f.connectionLock.Unlock()
6271

6372
// build a map key for the plugin
6473
var key string
6574
// if we have already loaded this plugin and it supports multi connections, just use FQN
6675
if f.multiConnectionPlugins[pluginFQN] {
76+
log.Printf("[TRACE] %s supports multi connections, using pluginFQN for key", pluginFQN)
6777
key = pluginFQN
6878
} else {
6979
// otherwise try looking for a legacy connection plugin
7080
key = f.legacyConnectionPluginKey(pluginFQN, connectionName)
71-
}
81+
log.Printf("[TRACE] %s is a legacy connections, using key %s", connectionName, key)
7282

73-
c, gotPluginClient := f.connectionPlugins[key]
74-
log.Printf("[TRACE] c %v gotPluginClient %v", c, gotPluginClient)
83+
}
7584

76-
if gotPluginClient && !c.PluginClient.Exited() {
85+
c, gotConnectionPlugin := f.connectionPlugins[key]
86+
if gotConnectionPlugin && !c.PluginClient.Exited() {
7787
return c, nil
7888
}
7989

80-
log.Printf("[TRACE] c %v gotPluginClient %v", c, gotPluginClient)
81-
82-
// if we failed to find the connection plugins, and it is a legacy aggregate connection, return error
83-
// (it is invalid to try to 'get' a legacy aggregator connection directly)
84-
if f.hub.IsLegacyAggregatorConnection(connectionName) {
85-
log.Printf("[WARN] connectionFactory get %s %s called for aggregator connection - invalid (we must iterate through the child connections explicitly)", pluginFQN, connectionName)
86-
debug.PrintStack()
87-
return nil, fmt.Errorf("the connectionFactory cannot return or create a connectionPlugin for an aggregate connection")
90+
// so either we have not yet instantiated the conneciton plugin, or it has exited
91+
if !gotConnectionPlugin {
92+
log.Printf("[TRACE] no connectionPlugin loaded with key %s", key)
93+
} else {
94+
log.Printf("[TRACE] connectionPluginwith key %s has exited - reloading", key)
8895
}
8996

97+
log.Printf("[TRACE] failed to get plugin: %s connection %s", pluginFQN, connectionName)
9098
return nil, nil
9199
}
92100

@@ -112,7 +120,7 @@ func (f *connectionFactory) getOrCreate(pluginFQN, connectionName string) (*stea
112120
func (f *connectionFactory) createConnectionPlugin(pluginFQN string, connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
113121
f.connectionLock.Lock()
114122
defer f.connectionLock.Unlock()
115-
log.Printf("[TRACE] connectionFactory.createConnectionPlugin lazy loading connection %s", connectionName)
123+
log.Printf("[TRACE] connectionFactory.createConnectionPlugin create connection %s", connectionName)
116124

117125
// load the config for this connection
118126
connection, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
@@ -180,23 +188,29 @@ func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*proto.
180188
log.Printf("[TRACE] searching for other connections using same plugin")
181189
for _, c := range f.connectionPlugins {
182190
if c.PluginName == pluginFQN {
183-
// this plugin CANNOT suport multiple connections, otherwise f.get woul dhave returned it
191+
// this plugin CANNOT suport multiple connections, otherwise f.get would have returned it
184192
if c.SupportedOperations.MultipleConnections {
185-
return nil, fmt.Errorf("unexpected error: plugin %s supports multi connections but was not returned for connection %s", connectionName)
193+
return nil, fmt.Errorf("unexpected error: plugin %s supports multi connections but was not returned for connection %s", pluginFQN, connectionName)
186194
}
187195

188-
// so we know this connection plugin has a single connection
189-
connectionData := c.ConnectionMap[connectionName]
190-
// so we have found another connection with this plugin
191-
log.Printf("[TRACE] found another connection with this plugin")
192-
193-
// if the schema mode is dynamic we cannot reuse the schema
194-
if connectionData.Schema.Mode == plugin.SchemaModeDynamic {
195-
log.Printf("[TRACE] dynamic schema - cannot reuse")
196-
break
196+
log.Printf("[TRACE] found connectionPlugin with same pluginFQN: %s, conneciton map: %v ", c.PluginName, c.ConnectionMap)
197+
// so we know this connection plugin should have a single connection
198+
if len(c.ConnectionMap) > 1 {
199+
return nil, fmt.Errorf("unexpected error: plugin %s does not support multi connections but has %d connections", pluginFQN, len(c.ConnectionMap))
200+
}
201+
// get the first and only connection data
202+
for _, connectionData := range c.ConnectionMap {
203+
// so we have found another connection with this plugin
204+
log.Printf("[TRACE] found another connection with this plugin: %v", c.ConnectionMap)
205+
206+
// if the schema mode is dynamic we cannot reuse the schema
207+
if connectionData.Schema.Mode == plugin.SchemaModeDynamic {
208+
log.Printf("[TRACE] dynamic schema - cannot reuse")
209+
break
210+
}
211+
log.Printf("[TRACE] returning schema")
212+
return connectionData.Schema, nil
197213
}
198-
log.Printf("[TRACE] returning schema")
199-
return connectionData.Schema, nil
200214
}
201215
}
202216
// otherwise create the connection

hub/hub.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -755,28 +755,28 @@ func (h *Hub) cacheTTL(connectionName string) time.Duration {
755755

756756
// IsLegacyAggregatorConnection returns whether the connection with the given name is
757757
// using a legacy plugin and has type "aggregator"
758-
func (h *Hub) IsLegacyAggregatorConnection(connectionName string) bool {
759-
// TODO KAI NEEDS THOUGHT!!! we cannot tell its legacy aggregator until plugin is instantiated
760-
return false
761-
//log.Printf("[TRACE] IsLegacyAggregatorConnection %s", connectionName)
762-
//
763-
//connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
764-
//if !ok || connectionConfig.Type != modconfig.ConnectionTypeAggregator {
765-
// if !ok {
766-
// log.Printf("[WARN] IsLegacyAggregatorConnection: connection %s not found", connectionName)
767-
// } else {
768-
// log.Printf("[TRACE] connectionConfig.Type is NOT 'aggregator'")
769-
// }
770-
// return false
771-
//}
772-
//
773-
//// get connection plugin for first connection
774-
//childConnectionName := connectionConfig.ConnectionNames[0]
775-
//connectionPlugin, _ := h.getConnectionPlugin(childConnectionName)
776-
//res := connectionPlugin != nil && !connectionPlugin.SupportedOperations.MultipleConnections
777-
//
778-
//log.Printf("[TRACE] IsLegacyAggregatorConnection returning %v", res)
779-
//return res
758+
func (h *Hub) IsLegacyAggregatorConnection(connectionName string) (res bool) {
759+
// is the connection an aggregator?
760+
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
761+
if !ok || connectionConfig.Type != modconfig.ConnectionTypeAggregator {
762+
if !ok {
763+
log.Printf("[WARN] IsLegacyAggregatorConnection: connection %s not found", connectionName)
764+
} else {
765+
log.Printf("[TRACE] connectionConfig.Type is NOT 'aggregator'")
766+
}
767+
return false
768+
}
769+
770+
// ok so it _is_ an aggregator - we need to find out if it is a legacy plugin - only way to do that is to
771+
// instantiate the connection plugin for the first child connection
772+
// NOTE we know there will be at least one child or else the connection will fail validation
773+
for childConnectionName := range connectionConfig.Connections {
774+
connectionPlugin, _ := h.getConnectionPlugin(childConnectionName)
775+
res = connectionPlugin != nil && !connectionPlugin.SupportedOperations.MultipleConnections
776+
log.Printf("[TRACE] IsLegacyAggregatorConnection returning %v", res)
777+
break
778+
}
779+
return res
780780
}
781781

782782
// GetAggregateConnectionChild returns the name of first child connection of the aggregate connection with the given name

schema.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func SchemaToSql(schema map[string]*proto.TableSchema, stmt *C.ImportForeignSche
6464
FdwError(err)
6565
return nil
6666
}
67-
log.Printf("[INFO] Table sql: \n%s\n", sql)
67+
//log.Printf("[INFO] Table sql: \n%s\n", sql)
6868
commands = C.lappend(commands, unsafe.Pointer(C.CString(sql)))
6969
}
7070

0 commit comments

Comments
 (0)