@@ -11,11 +11,12 @@ import (
1111 "sync"
1212
1313 "github.com/joelanford/ignore"
14- "github.com/operator-framework/api/pkg/operators"
1514 "golang.org/x/sync/errgroup"
1615 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1716 "k8s.io/apimachinery/pkg/util/yaml"
1817
18+ "github.com/operator-framework/api/pkg/operators"
19+
1920 "github.com/operator-framework/operator-registry/alpha/property"
2021)
2122
@@ -25,22 +26,42 @@ const (
2526
2627type WalkMetasFSFunc func (path string , meta * Meta , err error ) error
2728
28- func WalkMetasFS (root fs.FS , walkFn WalkMetasFSFunc ) error {
29- return walkFiles (root , func (root fs.FS , path string , err error ) error {
30- if err != nil {
31- return walkFn (path , nil , err )
32- }
29+ // WalkMetasFS walks the filesystem rooted at root and calls walkFn for each individual meta object found in the root.
30+ // By default, WalkMetasFS is not thread-safe because it invokes walkFn concurrently. In order to make it thread-safe,
31+ // use the WithConcurrency(1) to avoid concurrent invocations of walkFn.
32+ func WalkMetasFS (ctx context.Context , root fs.FS , walkFn WalkMetasFSFunc , opts ... LoadOption ) error {
33+ if root == nil {
34+ return fmt .Errorf ("no declarative config filesystem provided" )
35+ }
3336
34- f , err := root .Open (path )
35- if err != nil {
36- return walkFn (path , nil , err )
37- }
38- defer f .Close ()
37+ options := LoadOptions {
38+ concurrency : runtime .NumCPU (),
39+ }
40+ for _ , opt := range opts {
41+ opt (& options )
42+ }
3943
40- return WalkMetasReader (f , func (meta * Meta , err error ) error {
41- return walkFn (path , meta , err )
42- })
44+ pathChan := make (chan string , options .concurrency )
45+
46+ // Create an errgroup to manage goroutines. The context is closed when any
47+ // goroutine returns an error. Goroutines should check the context
48+ // to see if they should return early (in the case of another goroutine
49+ // returning an error).
50+ eg , ctx := errgroup .WithContext (ctx )
51+
52+ // Walk the FS and send paths to a channel for parsing.
53+ eg .Go (func () error {
54+ return sendPaths (ctx , root , pathChan )
4355 })
56+
57+ // Parse paths concurrently. The waitgroup ensures that all paths are parsed
58+ // before the cfgChan is closed.
59+ for i := 0 ; i < options .concurrency ; i ++ {
60+ eg .Go (func () error {
61+ return parseMetaPaths (ctx , root , pathChan , walkFn , options )
62+ })
63+ }
64+ return eg .Wait ()
4465}
4566
4667type WalkMetasReaderFunc func (meta * Meta , err error ) error
@@ -126,59 +147,16 @@ func WithConcurrency(concurrency int) LoadOption {
126147// If LoadFS encounters an error loading or parsing any file, the error will be
127148// immediately returned.
128149func LoadFS (ctx context.Context , root fs.FS , opts ... LoadOption ) (* DeclarativeConfig , error ) {
129- if root == nil {
130- return nil , fmt .Errorf ("no declarative config filesystem provided" )
131- }
132-
133- options := LoadOptions {
134- concurrency : runtime .NumCPU (),
135- }
136- for _ , opt := range opts {
137- opt (& options )
138- }
139-
140- var (
141- fcfg = & DeclarativeConfig {}
142- pathChan = make (chan string , options .concurrency )
143- cfgChan = make (chan * DeclarativeConfig , options .concurrency )
144- )
145-
146- // Create an errgroup to manage goroutines. The context is closed when any
147- // goroutine returns an error. Goroutines should check the context
148- // to see if they should return early (in the case of another goroutine
149- // returning an error).
150- eg , ctx := errgroup .WithContext (ctx )
151-
152- // Walk the FS and send paths to a channel for parsing.
153- eg .Go (func () error {
154- return sendPaths (ctx , root , pathChan )
155- })
156-
157- // Parse paths concurrently. The waitgroup ensures that all paths are parsed
158- // before the cfgChan is closed.
159- var wg sync.WaitGroup
160- for i := 0 ; i < options .concurrency ; i ++ {
161- wg .Add (1 )
162- eg .Go (func () error {
163- defer wg .Done ()
164- return parsePaths (ctx , root , pathChan , cfgChan )
165- })
166- }
167-
168- // Merge parsed configs into a single config.
169- eg .Go (func () error {
170- return mergeCfgs (ctx , cfgChan , fcfg )
171- })
172-
173- // Wait for all path parsing goroutines to finish before closing cfgChan.
174- wg .Wait ()
175- close (cfgChan )
176-
177- // Wait for all goroutines to finish.
178- if err := eg .Wait (); err != nil {
150+ builder := fbcBuilder {}
151+ if err := WalkMetasFS (ctx , root , func (path string , meta * Meta , err error ) error {
152+ if err != nil {
153+ return err
154+ }
155+ return builder .addMeta (meta )
156+ }, opts ... ); err != nil {
179157 return nil , err
180158 }
181- return fcfg , nil
159+ return & builder . cfg , nil
182160}
183161
184162func sendPaths (ctx context.Context , root fs.FS , pathChan chan <- string ) error {
@@ -196,7 +174,7 @@ func sendPaths(ctx context.Context, root fs.FS, pathChan chan<- string) error {
196174 })
197175}
198176
199- func parsePaths (ctx context.Context , root fs.FS , pathChan <- chan string , cfgChan chan <- * DeclarativeConfig ) error {
177+ func parseMetaPaths (ctx context.Context , root fs.FS , pathChan <- chan string , walkFn WalkMetasFSFunc , options LoadOptions ) error {
200178 for {
201179 select {
202180 case <- ctx .Done (): // don't block on receiving from pathChan
@@ -205,51 +183,35 @@ func parsePaths(ctx context.Context, root fs.FS, pathChan <-chan string, cfgChan
205183 if ! ok {
206184 return nil
207185 }
208- cfg , err := LoadFile ( root , path )
186+ file , err := root . Open ( path )
209187 if err != nil {
210188 return err
211189 }
212- select {
213- case cfgChan <- cfg :
214- case <- ctx . Done (): // don't block on sending to cfgChan
215- return ctx . Err ()
190+ if err := WalkMetasReader ( file , func ( meta * Meta , err error ) error {
191+ return walkFn ( path , meta , err )
192+ }); err != nil {
193+ return err
216194 }
217195 }
218196 }
219197}
220198
221- func mergeCfgs (ctx context.Context , cfgChan <- chan * DeclarativeConfig , fcfg * DeclarativeConfig ) error {
222- for {
223- select {
224- case <- ctx .Done (): // don't block on receiving from cfgChan
225- return ctx .Err ()
226- case cfg , ok := <- cfgChan :
227- if ! ok {
228- return nil
229- }
230- fcfg .Merge (cfg )
199+ func readBundleObjects (b * Bundle ) error {
200+ var obj property.BundleObject
201+ for i , props := range b .Properties {
202+ if props .Type != property .TypeBundleObject {
203+ continue
231204 }
232- }
233- }
234-
235- func readBundleObjects (bundles []Bundle ) error {
236- for bi , b := range bundles {
237- var obj property.BundleObject
238- for i , props := range b .Properties {
239- if props .Type != property .TypeBundleObject {
240- continue
241- }
242- if err := json .Unmarshal (props .Value , & obj ); err != nil {
243- return fmt .Errorf ("package %q, bundle %q: parse property at index %d as bundle object: %v" , b .Package , b .Name , i , err )
244- }
245- objJson , err := yaml .ToJSON (obj .Data )
246- if err != nil {
247- return fmt .Errorf ("package %q, bundle %q: convert bundle object property at index %d to JSON: %v" , b .Package , b .Name , i , err )
248- }
249- bundles [bi ].Objects = append (bundles [bi ].Objects , string (objJson ))
205+ if err := json .Unmarshal (props .Value , & obj ); err != nil {
206+ return fmt .Errorf ("package %q, bundle %q: parse property at index %d as bundle object: %v" , b .Package , b .Name , i , err )
250207 }
251- bundles [bi ].CsvJSON = extractCSV (bundles [bi ].Objects )
208+ objJson , err := yaml .ToJSON (obj .Data )
209+ if err != nil {
210+ return fmt .Errorf ("package %q, bundle %q: convert bundle object property at index %d to JSON: %v" , b .Package , b .Name , i , err )
211+ }
212+ b .Objects = append (b .Objects , string (objJson ))
252213 }
214+ b .CsvJSON = extractCSV (b .Objects )
253215 return nil
254216}
255217
@@ -268,52 +230,16 @@ func extractCSV(objs []string) string {
268230
269231// LoadReader reads yaml or json from the passed in io.Reader and unmarshals it into a DeclarativeConfig struct.
270232func LoadReader (r io.Reader ) (* DeclarativeConfig , error ) {
271- cfg := & DeclarativeConfig {}
272-
273- if err := WalkMetasReader (r , func (in * Meta , err error ) error {
233+ builder := fbcBuilder {}
234+ if err := WalkMetasReader (r , func (meta * Meta , err error ) error {
274235 if err != nil {
275236 return err
276237 }
277- switch in .Schema {
278- case SchemaPackage :
279- var p Package
280- if err := json .Unmarshal (in .Blob , & p ); err != nil {
281- return fmt .Errorf ("parse package: %v" , err )
282- }
283- cfg .Packages = append (cfg .Packages , p )
284- case SchemaChannel :
285- var c Channel
286- if err := json .Unmarshal (in .Blob , & c ); err != nil {
287- return fmt .Errorf ("parse channel: %v" , err )
288- }
289- cfg .Channels = append (cfg .Channels , c )
290- case SchemaBundle :
291- var b Bundle
292- if err := json .Unmarshal (in .Blob , & b ); err != nil {
293- return fmt .Errorf ("parse bundle: %v" , err )
294- }
295- cfg .Bundles = append (cfg .Bundles , b )
296- case SchemaDeprecation :
297- var d Deprecation
298- if err := json .Unmarshal (in .Blob , & d ); err != nil {
299- return fmt .Errorf ("parse deprecation: %w" , err )
300- }
301- cfg .Deprecations = append (cfg .Deprecations , d )
302- case "" :
303- return fmt .Errorf ("object '%s' is missing root schema field" , string (in .Blob ))
304- default :
305- cfg .Others = append (cfg .Others , * in )
306- }
307- return nil
238+ return builder .addMeta (meta )
308239 }); err != nil {
309240 return nil , err
310241 }
311-
312- if err := readBundleObjects (cfg .Bundles ); err != nil {
313- return nil , fmt .Errorf ("read bundle objects: %v" , err )
314- }
315-
316- return cfg , nil
242+ return & builder .cfg , nil
317243}
318244
319245// LoadFile will unmarshall declarative config components from a single filename provided in 'path'
@@ -332,3 +258,60 @@ func LoadFile(root fs.FS, path string) (*DeclarativeConfig, error) {
332258
333259 return cfg , nil
334260}
261+
262+ type fbcBuilder struct {
263+ cfg DeclarativeConfig
264+
265+ packagesMu sync.Mutex
266+ channelsMu sync.Mutex
267+ bundlesMu sync.Mutex
268+ deprecationsMu sync.Mutex
269+ othersMu sync.Mutex
270+ }
271+
272+ func (c * fbcBuilder ) addMeta (in * Meta ) error {
273+ switch in .Schema {
274+ case SchemaPackage :
275+ var p Package
276+ if err := json .Unmarshal (in .Blob , & p ); err != nil {
277+ return fmt .Errorf ("parse package: %v" , err )
278+ }
279+ c .packagesMu .Lock ()
280+ c .cfg .Packages = append (c .cfg .Packages , p )
281+ c .packagesMu .Unlock ()
282+ case SchemaChannel :
283+ var ch Channel
284+ if err := json .Unmarshal (in .Blob , & ch ); err != nil {
285+ return fmt .Errorf ("parse channel: %v" , err )
286+ }
287+ c .channelsMu .Lock ()
288+ c .cfg .Channels = append (c .cfg .Channels , ch )
289+ c .channelsMu .Unlock ()
290+ case SchemaBundle :
291+ var b Bundle
292+ if err := json .Unmarshal (in .Blob , & b ); err != nil {
293+ return fmt .Errorf ("parse bundle: %v" , err )
294+ }
295+ if err := readBundleObjects (& b ); err != nil {
296+ return fmt .Errorf ("read bundle objects: %v" , err )
297+ }
298+ c .bundlesMu .Lock ()
299+ c .cfg .Bundles = append (c .cfg .Bundles , b )
300+ c .bundlesMu .Unlock ()
301+ case SchemaDeprecation :
302+ var d Deprecation
303+ if err := json .Unmarshal (in .Blob , & d ); err != nil {
304+ return fmt .Errorf ("parse deprecation: %w" , err )
305+ }
306+ c .deprecationsMu .Lock ()
307+ c .cfg .Deprecations = append (c .cfg .Deprecations , d )
308+ c .deprecationsMu .Unlock ()
309+ case "" :
310+ return fmt .Errorf ("object '%s' is missing root schema field" , string (in .Blob ))
311+ default :
312+ c .othersMu .Lock ()
313+ c .cfg .Others = append (c .cfg .Others , * in )
314+ c .othersMu .Unlock ()
315+ }
316+ return nil
317+ }
0 commit comments