Skip to content

Commit 2f83cae

Browse files
authored
Ensure ConnectionPlugins are cached by the hub. Closes #230
1 parent ea41bba commit 2f83cae

File tree

6 files changed

+76
-94
lines changed

6 files changed

+76
-94
lines changed

fdw.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
var logger hclog.Logger
2929

3030
// force loading of this module
31+
//
3132
//export goInit
3233
func goInit() {}
3334

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ require (
88
github.com/hashicorp/go-hclog v1.2.2
99
github.com/hashicorp/go-version v1.6.0 // indirect
1010
github.com/turbot/go-kit v0.4.0
11-
github.com/turbot/steampipe v0.16.0-rc.6
12-
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-alpha.0
11+
github.com/turbot/steampipe v0.16.0-rc.7
12+
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.0
1313
go.opentelemetry.io/otel v1.9.0
1414
google.golang.org/protobuf v1.28.1
1515
)

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -918,10 +918,10 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
918918
github.com/tombuildsstuff/giovanni v0.15.1/go.mod h1:0TZugJPEtqzPlMpuJHYfXY6Dq2uLPrXf98D2XQSxNbA=
919919
github.com/turbot/go-kit v0.4.0 h1:EdD7Bf2EGAjvHRGQxRiWpDawzZSk3T+eghqbj74qiSc=
920920
github.com/turbot/go-kit v0.4.0/go.mod h1:SBdPRngbEfYubiR81iAVtO43oPkg1+ASr+XxvgbH7/k=
921-
github.com/turbot/steampipe v0.16.0-rc.6 h1:9lSFwKgM5rs8nevhiQqudlfd97kQB1EuMl/pOeLdynA=
922-
github.com/turbot/steampipe v0.16.0-rc.6/go.mod h1:iRCE1vgFaMgf1imtWQ7bi6VSdvc1aLs+xy5my2J1E6I=
923-
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-alpha.0 h1:x9StD5lugScUl0eFyoRE2ACXYoL//9Fc86bTi9BRKNg=
924-
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-alpha.0/go.mod h1:W6QjUrkeWyDgfnAt1qzckI7YEEY4x8WOevxKsVSWLM4=
921+
github.com/turbot/steampipe v0.16.0-rc.7 h1:yzx2IMR8xFDZpsAMEPIota336lwpgzIJXW0U6pK3KeY=
922+
github.com/turbot/steampipe v0.16.0-rc.7/go.mod h1:iRCE1vgFaMgf1imtWQ7bi6VSdvc1aLs+xy5my2J1E6I=
923+
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.0 h1:dW83ZlZ1B5TU3OHBHut9/xlM3hn7H0yoYd+1rGSFZWg=
924+
github.com/turbot/steampipe-plugin-sdk/v4 v4.0.1-rc.0/go.mod h1:W6QjUrkeWyDgfnAt1qzckI7YEEY4x8WOevxKsVSWLM4=
925925
github.com/ugorji/go v0.0.0-20180813092308-00b869d2f4a5/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
926926
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
927927
github.com/ulikunitz/xz v0.5.8 h1:ERv8V6GKqVi23rgu5cj9pVfVzJbOqAY2Ntl88O6c2nQ=

hub/connection_map.go

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,17 @@ const keySeparator = `\\`
1919
// connectionFactory is responsible for creating and storing connectionPlugins
2020
type connectionFactory struct {
2121
connectionPlugins map[string]*steampipeconfig.ConnectionPlugin
22-
// map of loaded multi-connection plugins, keyed by plugin FQN
23-
multiConnectionPlugins map[string]bool
24-
hub *Hub
25-
connectionLock sync.Mutex
22+
hub *Hub
23+
connectionLock sync.Mutex
2624
}
2725

2826
func newConnectionFactory(hub *Hub) *connectionFactory {
2927
return &connectionFactory{
30-
connectionPlugins: make(map[string]*steampipeconfig.ConnectionPlugin),
31-
multiConnectionPlugins: make(map[string]bool),
32-
hub: hub,
28+
connectionPlugins: make(map[string]*steampipeconfig.ConnectionPlugin),
29+
hub: hub,
3330
}
3431
}
3532

