diff --git a/api/v1alpha/conduit_types.go b/api/v1alpha/conduit_types.go index 72521c3..8cce6fb 100644 --- a/api/v1alpha/conduit_types.go +++ b/api/v1alpha/conduit_types.go @@ -91,11 +91,12 @@ type ConduitConnector struct { } type ConduitProcessor struct { - ID string `json:"id,omitempty"` - Name string `json:"name,omitempty"` - Plugin string `json:"plugin,omitempty"` - Condition string `json:"condition,omitempty"` - Workers int `json:"workers,omitempty"` + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Plugin string `json:"plugin,omitempty"` + ProcessorURL string `json:"processorURL,omitempty"` + Condition string `json:"condition,omitempty"` + Workers int `json:"workers,omitempty"` Settings []SettingsVar `json:"settings,omitempty"` } diff --git a/charts/conduit-operator/templates/crd.yaml b/charts/conduit-operator/templates/crd.yaml index ab0eb71..347e47d 100644 --- a/charts/conduit-operator/templates/crd.yaml +++ b/charts/conduit-operator/templates/crd.yaml @@ -76,6 +76,8 @@ spec: type: string plugin: type: string + processorURL: + type: string settings: items: properties: @@ -188,6 +190,8 @@ spec: type: string plugin: type: string + processorURL: + type: string settings: items: properties: diff --git a/config/crd/bases/operator.conduit.io_conduits.yaml b/config/crd/bases/operator.conduit.io_conduits.yaml index 5c9c309..ab22a58 100644 --- a/config/crd/bases/operator.conduit.io_conduits.yaml +++ b/config/crd/bases/operator.conduit.io_conduits.yaml @@ -65,6 +65,8 @@ spec: type: string plugin: type: string + processorURL: + type: string settings: items: properties: @@ -177,6 +179,8 @@ spec: type: string plugin: type: string + processorURL: + type: string settings: items: properties: diff --git a/internal/testutil/conduit.go b/internal/testutil/conduit.go index a36fa27..b0aaee2 100644 --- a/internal/testutil/conduit.go +++ b/internal/testutil/conduit.go @@ -22,6 +22,9 @@ func SetupSampleConduit(t *testing.T) *v1alpha.Conduit { Name: "my-pipeline", Version: "v0.13.2", Description: "my-description", + Registry: &v1alpha.SchemaRegistry{ + URL: "http://apicurio:8080/apis/ccompat/v7", + }, Connectors: []*v1alpha.ConduitConnector{ { Name: "source-connector", @@ -60,11 +63,12 @@ func SetupSampleConduit(t *testing.T) *v1alpha.Conduit { }, Processors: []*v1alpha.ConduitProcessor{ { - ID: "proc1", - Name: "proc1", - Plugin: "builtin:base64.encode", - Workers: 2, - Condition: "{{ eq .Metadata.key \"pipeline\" }}", + ID: "proc1", + Name: "proc1", + Plugin: "builtin:base64.encode", + Workers: 2, + Condition: "{{ eq .Metadata.key \"pipeline\" }}", + ProcessorURL: "http://127.0.0.1:8090/api/files/processors/RECORD_ID/FILENAME", Settings: []v1alpha.SettingsVar{ { Name: "setting01", @@ -103,6 +107,9 @@ func SetupBadNameConduit(t *testing.T) *v1alpha.Conduit { Name: "my-pipeline", Description: "my-description", Version: "v0.13.2", + Registry: &v1alpha.SchemaRegistry{ + URL: "http://apicurio:8080/apis/ccompat/v7", + }, Connectors: []*v1alpha.ConduitConnector{ { Name: "source-connector", @@ -157,6 +164,9 @@ func SetupBadValidationConduit(t *testing.T) *v1alpha.Conduit { Image: "ghcr.io/conduitio/conduit", Description: "my-description", Version: "v0.13.2", + Registry: &v1alpha.SchemaRegistry{ + URL: "http://apicurio:8080/apis/ccompat/v7", + }, Connectors: []*v1alpha.ConduitConnector{ { Name: "source-connector", @@ -208,6 +218,9 @@ func SetupSecretConduit(t *testing.T) *v1alpha.Conduit { Name: "my-pipeline", Version: "v0.13.2", Description: "my-description", + Registry: &v1alpha.SchemaRegistry{ + URL: "http://apicurio:8080/apis/ccompat/v7", + }, Connectors: []*v1alpha.ConduitConnector{ { Name: "source-connector", @@ -258,3 +271,88 @@ func SetupSecretConduit(t *testing.T) *v1alpha.Conduit { return c } + +func SetupSourceProcConduit(t *testing.T) *v1alpha.Conduit { + t.Helper() + running := true + + c := &v1alpha.Conduit{ + ObjectMeta: metav1.ObjectMeta{ + Name: "sample", + Namespace: "sample", + }, + Spec: v1alpha.ConduitSpec{ + Running: &running, + Name: "my-pipeline", + Version: "v0.13.2", + Description: "my-description", + Registry: &v1alpha.SchemaRegistry{ + URL: "http://apicurio:8080/apis/ccompat/v7", + }, + Connectors: []*v1alpha.ConduitConnector{ + { + Name: "source-connector", + Type: "source", + Plugin: "builtin:generator", + PluginVersion: "latest", + PluginName: "builtin:generator", + Settings: []v1alpha.SettingsVar{ + { + Name: "servers", + Value: "127.0.0.1", + }, + { + Name: "topics", + Value: "input-topic", + }, + }, + Processors: []*v1alpha.ConduitProcessor{ + { + ID: "proc1", + Name: "proc1", + Plugin: "standalone:foo.encode", + Workers: 2, + Condition: "{{ eq .Metadata.key \"pipeline\" }}", + ProcessorURL: "http://127.0.0.1:8090/api/files/processors/RECORD_ID/FILENAME", + Settings: []v1alpha.SettingsVar{ + { + Name: "setting01", + SecretRef: &corev1.SecretKeySelector{ + Key: "setting01-%p-key", + LocalObjectReference: corev1.LocalObjectReference{ + Name: "setting01-secret-name", + }, + }, + }, + { + Name: "setting02", + Value: "setting02-val", + }, + }, + }, + }, + }, + { + Name: "destination-connector", + Type: "destination", + Plugin: "builtin:log", + PluginVersion: "latest", + PluginName: "builtin:log", + Settings: []v1alpha.SettingsVar{ + { + Name: "servers", + Value: "127.0.0.1", + }, + { + Name: "topic", + Value: "output-topic", + }, + }, + }, + }, + Processors: nil, + }, + } + + return c +} diff --git a/internal/webhook/v1alpha/conduit_webhook.go b/internal/webhook/v1alpha/conduit_webhook.go index 3b30a81..a1cc0d5 100644 --- a/internal/webhook/v1alpha/conduit_webhook.go +++ b/internal/webhook/v1alpha/conduit_webhook.go @@ -35,6 +35,12 @@ import ( "github.com/Masterminds/semver/v3" v1alpha "github.com/conduitio/conduit-operator/api/v1alpha" validation "github.com/conduitio/conduit-operator/pkg/conduit" + conduitLog "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/plugin/processor/procutils" + "github.com/conduitio/conduit/pkg/plugin/processor/standalone" + "github.com/conduitio/conduit/pkg/schemaregistry" + "github.com/rs/zerolog" + "github.com/twmb/franz-go/pkg/sr" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -175,21 +181,28 @@ func (v *ConduitCustomValidator) ValidateCreate(ctx context.Context, obj runtime errs = append(errs, err) } - if verrs := v.validateConnectors(ctx, conduit.Spec.Connectors); len(verrs) > 0 { + if err := v.validateRegistry(conduit.Spec.Registry); err != nil { + errs = append(errs, err) + } + + r, err := registry(conduit.Spec.Registry, field.NewPath("schemaregistry")) + if err != nil { + errs = append(errs, err) + } + + if verrs := v.validateConnectors(ctx, conduit.Spec.Connectors, r); len(verrs) > 0 { errs = append(errs, verrs...) } if verrs := v.validateProcessors( + ctx, conduit.Spec.Processors, + r, field.NewPath("spec").Child("processors"), ); len(verrs) > 0 { errs = append(errs, verrs...) } - if err := v.validateRegistry(conduit.Spec.Registry); err != nil { - errs = append(errs, err) - } - if len(errs) > 0 { return nil, apierrors.NewInvalid(v1alpha.GroupKind, conduit.Name, errs) } @@ -204,7 +217,16 @@ func (v *ConduitCustomValidator) ValidateUpdate(ctx context.Context, _, newObj r return nil, fmt.Errorf("expected a Conduit object for the newObj but got %T", newObj) } - if errs := v.validateConnectors(ctx, conduit.Spec.Connectors); len(errs) > 0 { + r, err := registry(conduit.Spec.Registry, field.NewPath("schemaregistry")) + if err != nil { + return nil, fmt.Errorf("failed creating schema registry %w", err) + } + + if errs := v.validateConnectors(ctx, conduit.Spec.Connectors, r); len(errs) > 0 { + return nil, apierrors.NewInvalid(v1alpha.GroupKind, conduit.Name, errs) + } + + if errs := v.validateProcessors(ctx, conduit.Spec.Processors, r, field.NewPath("spec").Child("processors")); len(errs) > 0 { return nil, apierrors.NewInvalid(v1alpha.GroupKind, conduit.Name, errs) } @@ -222,7 +244,7 @@ func (v *ConduitCustomValidator) ValidateDelete(_ context.Context, obj runtime.O // validateConnectors validates the attributes of connectors in the slice. // Error is return when the validation fails. -func (v *ConduitCustomValidator) validateConnectors(ctx context.Context, cc []*v1alpha.ConduitConnector) field.ErrorList { +func (v *ConduitCustomValidator) validateConnectors(ctx context.Context, cc []*v1alpha.ConduitConnector, r validation.PluginRegistry) field.ErrorList { var errs field.ErrorList fp := field.NewPath("spec").Child("connectors") @@ -231,7 +253,7 @@ func (v *ConduitCustomValidator) validateConnectors(ctx context.Context, cc []*v errs = append(errs, err) } - if procErrs := v.validateProcessors(c.Processors, fp); procErrs != nil { + if procErrs := v.validateProcessors(ctx, c.Processors, r, fp); procErrs != nil { errs = append(errs, procErrs...) } } @@ -243,11 +265,11 @@ func (v *ConduitCustomValidator) validateConnectors(ctx context.Context, cc []*v return nil } -func (v *ConduitCustomValidator) validateProcessors(pp []*v1alpha.ConduitProcessor, fp *field.Path) field.ErrorList { +func (v *ConduitCustomValidator) validateProcessors(ctx context.Context, pp []*v1alpha.ConduitProcessor, r validation.PluginRegistry, fp *field.Path) field.ErrorList { var errs field.ErrorList for _, p := range pp { - if err := v.ValidateProcessorPlugin(p, fp); err != nil { + if err := v.ValidateProcessor(ctx, p, r, fp); err != nil { errs = append(errs, err) } } @@ -277,6 +299,30 @@ func (*ConduitCustomValidator) validateConduitVersion(ver string) *field.Error { return nil } +var registryFactory = func(r *v1alpha.SchemaRegistry, fp *field.Path) (validation.PluginRegistry, *field.Error) { + cl, err := schemaregistry.NewClient(conduitLog.Nop(), sr.URLs(r.URL)) + if err != nil { + return nil, field.Invalid(fp, sr.URLs(r.URL), fmt.Sprintf("failed to create schema registry: %s", err)) + } + + conduitLogger := conduitLog.InitLogger(zerolog.DebugLevel, conduitLog.FormatJSON) + procSchemaService := procutils.NewSchemaService(conduitLogger, cl) + reg, err := standalone.NewRegistry(conduitLogger, "/tmp", procSchemaService) + if err != nil { + return nil, field.InternalError(fp.Child("schema"), fmt.Errorf("failed to create standalone registry: %w", err)) + } + + return reg, nil +} + +func registry(r *v1alpha.SchemaRegistry, fp *field.Path) (validation.PluginRegistry, *field.Error) { + if r == nil || r.URL == "" { + return nil, field.InternalError(fp, fmt.Errorf("registry must be set")) + } + + return registryFactory(r, fp) +} + func (*ConduitCustomValidator) validateRegistry(sr *v1alpha.SchemaRegistry) *field.Error { if sr == nil || sr.URL == "" { return nil diff --git a/internal/webhook/v1alpha/conduit_webhook_test.go b/internal/webhook/v1alpha/conduit_webhook_test.go index 6e2ec02..72f8240 100644 --- a/internal/webhook/v1alpha/conduit_webhook_test.go +++ b/internal/webhook/v1alpha/conduit_webhook_test.go @@ -9,6 +9,8 @@ import ( v1alpha "github.com/conduitio/conduit-operator/api/v1alpha" "github.com/conduitio/conduit-operator/internal/testutil" "github.com/conduitio/conduit-operator/pkg/conduit" + "github.com/conduitio/conduit-operator/pkg/conduit/mock" + "github.com/conduitio/conduit/pkg/plugin" "github.com/golang/mock/gomock" "github.com/matryer/is" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -54,6 +56,9 @@ func TestWebhookValidate_ConduitVersion(t *testing.T) { } func TestWebhook_ValidateCreate(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + tests := []struct { name string setup func() *v1alpha.Conduit @@ -62,27 +67,99 @@ func TestWebhook_ValidateCreate(t *testing.T) { { name: "validation is successful", setup: func() *v1alpha.Conduit { + c := testutil.SetupSampleConduit(t) webClient := conduit.SetupHTTPMockClient(t) httpResps := conduit.GetHTTPResps(t) gomock.InOrder( webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), ) - return testutil.SetupSampleConduit(t) + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, "name", "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, nil). + Times(1) + registryFactory = func(_ *v1alpha.SchemaRegistry, _ *field.Path) (conduit.PluginRegistry, *field.Error) { + return mockRegistry, nil + } + + return c + }, + }, + { + name: "validates pipeline with source processors", + setup: func() *v1alpha.Conduit { + webClient := conduit.SetupHTTPMockClient(t) + httpResps := conduit.GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + ) + + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, "name", "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, nil). + Times(1) + registryFactory = func(_ *v1alpha.SchemaRegistry, _ *field.Path) (conduit.PluginRegistry, *field.Error) { + return mockRegistry, nil + } + + return testutil.SetupSourceProcConduit(t) }, }, { - name: "error occurs on http call", + name: "error occurs for connector params", setup: func() *v1alpha.Conduit { webClient := conduit.SetupHTTPMockClient(t) - webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")).Times(4) + httpResps := conduit.GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + ) + + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, "name", "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, nil). + Times(1) + registryFactory = func(_ *v1alpha.SchemaRegistry, _ *field.Path) (conduit.PluginRegistry, *field.Error) { + return mockRegistry, nil + } return testutil.SetupSampleConduit(t) }, wantErr: nil, }, + { + name: "error occurs for standalone processor schema", + setup: func() *v1alpha.Conduit { + webClient := conduit.SetupHTTPMockClient(t) + httpResps := conduit.GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")), + ) + + return testutil.SetupSampleConduit(t) + }, + wantErr: apierrors.NewInvalid(v1alpha.GroupKind, "sample", field.ErrorList{ + field.InternalError( + field.NewPath("spec", "processors", "standalone"), + fmt.Errorf("failed to save wasm to file: BOOM"), + ), + }), + }, { name: "error occurs during validation", setup: func() *v1alpha.Conduit { @@ -92,6 +169,7 @@ func TestWebhook_ValidateCreate(t *testing.T) { webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), ) return testutil.SetupBadValidationConduit(t) @@ -129,6 +207,9 @@ func TestWebhook_ValidateCreate(t *testing.T) { } func TestWebhook_ValidateUpdate(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + tests := []struct { name string setup func() *v1alpha.Conduit @@ -143,23 +224,119 @@ func TestWebhook_ValidateUpdate(t *testing.T) { webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpFnResps["list"]), webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpFnResps["spec"]), webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpFnResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpFnResps["wasm"]), ) + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, "name", "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, nil). + Times(1) + registryFactory = func(_ *v1alpha.SchemaRegistry, _ *field.Path) (conduit.PluginRegistry, *field.Error) { + return mockRegistry, nil + } + return testutil.SetupSampleConduit(t) }, }, { - name: "error occurs on http call", + name: "validates pipeline with source processors", setup: func() *v1alpha.Conduit { webClient := conduit.SetupHTTPMockClient(t) - webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")).Times(4) + httpResps := conduit.GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + ) + + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, "name", "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, nil). + Times(1) + registryFactory = func(_ *v1alpha.SchemaRegistry, _ *field.Path) (conduit.PluginRegistry, *field.Error) { + return mockRegistry, nil + } + + return testutil.SetupSourceProcConduit(t) + }, + }, + { + name: "error occurs on http call in connector schema validation", + setup: func() *v1alpha.Conduit { + webClient := conduit.SetupHTTPMockClient(t) + httpResps := conduit.GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + ) + + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, "name", "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, nil). + Times(1) + registryFactory = func(_ *v1alpha.SchemaRegistry, _ *field.Path) (conduit.PluginRegistry, *field.Error) { + return mockRegistry, nil + } return testutil.SetupSampleConduit(t) }, wantErr: nil, }, { - name: "error occurs during validation", + name: "error occurs during standalone proc registration", + setup: func() *v1alpha.Conduit { + webClient := conduit.SetupHTTPMockClient(t) + httpResps := conduit.GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + ) + + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, "name", "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, plugin.ErrPluginAlreadyRegistered). + Times(1) + registryFactory = func(_ *v1alpha.SchemaRegistry, _ *field.Path) (conduit.PluginRegistry, *field.Error) { + return mockRegistry, nil + } + + return testutil.SetupSampleConduit(t) + }, + }, + { + name: "error occurs for connector processor validation", + setup: func() *v1alpha.Conduit { + webClient := conduit.SetupHTTPMockClient(t) + httpResps := conduit.GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["spec"]), + ) + + return testutil.SetupSourceProcConduit(t) + }, + wantErr: apierrors.NewInvalid(v1alpha.GroupKind, "sample", field.ErrorList{ + field.InternalError( + field.NewPath("spec", "connectors", "standalone"), + fmt.Errorf("failed to save wasm to file: BOOM"), + ), + }), + }, + { + name: "error occurs during connector param validation", setup: func() *v1alpha.Conduit { webClient := conduit.SetupHTTPMockClient(t) httpFnResps := conduit.GetHTTPResps(t) diff --git a/pkg/conduit/conduit_validator.go b/pkg/conduit/conduit_validator.go index e91e846..edabdc1 100644 --- a/pkg/conduit/conduit_validator.go +++ b/pkg/conduit/conduit_validator.go @@ -1,4 +1,5 @@ -//go:generate mockgen --build_flags=--mod=mod -source=./conduit_validator.go -destination=mock/http_client_mock.go -package=mock +//go:generate mockgen --build_flags=--mod=mod -source=./http_client.go -destination=mock/http_client_mock.go -package=mock +//go:generate mockgen --build_flags=--mod=mod -source=./plugin_registry.go -destination=mock/registry_mock.go -package=mock package conduit @@ -8,13 +9,17 @@ import ( "errors" "fmt" "io" + "log" "net/http" + "os" "slices" "strings" "github.com/conduitio/conduit-commons/config" sdk "github.com/conduitio/conduit-connector-sdk" v1alpha "github.com/conduitio/conduit-operator/api/v1alpha" + "github.com/conduitio/conduit/pkg/plugin" + "github.com/conduitio/conduit/pkg/plugin/processor/standalone" pconfig "github.com/conduitio/conduit/pkg/provisioning/config" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -30,9 +35,7 @@ const ( var HTTPClient httpClient = http.DefaultClient -type httpClient interface { - Do(req *http.Request) (*http.Response, error) -} +var _ PluginRegistry = (*standalone.Registry)(nil) var _ ValidatorService = (*Validator)(nil) @@ -93,13 +96,49 @@ func (v *Validator) ValidateConnector(ctx context.Context, c *v1alpha.ConduitCon return nil } -func (v *Validator) ValidateProcessorPlugin(p *v1alpha.ConduitProcessor, fp *field.Path) *field.Error { +func (v *Validator) ValidateProcessor(ctx context.Context, p *v1alpha.ConduitProcessor, reg PluginRegistry, fp *field.Path) *field.Error { + if err := v.validateProcessorPlugin(p, fp); err != nil { + return err + } + + if err := v.validateStandaloneProcessor(ctx, p, reg, fp); err != nil { + return err + } + + return nil +} + +func (v *Validator) validateProcessorPlugin(p *v1alpha.ConduitProcessor, fp *field.Path) *field.Error { if p.Plugin == "" { return field.Required(fp.Child("plugin"), "plugin cannot be empty") } return nil } +func (v *Validator) validateStandaloneProcessor(ctx context.Context, p *v1alpha.ConduitProcessor, reg PluginRegistry, fp *field.Path) *field.Error { + if p.ProcessorURL == "" { + return nil + } + + file, cleanup, err := pluginWASM(ctx, p.ProcessorURL) + if err != nil { + return field.InternalError(fp.Child("standalone"), fmt.Errorf("failed to save wasm to file: %w", err)) + } + if cleanup != nil { + defer cleanup() + } + + _, err = reg.Register(ctx, file) + if err != nil { + if errors.Is(err, plugin.ErrPluginAlreadyRegistered) { + return nil + } + return field.InternalError(fp.Child("standalone"), fmt.Errorf("failed to register: %w", err)) + } + + return nil +} + func (v *Validator) validateConnectorPlugin(c *v1alpha.ConduitConnector, fp *field.Path) *field.Error { if err := ValidatePlugin(c.Plugin); err != nil { return field.Invalid(fp.Child("plugin"), c.Plugin, err.Error()) @@ -209,7 +248,7 @@ func (v *Validator) fetchYAMLSpec(ctx context.Context, c *v1alpha.ConduitConnect // formatPluginName converts the plugin name into the format // "conduit-connector-connectorName" -// Returns the github organization and the transformed connector name +// Returns the transformed connector name func formatPluginName(pn string) (string, error) { parts := strings.Split(strings.TrimPrefix(strings.ToLower(pn), "github.com/"), "/") @@ -230,6 +269,43 @@ func formatPluginName(pn string) (string, error) { return "", nil } +// pluginWASM gets the processor WASM from the specified URL and saves to a temp +// file. +// Returns the filename of the created file and a cleanup function fo the file +func pluginWASM(ctx context.Context, processorURL string) (string, func(), error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, processorURL, nil) + if err != nil { + return "", nil, err + } + + resp, err := HTTPClient.Do(req) + if err != nil { + return "", nil, err + } + if resp.StatusCode != http.StatusOK { + return "", nil, fmt.Errorf("non-sucessful status code while getting processor WASM, status code: %d", resp.StatusCode) + } + defer resp.Body.Close() + + file, err := os.CreateTemp("", "proc-*.wasm") + if err != nil { + return "", nil, err + } + defer func() { + file.Close() + }() + + if _, err = io.Copy(file, resp.Body); err != nil { + return "", nil, err + } + + return file.Name(), func() { + if err := os.Remove(file.Name()); err != nil { + log.Println(err) + } + }, nil +} + // connectorList constructs a dictionary of connectors with information for // use by the validator. Skips any connectors with improper name formmatting or // are not in allowed orgs. @@ -244,7 +320,7 @@ func connectorList(ctx context.Context) (map[string]PluginInfo, error) { return nil, err } if resp.StatusCode != http.StatusOK { - return nil, err + return nil, fmt.Errorf("getting connector list, status code: %d", resp.StatusCode) } defer resp.Body.Close() diff --git a/pkg/conduit/conduit_validator_test.go b/pkg/conduit/conduit_validator_test.go index 8ecfd5c..3aaf121 100644 --- a/pkg/conduit/conduit_validator_test.go +++ b/pkg/conduit/conduit_validator_test.go @@ -11,6 +11,8 @@ import ( v1alpha "github.com/conduitio/conduit-operator/api/v1alpha" "github.com/conduitio/conduit-operator/internal/testutil" + "github.com/conduitio/conduit-operator/pkg/conduit/mock" + "github.com/conduitio/conduit/pkg/plugin" "github.com/go-logr/logr/testr" "github.com/golang/mock/gomock" "github.com/matryer/is" @@ -22,6 +24,30 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" ) +var standaloneProc = v1alpha.ConduitProcessor{ + ID: "proc1", + Name: "proc1", + Plugin: "builtin:base64.encode", + Workers: 2, + Condition: "{{ eq .Metadata.key \"pipeline\" }}", + ProcessorURL: "http://127.0.0.1:8090/api/files/processors/RECORD_ID/FILENAME", + Settings: []v1alpha.SettingsVar{ + { + Name: "setting01", + SecretRef: &corev1.SecretKeySelector{ + Key: "setting01-%p-key", + LocalObjectReference: corev1.LocalObjectReference{ + Name: "setting01-secret-name", + }, + }, + }, + { + Name: "setting02", + Value: "setting02-val", + }, + }, +} + func TestValidator_ConnectorPlugin(t *testing.T) { tests := []struct { name string @@ -151,7 +177,7 @@ func TestValidator_ProcessorPlugin(t *testing.T) { cl := fake.NewClientBuilder().Build() v := NewValidator(ctx, cl, logger) - err := v.ValidateProcessorPlugin(c.Spec.Processors[0], fp) + err := v.validateProcessorPlugin(c.Spec.Processors[0], fp) if tc.wantErr != nil { is.True(err != nil) is.Equal(err.Error(), tc.wantErr.Error()) @@ -388,3 +414,177 @@ func TestValidator_ConnectorParameters(t *testing.T) { }) } } + +//nolint:bodyclose // Body is closed in the validator, bodyclose is not recognizing this. +func TestValidator_StandaloneProcessor(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + tests := []struct { + name string + setup func() (*v1alpha.ConduitProcessor, PluginRegistry) + wantErr error + }{ + { + name: "registration succeeds", + setup: func() (*v1alpha.ConduitProcessor, PluginRegistry) { + p := standaloneProc + + // create registry + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, p.Name, "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, nil). + Times(1) + + // mock call to get wasm + webClient := SetupHTTPMockClient(t) + httpResps := GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + ) + + return &p, mockRegistry + }, + }, + { + name: "builtin processor", + setup: func() (*v1alpha.ConduitProcessor, PluginRegistry) { + p := v1alpha.ConduitProcessor{ + ID: "proc1", + Name: "proc1", + Plugin: "builtin:base64.encode", + Workers: 2, + Condition: "{{ eq .Metadata.key \"pipeline\" }}", + Settings: []v1alpha.SettingsVar{ + { + Name: "setting01", + SecretRef: &corev1.SecretKeySelector{ + Key: "setting01-%p-key", + LocalObjectReference: corev1.LocalObjectReference{ + Name: "setting01-secret-name", + }, + }, + }, + { + Name: "setting02", + Value: "setting02-val", + }, + }, + } + + // create registry + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, p.Name, "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, nil). + Times(0) + + // mock call to get wasm + webClient := SetupHTTPMockClient(t) + httpResps := GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + ) + + return &p, mockRegistry + }, + }, + { + name: "error during wasm check", + setup: func() (*v1alpha.ConduitProcessor, PluginRegistry) { + p := standaloneProc + mockRegistry := mock.NewMockPluginRegistry(ctrl) + + webClient := SetupHTTPMockClient(t) + httpResps := GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).Return(nil, errors.New("BOOM")), + ) + + return &p, mockRegistry + }, + wantErr: field.InternalError( + field.NewPath("spec", "processors", "standalone"), + fmt.Errorf("failed to save wasm to file: BOOM"), + ), + }, + { + name: "error during registration", + setup: func() (*v1alpha.ConduitProcessor, PluginRegistry) { + p := standaloneProc + + // create registry + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, p.Name, "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, errors.New("BOOM")). + Times(1) + + // mock call to fail + webClient := SetupHTTPMockClient(t) + httpResps := GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + ) + + return &p, mockRegistry + }, + wantErr: field.InternalError( + field.NewPath("spec", "processors", "standalone"), + fmt.Errorf("failed to register: BOOM"), + ), + }, + { + name: "plugin already registered", + setup: func() (*v1alpha.ConduitProcessor, PluginRegistry) { + p := standaloneProc + + // create registry + mockRegistry := mock.NewMockPluginRegistry(ctrl) + name := plugin.NewFullName(plugin.PluginTypeStandalone, p.Name, "latest") + mockRegistry.EXPECT(). + Register(gomock.Any(), gomock.Any()). + Return(name, plugin.ErrPluginAlreadyRegistered). + Times(1) + + // mock call to fail + webClient := SetupHTTPMockClient(t) + httpResps := GetHTTPResps(t) + gomock.InOrder( + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["list"]), + webClient.EXPECT().Do(gomock.Any()).DoAndReturn(httpResps["wasm"]), + ) + + return &p, mockRegistry + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(_ *testing.T) { + is := is.New(t) + logger := testr.New(t) + ctx := context.Background() + p, reg := tc.setup() + cl := fake.NewClientBuilder().Build() + v := NewValidator(ctx, cl, logger) + fp := field.NewPath("spec").Child("processors") + + err := v.validateStandaloneProcessor(ctx, p, reg, fp) + if tc.wantErr != nil { + assert.NotNil(t, err) + is.Equal(tc.wantErr.Error(), err.Error()) + } else { + is.True(err == nil) + } + }) + } +} diff --git a/pkg/conduit/http_client.go b/pkg/conduit/http_client.go new file mode 100644 index 0000000..9245101 --- /dev/null +++ b/pkg/conduit/http_client.go @@ -0,0 +1,7 @@ +package conduit + +import "net/http" + +type httpClient interface { + Do(req *http.Request) (*http.Response, error) +} diff --git a/pkg/conduit/mock/http_client_mock.go b/pkg/conduit/mock/http_client_mock.go index 6e6fbe8..90f896e 100644 --- a/pkg/conduit/mock/http_client_mock.go +++ b/pkg/conduit/mock/http_client_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ./conduit_validator.go +// Source: ./http_client.go // Package mock is a generated GoMock package. package mock diff --git a/pkg/conduit/mock/registry_mock.go b/pkg/conduit/mock/registry_mock.go new file mode 100644 index 0000000..1c1f9d1 --- /dev/null +++ b/pkg/conduit/mock/registry_mock.go @@ -0,0 +1,51 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./plugin_registry.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + plugin "github.com/conduitio/conduit/pkg/plugin" + gomock "github.com/golang/mock/gomock" +) + +// MockPluginRegistry is a mock of PluginRegistry interface. +type MockPluginRegistry struct { + ctrl *gomock.Controller + recorder *MockPluginRegistryMockRecorder +} + +// MockPluginRegistryMockRecorder is the mock recorder for MockPluginRegistry. +type MockPluginRegistryMockRecorder struct { + mock *MockPluginRegistry +} + +// NewMockPluginRegistry creates a new mock instance. +func NewMockPluginRegistry(ctrl *gomock.Controller) *MockPluginRegistry { + mock := &MockPluginRegistry{ctrl: ctrl} + mock.recorder = &MockPluginRegistryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPluginRegistry) EXPECT() *MockPluginRegistryMockRecorder { + return m.recorder +} + +// Register mocks base method. +func (m *MockPluginRegistry) Register(ctx context.Context, path string) (plugin.FullName, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Register", ctx, path) + ret0, _ := ret[0].(plugin.FullName) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Register indicates an expected call of Register. +func (mr *MockPluginRegistryMockRecorder) Register(ctx, path interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Register", reflect.TypeOf((*MockPluginRegistry)(nil).Register), ctx, path) +} diff --git a/pkg/conduit/plugin_registry.go b/pkg/conduit/plugin_registry.go new file mode 100644 index 0000000..61a4bb3 --- /dev/null +++ b/pkg/conduit/plugin_registry.go @@ -0,0 +1,11 @@ +package conduit + +import ( + "context" + + "github.com/conduitio/conduit/pkg/plugin" +) + +type PluginRegistry interface { + Register(ctx context.Context, path string) (plugin.FullName, error) +} diff --git a/pkg/conduit/test_helpers.go b/pkg/conduit/test_helpers.go index 601b450..09d5529 100644 --- a/pkg/conduit/test_helpers.go +++ b/pkg/conduit/test_helpers.go @@ -17,6 +17,9 @@ var connectorYAMLResp string //go:embed testdata/connector-list.json var connectorListResp string +//go:embed testdata/proc.wasm +var procWASMResp string + func SetupHTTPMockClient(t *testing.T) *mock.MockhttpClient { t.Helper() ctrl := gomock.NewController(t) @@ -50,5 +53,14 @@ func GetHTTPResps(t *testing.T) map[string]func(*http.Request) (*http.Response, } respMap["spec"] = respFn + respFn = func(_ *http.Request) (*http.Response, error) { + resp := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(strings.NewReader(procWASMResp)), + } + return resp, nil + } + respMap["wasm"] = respFn + return respMap } diff --git a/pkg/conduit/testdata/proc.wasm b/pkg/conduit/testdata/proc.wasm new file mode 100644 index 0000000..ca018b3 Binary files /dev/null and b/pkg/conduit/testdata/proc.wasm differ diff --git a/pkg/conduit/validator.go b/pkg/conduit/validator.go index e616e07..c006809 100644 --- a/pkg/conduit/validator.go +++ b/pkg/conduit/validator.go @@ -9,5 +9,5 @@ import ( type ValidatorService interface { ValidateConnector(ctx context.Context, c *v1alpha.ConduitConnector, fp *field.Path) *field.Error - ValidateProcessorPlugin(p *v1alpha.ConduitProcessor, fp *field.Path) *field.Error + ValidateProcessor(ctx context.Context, p *v1alpha.ConduitProcessor, r PluginRegistry, fp *field.Path) *field.Error }