Skip to content

Commit 2588695

Browse files
committed
Aggregator works but does just returns first connection
1 parent c920409 commit 2588695

File tree

7 files changed

+574
-134
lines changed

7 files changed

+574
-134
lines changed

hub/connection_map.go

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ func newConnectionFactory(hub *Hub) *connectionFactory {
3535

3636
// build a map key for the plugin
3737
func (f *connectionFactory) getPluginKey(pluginFQN, connectionName string) string {
38-
// if the plugin supports multi connections, just use FQN
38+
// if we have already loaded this plugin and it supports multi connections, just use FQN
3939
if f.multiConnectionPlugins[pluginFQN] {
4040
return pluginFQN
4141
}
42-
// otherwise for legacy plugins include conneciton name in key
42+
43+
// otherwise assume a legacy plugin and include connection name in key
44+
// (if this tr
4345
return fmt.Sprintf("%s%s%s", pluginFQN, keySeparator, connectionName)
4446
}
4547

@@ -57,21 +59,41 @@ func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipecon
5759
log.Printf("[TRACE] connectionFactory get %s %s", pluginFQN, connectionName)
5860
f.connectionLock.Lock()
5961
defer f.connectionLock.Unlock()
60-
// if this is an aggregate connection, return error
61-
// (we must iterate through the child connections explicitly)
62-
if f.hub.IsAggregatorConnection(connectionName) {
63-
log.Printf("[WARN] connectionFactory get %s %s called for aggregator connection - invalid (we must iterate through the child connections explicitly)", pluginFQN, connectionName)
64-
debug.PrintStack()
65-
return nil, fmt.Errorf("the connectionFactory cannot return or create a connectionPlugin for an aggregate connection")
62+
63+
// build a map key for the plugin
64+
var key string
65+
// if we have already loaded this plugin and it supports multi connections, just use FQN
66+
if f.multiConnectionPlugins[pluginFQN] {
67+
key = pluginFQN
68+
} else {
69+
// otherwise try looking for a legacy connection plugin
70+
key = f.legacyConnectionPluginKey(pluginFQN, connectionName)
6671
}
6772

68-
c, gotPluginClient := f.connectionPlugins[f.getPluginKey(pluginFQN, connectionName)]
73+
c, gotPluginClient := f.connectionPlugins[key]
74+
log.Printf("[TRACE] c %v gotPluginClient %v", c, gotPluginClient)
75+
6976
if gotPluginClient && !c.PluginClient.Exited() {
7077
return c, nil
7178
}
79+
80+
log.Printf("[TRACE] c %v gotPluginClient %v", c, gotPluginClient)
81+
82+
// if we failed to find the connection plugins, and it is a legacy aggregate connection, return error
83+
// (it is invalid to try to 'get' a legacy aggregator connection directly)
84+
if f.hub.IsLegacyAggregatorConnection(connectionName) {
85+
log.Printf("[WARN] connectionFactory get %s %s called for aggregator connection - invalid (we must iterate through the child connections explicitly)", pluginFQN, connectionName)
86+
debug.PrintStack()
87+
return nil, fmt.Errorf("the connectionFactory cannot return or create a connectionPlugin for an aggregate connection")
88+
}
89+
7290
return nil, nil
7391
}
7492

93+
func (f *connectionFactory) legacyConnectionPluginKey(pluginFQN string, connectionName string) string {
94+
return fmt.Sprintf("%s%s%s", pluginFQN, keySeparator, connectionName)
95+
}
96+
7597
func (f *connectionFactory) getOrCreate(pluginFQN, connectionName string) (*steampipeconfig.ConnectionPlugin, error) {
7698
log.Printf("[TRACE] connectionFactory getOrCreate %s %s", pluginFQN, connectionName)
7799
c, err := f.get(pluginFQN, connectionName)
@@ -81,6 +103,8 @@ func (f *connectionFactory) getOrCreate(pluginFQN, connectionName string) (*stea
81103
if c != nil {
82104
return c, nil
83105
}
106+
log.Printf("[TRACE] get returned %v, %v", c, err)
107+
84108
// otherwise create the connection plugin, setting connection config
85109
return f.createConnectionPlugin(pluginFQN, connectionName)
86110
}
@@ -110,21 +134,28 @@ func (f *connectionFactory) createConnectionPlugin(pluginFQN string, connectionN
110134
return nil, fmt.Errorf("CreateConnectionPlugins did not return error but '%s' not found in connection map", connection.Name)
111135
}
112136

113-
c := connectionPlugins[connection.Name]
137+
connectionPlugin := connectionPlugins[connection.Name]
138+
f.add(connectionPlugin, connectionName)
114139

115-
// if this plugin supports multiple connections, add to multiConnectionPlugins map
116-
if c.SupportedOperations.MultipleConnections {
117-
f.multiConnectionPlugins[c.PluginName] = true
118-
}
119-
// add to map
120-
f.add(c, connection.Name)
121-
122-
return c, nil
140+
return connectionPlugin, nil
123141
}
124142

125-
func (f *connectionFactory) add(connection *steampipeconfig.ConnectionPlugin, connectionName string) {
126-
key := f.getPluginKey(connection.PluginName, connectionName)
127-
f.connectionPlugins[key] = connection
143+
func (f *connectionFactory) add(connectionPlugin *steampipeconfig.ConnectionPlugin, connectionName string) {
144+
// key to add the connection with
145+
var connectionPluginKey string
146+
147+
if connectionPlugin.SupportedOperations.MultipleConnections {
148+
// if this plugin supports multiple connections, add to multiConnectionPlugins map
149+
f.multiConnectionPlugins[connectionPlugin.PluginName] = true
150+
// use plugin name as key
151+
connectionPluginKey = connectionPlugin.PluginName
152+
} else {
153+
// for legacy plugins, include the connection name in the key
154+
connectionPluginKey = f.legacyConnectionPluginKey(connectionPlugin.PluginName, connectionName)
155+
}
156+
157+
// add to map
158+
f.connectionPlugins[connectionPluginKey] = connectionPlugin
128159
}
129160

130161
func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*proto.Schema, error) {

0 commit comments

Comments
 (0)