36-
// build a map key for the plugin
37-
func (f *connectionFactory) getPluginKey(pluginFQN, connectionName string) string {
38-
// if we have already loaded this plugin and it supports multi connections, just use FQN
39-
if f.multiConnectionPlugins[pluginFQN] {
40-
return pluginFQN
41-
}
42-
43-
// otherwise assume a legacy plugin and include connection name in key
44-
return fmt.Sprintf("%s%s%s", pluginFQN, keySeparator, connectionName)
45-
}
46-
4733
// extract the plugin FQN and connection name from a map key
4834
func (f *connectionFactory) parsePluginKey(key string) (pluginFQN, connectionName string) {
4935
split := strings.Split(key, keySeparator)
@@ -68,15 +54,8 @@ func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipecon
6854
f.connectionLock.Lock()
6955
defer f.connectionLock.Unlock()
7056

71-
// if we have already loaded this plugin and it supports multi connections, we do not cache it locally
72-
// (as the connection list may have changed)
73-
if f.multiConnectionPlugins[pluginFQN] {
74-
log.Printf("[TRACE] %s supports multi connections, refetching from plugin manager", pluginFQN)
75-
return nil, nil
76-
}
7757
// build a map key for the plugin
78-
key := f.legacyConnectionPluginKey(pluginFQN, connectionName)
79-
log.Printf("[TRACE] %s is a legacy connections, using key %s", connectionName, key)
58+
key := f.connectionPluginKey(pluginFQN, connectionName)
8059

8160
c, gotConnectionPlugin := f.connectionPlugins[key]
8261
if gotConnectionPlugin && !c.PluginClient.Exited() {
@@ -94,7 +73,7 @@ func (f *connectionFactory) get(pluginFQN, connectionName string) (*steampipecon
9473
return nil, nil
9574
}
9675

97-
func (f *connectionFactory) legacyConnectionPluginKey(pluginFQN string, connectionName string) string {
76+
func (f *connectionFactory) connectionPluginKey(pluginFQN string, connectionName string) string {
9877
return fmt.Sprintf("%s%s%s", pluginFQN, keySeparator, connectionName)
9978
}
10079

@@ -144,18 +123,18 @@ func (f *connectionFactory) createConnectionPlugin(pluginFQN string, connectionN
144123
}
145124

146125
func (f *connectionFactory) add(connectionPlugin *steampipeconfig.ConnectionPlugin, connectionName string) {
147-
if connectionPlugin.SupportedOperations.MultipleConnections {
148-
// if this plugin supports multiple connections, add to multiConnectionPlugins map but not to connectionPlugins
149-
// ( we cannot cache the connection plugin as the associated connections may change
150-
// based on connection config changes)
151-
f.multiConnectionPlugins[connectionPlugin.PluginName] = true
152-
return
126+
log.Printf("[TRACE] connectionFactory add %s - adding all connections supported by plugin", connectionName)
127+
128+
// add a map entry for all connections supported by the plugib
129+
for c := range connectionPlugin.ConnectionMap {
130+
log.Printf("[TRACE] add %s", c)
131+
connectionPluginKey := f.connectionPluginKey(connectionPlugin.PluginName, c)
132+
// NOTE: there may already be map entries for some connections
133+
// - this could occur if the filewatcher detects a connection added for a plugin
134+
if _, ok := f.connectionPlugins[connectionPluginKey]; !ok {
135+
f.connectionPlugins[connectionPluginKey] = connectionPlugin
136+
}
153137
}
154-
// for legacy plugins, include the connection name in the key
155-
connectionPluginKey := f.legacyConnectionPluginKey(connectionPlugin.PluginName, connectionName)
156-
157-
// add to map
158-
f.connectionPlugins[connectionPluginKey] = connectionPlugin
159138
}
160139

161140
func (f *connectionFactory) getSchema(pluginFQN, connectionName string) (*proto.Schema, error) {

hub/hub.go

Lines changed: 52 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -325,14 +325,15 @@ func (h *Hub) LoadConnectionConfig() (bool, error) {
325325
}
326326

327327
// GetRelSize is a method called from the planner to estimate the resulting relation size for a scan.
328-
// It will help the planner in deciding between different types of plans,
329-
// according to their costs.
330-
// Args:
331-
// columns (list): The list of columns that must be returned.
332-
// quals (list): A list of Qual instances describing the filters
333-
// applied to this scan.
334-
// Returns:
335-
// A struct of the form (expected_number_of_rows, avg_row_width (in bytes))
328+
//
329+
// It will help the planner in deciding between different types of plans,
330+
// according to their costs.
331+
// Args:
332+
// columns (list): The list of columns that must be returned.
333+
// quals (list): A list of Qual instances describing the filters
334+
// applied to this scan.
335+
// Returns:
336+
// A struct of the form (expected_number_of_rows, avg_row_width (in bytes))
336337
func (h *Hub) GetRelSize(columns []string, quals []*proto.Qual, opts types.Options) (types.RelSize, error) {
337338
result := types.RelSize{
338339
// Default to 1M rows, because these tables are typically expensive
@@ -345,43 +346,44 @@ func (h *Hub) GetRelSize(columns []string, quals []*proto.Qual, opts types.Optio
345346
}
346347

347348
// GetPathKeys Is a method called from the planner to add additional Path to the planner.
348-
// By default, the planner generates an (unparameterized) path, which
349-
// can be reasoned about like a SequentialScan, optionally filtered.
350-
// This method allows the implementor to declare other Paths,
351-
// corresponding to faster access methods for specific attributes.
352-
// Such a parameterized path can be reasoned about like an IndexScan.
353-
// For example, with the following query::
354-
// select * from foreign_table inner join local_table using(id);
355-
// where foreign_table is a foreign table containing 100000 rows, and
356-
// local_table is a regular table containing 100 rows.
357-
// The previous query would probably be transformed to a plan similar to
358-
// this one::
359-
// ┌────────────────────────────────────────────────────────────────────────────────────┐
360-
// │ QUERY PLAN │
361-
// ├────────────────────────────────────────────────────────────────────────────────────┤
362-
// │ Hash Join (cost=57.67..4021812.67 rows=615000 width=68) │
363-
// │ Hash Cond: (foreign_table.id = local_table.id) │
364-
// │ -> Foreign Scan on foreign_table (cost=20.00..4000000.00 rows=100000 width=40) │
365-
// │ -> Hash (cost=22.30..22.30 rows=1230 width=36) │
366-
// │ -> Seq Scan on local_table (cost=0.00..22.30 rows=1230 width=36) │
367-
// └────────────────────────────────────────────────────────────────────────────────────┘
368-
// But with a parameterized path declared on the id key, with the knowledge that this key
369-
// is unique on the foreign side, the following plan might get chosen::
370-
// ┌───────────────────────────────────────────────────────────────────────┐
371-
// │ QUERY PLAN │
372-
// ├───────────────────────────────────────────────────────────────────────┤
373-
// │ Nested Loop (cost=20.00..49234.60 rows=615000 width=68) │
374-
// │ -> Seq Scan on local_table (cost=0.00..22.30 rows=1230 width=36) │
375-
// │ -> Foreign Scan on remote_table (cost=20.00..40.00 rows=1 width=40)│
376-
// │ Filter: (id = local_table.id) │
377-
// └───────────────────────────────────────────────────────────────────────┘
378-
// Returns:
379-
// A list of tuples of the form: (key_columns, expected_rows),
380-
// where key_columns is a tuple containing the columns on which
381-
// the path can be used, and expected_rows is the number of rows
382-
// this path might return for a simple lookup.
383-
// For example, the return value corresponding to the previous scenario would be::
384-
// [(('id',), 1)]
349+
//
350+
// By default, the planner generates an (unparameterized) path, which
351+
// can be reasoned about like a SequentialScan, optionally filtered.
352+
// This method allows the implementor to declare other Paths,
353+
// corresponding to faster access methods for specific attributes.
354+
// Such a parameterized path can be reasoned about like an IndexScan.
355+
// For example, with the following query::
356+
// select * from foreign_table inner join local_table using(id);
357+
// where foreign_table is a foreign table containing 100000 rows, and
358+
// local_table is a regular table containing 100 rows.
359+
// The previous query would probably be transformed to a plan similar to
360+
// this one::
361+
// ┌────────────────────────────────────────────────────────────────────────────────────┐
362+
// │ QUERY PLAN │
363+
// ├────────────────────────────────────────────────────────────────────────────────────┤
364+
// │ Hash Join (cost=57.67..4021812.67 rows=615000 width=68) │
365+
// │ Hash Cond: (foreign_table.id = local_table.id) │
366+
// │ -> Foreign Scan on foreign_table (cost=20.00..4000000.00 rows=100000 width=40) │
367+
// │ -> Hash (cost=22.30..22.30 rows=1230 width=36) │
368+
// │ -> Seq Scan on local_table (cost=0.00..22.30 rows=1230 width=36) │
369+
// └────────────────────────────────────────────────────────────────────────────────────┘
370+
// But with a parameterized path declared on the id key, with the knowledge that this key
371+
// is unique on the foreign side, the following plan might get chosen::
372+
// ┌───────────────────────────────────────────────────────────────────────┐
373+
// │ QUERY PLAN │
374+
// ├───────────────────────────────────────────────────────────────────────┤
375+
// │ Nested Loop (cost=20.00..49234.60 rows=615000 width=68) │
376+
// │ -> Seq Scan on local_table (cost=0.00..22.30 rows=1230 width=36) │
377+
// │ -> Foreign Scan on remote_table (cost=20.00..40.00 rows=1 width=40)│
378+
// │ Filter: (id = local_table.id) │
379+
// └───────────────────────────────────────────────────────────────────────┘
380+
// Returns:
381+
// A list of tuples of the form: (key_columns, expected_rows),
382+
// where key_columns is a tuple containing the columns on which
383+
// the path can be used, and expected_rows is the number of rows
384+
// this path might return for a simple lookup.
385+
// For example, the return value corresponding to the previous scenario would be::
386+
// [(('id',), 1)]
385387
func (h *Hub) GetPathKeys(opts types.Options) ([]types.PathKey, error) {
386388

387389
connectionName := opts["connection"]
@@ -440,8 +442,9 @@ func (h *Hub) GetPathKeys(opts types.Options) ([]types.PathKey, error) {
440442
}
441443

442444
// Explain :: hook called on explain.
443-
// Returns:
444-
// An iterable of strings to display in the EXPLAIN output.
445+
//
446+
// Returns:
447+
// An iterable of strings to display in the EXPLAIN output.
445448
func (h *Hub) Explain(columns []string, quals []*proto.Qual, sortKeys []string, verbose bool, opts types.Options) ([]string, error) {
446449
return make([]string, 0), nil
447450
}
@@ -715,12 +718,13 @@ func (h *Hub) getConnectionPlugin(connectionName string) (*steampipeconfig.Conne
715718
// get the plugin FQN
716719
connectionConfig, ok := steampipeconfig.GlobalConfig.Connections[connectionName]
717720
if !ok {
721+
log.Printf("[WARN] no connection config loaded for connection '%s'", connectionName)
718722
return nil, fmt.Errorf("no connection config loaded for connection '%s'", connectionName)
719723
}
720724
pluginFQN := connectionConfig.Plugin
721725

722726
// ask connection map to get or create this connection
723-
c, err := h.connections.createConnectionPlugin(pluginFQN, connectionName)
727+
c, err := h.connections.getOrCreate(pluginFQN, connectionName)
724728
if err != nil {
725729
return nil, err
726730
}

types/pathkeys.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,6 @@ func LegacyKeyColumnsToPathKeys(requiredColumns, optionalColumns *proto.KeyColum
111111

112112
// otherwise build paths based just on required columns
113113
return columnPathsToPathKeys(requiredColumnSets, allColumns, 1)
114-
115-
// TODO consider whether we need to add paths for required+optional+other columns as well??
116114
}
117115

118116
// LegacyKeyColumnsToColumnPaths returns a list of all the column sets to use in path keys

0 commit comments

Comments
 (0)