Skip to content

Commit 3772032

Browse files
committed
feat: Use of configurable concurrency for extracting assets from BigQuery
1 parent 4c67f9c commit 3772032

File tree

1 file changed

+35
-17
lines changed

1 file changed

+35
-17
lines changed

plugins/extractors/bigquery/bigquery.go

Lines changed: 35 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ import (
2727
"go.opentelemetry.io/otel"
2828
"go.opentelemetry.io/otel/attribute"
2929
"go.opentelemetry.io/otel/metric"
30+
"golang.org/x/sync/errgroup"
3031
"google.golang.org/api/iterator"
3132
"google.golang.org/api/option"
3233
"google.golang.org/protobuf/types/known/anypb"
@@ -56,6 +57,7 @@ type Config struct {
5657
UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"`
5758
UsageProjectIDs []string `mapstructure:"usage_project_ids"`
5859
BuildViewLineage bool `mapstructure:"build_view_lineage" default:"false"`
60+
Concurrency int `mapstructure:"concurrency" default:"10"`
5961
}
6062

6163
type Exclude struct {
@@ -122,6 +124,7 @@ type Extractor struct {
122124
policyTagClient *datacatalog.PolicyTagManagerClient
123125
newClient NewClientFunc
124126
randFn randFn
127+
eg *errgroup.Group
125128

126129
datasetsDurn metric.Int64Histogram
127130
tablesDurn metric.Int64Histogram
@@ -204,13 +207,17 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
204207
e.logger.Error("failed to create policy tag manager client", "err", err)
205208
}
206209

210+
e.eg = &errgroup.Group{}
211+
e.eg.SetLimit(e.config.Concurrency)
212+
207213
return nil
208214
}
209215

210216
// Extract checks if the table is valid and extracts the table schema
211217
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
212218
pageSize := pickFirstNonZero(e.config.DatasetPageSize, e.config.MaxPageSize, 10)
213219

220+
wg := sync.WaitGroup{}
214221
// Fetch and iterate over datasets
215222
pager := iterator.NewPager(e.client.Datasets(ctx), pageSize, "")
216223
for {
@@ -227,14 +234,24 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
227234
e.logger.Debug("excluding dataset from bigquery extract", "dataset_id", ds.DatasetID)
228235
continue
229236
}
230-
e.extractTable(ctx, ds, emit)
237+
wg.Add(1)
238+
go func(ds *bigquery.Dataset) {
239+
defer wg.Done()
240+
e.extractTable(ctx, ds, emit)
241+
}(ds)
231242
}
232243

233244
if !hasNext {
234245
break
235246
}
236247
}
237248

249+
wg.Wait()
250+
if err := e.eg.Wait(); err != nil {
251+
e.logger.Error("error extracting bigquery tables", "err", err)
252+
return err
253+
}
254+
238255
return nil
239256
}
240257

@@ -311,22 +328,23 @@ func (e *Extractor) extractTable(ctx context.Context, ds *bigquery.Dataset, emit
311328
continue
312329
}
313330

314-
tableFQN := table.FullyQualifiedName()
315-
316-
e.logger.Debug("extracting table", "table", tableFQN)
317-
tmd, err := e.fetchTableMetadata(ctx, table)
318-
if err != nil {
319-
e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
320-
continue
321-
}
322-
323-
asset, err := e.buildAsset(ctx, table, tmd)
324-
if err != nil {
325-
e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
326-
continue
327-
}
328-
329-
emit(models.NewRecord(asset))
331+
table := table
332+
e.eg.Go(func() error {
333+
tableFQN := table.FullyQualifiedName()
334+
e.logger.Debug("extracting table", "table", tableFQN)
335+
tmd, err := e.fetchTableMetadata(ctx, table)
336+
if err != nil {
337+
e.logger.Error("failed to fetch table metadata", "err", err, "table", tableFQN)
338+
return nil
339+
}
340+
asset, err := e.buildAsset(ctx, table, tmd)
341+
if err != nil {
342+
e.logger.Error("failed to build asset", "err", err, "table", tableFQN)
343+
return nil
344+
}
345+
emit(models.NewRecord(asset))
346+
return nil
347+
})
330348
}
331349

332350
if !hasNext {

0 commit comments

Comments
 (0)