diff --git a/pkg/internal/api/clickpipe.go b/pkg/internal/api/clickpipe.go index 5043c984..841dd25d 100644 --- a/pkg/internal/api/clickpipe.go +++ b/pkg/internal/api/clickpipe.go @@ -393,3 +393,55 @@ func (c *ClientImpl) UpdateClickPipeSettings(ctx context.Context, serviceId stri return settingsResponse.Result, nil } + +type ClickPipeCdcScaling struct { + ReplicaCpuMillicores int64 `json:"replicaCpuMillicores"` + ReplicaMemoryGb float64 `json:"replicaMemoryGb"` +} + +type ClickPipeCdcScalingRequest struct { + ReplicaCpuMillicores int64 `json:"replicaCpuMillicores"` + ReplicaMemoryGb float64 `json:"replicaMemoryGb"` +} + +func (c *ClientImpl) GetClickPipeCdcScaling(ctx context.Context, serviceId string) (*ClickPipeCdcScaling, error) { + req, err := http.NewRequest(http.MethodGet, c.getServicePath(serviceId, "/clickpipesCdcScaling"), nil) + if err != nil { + return nil, err + } + body, err := c.doRequest(ctx, req) + if err != nil { + return nil, err + } + + scalingResponse := ResponseWithResult[ClickPipeCdcScaling]{} + if err := json.Unmarshal(body, &scalingResponse); err != nil { + return nil, fmt.Errorf("failed to unmarshal CDC scaling: %w", err) + } + + return &scalingResponse.Result, nil +} + +func (c *ClientImpl) UpdateClickPipeCdcScaling(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) (*ClickPipeCdcScaling, error) { + var payload bytes.Buffer + if err := json.NewEncoder(&payload).Encode(request); err != nil { + return nil, fmt.Errorf("failed to encode CDC scaling: %w", err) + } + + req, err := http.NewRequest(http.MethodPatch, c.getServicePath(serviceId, "/clickpipesCdcScaling"), &payload) + if err != nil { + return nil, err + } + + body, err := c.doRequest(ctx, req) + if err != nil { + return nil, err + } + + scalingResponse := ResponseWithResult[ClickPipeCdcScaling]{} + if err := json.Unmarshal(body, &scalingResponse); err != nil { + return nil, fmt.Errorf("failed to unmarshal CDC scaling: %w", err) + } + + return &scalingResponse.Result, nil +} diff --git a/pkg/internal/api/clickpipe_models.go b/pkg/internal/api/clickpipe_models.go index 45afd201..df46312b 100644 --- a/pkg/internal/api/clickpipe_models.go +++ b/pkg/internal/api/clickpipe_models.go @@ -144,12 +144,12 @@ type ClickPipeKinesisSource struct { } type ClickPipePostgresSource struct { - Host string `json:"host"` - Port int `json:"port"` - Database string `json:"database"` + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + Database string `json:"database,omitempty"` Credentials *ClickPipeSourceCredentials `json:"credentials,omitempty"` - Settings ClickPipePostgresSettings `json:"settings"` - Mappings []ClickPipePostgresTableMapping `json:"tableMappings"` + Settings *ClickPipePostgresSettings `json:"settings,omitempty"` + Mappings []ClickPipePostgresTableMapping `json:"tableMappings,omitempty"` TableMappingsToRemove []ClickPipePostgresTableMapping `json:"tableMappingsToRemove,omitempty"` TableMappingsToAdd []ClickPipePostgresTableMapping `json:"tableMappingsToAdd,omitempty"` } @@ -158,7 +158,7 @@ type ClickPipePostgresSettings struct { SyncIntervalSeconds *int `json:"syncIntervalSeconds,omitempty"` PullBatchSize *int `json:"pullBatchSize,omitempty"` PublicationName *string `json:"publicationName,omitempty"` - ReplicationMode string `json:"replicationMode"` + ReplicationMode string `json:"replicationMode,omitempty"` ReplicationSlotName *string `json:"replicationSlotName,omitempty"` AllowNullableColumns *bool `json:"allowNullableColumns,omitempty"` InitialLoadParallelism *int `json:"initialLoadParallelism,omitempty"` diff --git a/pkg/internal/api/client_mock.go b/pkg/internal/api/client_mock.go index f33737dd..2b49aeae 100644 --- a/pkg/internal/api/client_mock.go +++ b/pkg/internal/api/client_mock.go @@ -1,4 +1,4 @@ -// Code generated by http://github.com/gojuno/minimock (v3.4.7). DO NOT EDIT. +// Code generated by http://github.com/gojuno/minimock (v3.4.5). DO NOT EDIT. package api @@ -103,6 +103,13 @@ type ClientMock struct { beforeGetClickPipeCounter uint64 GetClickPipeMock mClientMockGetClickPipe + funcGetClickPipeCdcScaling func(ctx context.Context, serviceId string) (cp1 *ClickPipeCdcScaling, err error) + funcGetClickPipeCdcScalingOrigin string + inspectFuncGetClickPipeCdcScaling func(ctx context.Context, serviceId string) + afterGetClickPipeCdcScalingCounter uint64 + beforeGetClickPipeCdcScalingCounter uint64 + GetClickPipeCdcScalingMock mClientMockGetClickPipeCdcScaling + funcGetClickPipeSettings func(ctx context.Context, serviceId string, clickPipeId string) (m1 map[string]any, err error) funcGetClickPipeSettingsOrigin string inspectFuncGetClickPipeSettings func(ctx context.Context, serviceId string, clickPipeId string) @@ -187,6 +194,13 @@ type ClientMock struct { beforeUpdateClickPipeCounter uint64 UpdateClickPipeMock mClientMockUpdateClickPipe + funcUpdateClickPipeCdcScaling func(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) (cp1 *ClickPipeCdcScaling, err error) + funcUpdateClickPipeCdcScalingOrigin string + inspectFuncUpdateClickPipeCdcScaling func(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) + afterUpdateClickPipeCdcScalingCounter uint64 + beforeUpdateClickPipeCdcScalingCounter uint64 + UpdateClickPipeCdcScalingMock mClientMockUpdateClickPipeCdcScaling + funcUpdateClickPipeSettings func(ctx context.Context, serviceId string, clickPipeId string, settings map[string]any) (m1 map[string]any, err error) funcUpdateClickPipeSettingsOrigin string inspectFuncUpdateClickPipeSettings func(ctx context.Context, serviceId string, clickPipeId string, settings map[string]any) @@ -288,6 +302,9 @@ func NewClientMock(t minimock.Tester) *ClientMock { m.GetClickPipeMock = mClientMockGetClickPipe{mock: m} m.GetClickPipeMock.callArgs = []*ClientMockGetClickPipeParams{} + m.GetClickPipeCdcScalingMock = mClientMockGetClickPipeCdcScaling{mock: m} + m.GetClickPipeCdcScalingMock.callArgs = []*ClientMockGetClickPipeCdcScalingParams{} + m.GetClickPipeSettingsMock = mClientMockGetClickPipeSettings{mock: m} m.GetClickPipeSettingsMock.callArgs = []*ClientMockGetClickPipeSettingsParams{} @@ -324,6 +341,9 @@ func NewClientMock(t minimock.Tester) *ClientMock { m.UpdateClickPipeMock = mClientMockUpdateClickPipe{mock: m} m.UpdateClickPipeMock.callArgs = []*ClientMockUpdateClickPipeParams{} + m.UpdateClickPipeCdcScalingMock = mClientMockUpdateClickPipeCdcScaling{mock: m} + m.UpdateClickPipeCdcScalingMock.callArgs = []*ClientMockUpdateClickPipeCdcScalingParams{} + m.UpdateClickPipeSettingsMock = mClientMockUpdateClickPipeSettings{mock: m} m.UpdateClickPipeSettingsMock.callArgs = []*ClientMockUpdateClickPipeSettingsParams{} @@ -4715,6 +4735,349 @@ func (m *ClientMock) MinimockGetClickPipeInspect() { } } +type mClientMockGetClickPipeCdcScaling struct { + optional bool + mock *ClientMock + defaultExpectation *ClientMockGetClickPipeCdcScalingExpectation + expectations []*ClientMockGetClickPipeCdcScalingExpectation + + callArgs []*ClientMockGetClickPipeCdcScalingParams + mutex sync.RWMutex + + expectedInvocations uint64 + expectedInvocationsOrigin string +} + +// ClientMockGetClickPipeCdcScalingExpectation specifies expectation struct of the Client.GetClickPipeCdcScaling +type ClientMockGetClickPipeCdcScalingExpectation struct { + mock *ClientMock + params *ClientMockGetClickPipeCdcScalingParams + paramPtrs *ClientMockGetClickPipeCdcScalingParamPtrs + expectationOrigins ClientMockGetClickPipeCdcScalingExpectationOrigins + results *ClientMockGetClickPipeCdcScalingResults + returnOrigin string + Counter uint64 +} + +// ClientMockGetClickPipeCdcScalingParams contains parameters of the Client.GetClickPipeCdcScaling +type ClientMockGetClickPipeCdcScalingParams struct { + ctx context.Context + serviceId string +} + +// ClientMockGetClickPipeCdcScalingParamPtrs contains pointers to parameters of the Client.GetClickPipeCdcScaling +type ClientMockGetClickPipeCdcScalingParamPtrs struct { + ctx *context.Context + serviceId *string +} + +// ClientMockGetClickPipeCdcScalingResults contains results of the Client.GetClickPipeCdcScaling +type ClientMockGetClickPipeCdcScalingResults struct { + cp1 *ClickPipeCdcScaling + err error +} + +// ClientMockGetClickPipeCdcScalingOrigins contains origins of expectations of the Client.GetClickPipeCdcScaling +type ClientMockGetClickPipeCdcScalingExpectationOrigins struct { + origin string + originCtx string + originServiceId string +} + +// Marks this method to be optional. The default behavior of any method with Return() is '1 or more', meaning +// the test will fail minimock's automatic final call check if the mocked method was not called at least once. +// Optional() makes method check to work in '0 or more' mode. +// It is NOT RECOMMENDED to use this option unless you really need it, as default behaviour helps to +// catch the problems when the expected method call is totally skipped during test run. +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) Optional() *mClientMockGetClickPipeCdcScaling { + mmGetClickPipeCdcScaling.optional = true + return mmGetClickPipeCdcScaling +} + +// Expect sets up expected params for Client.GetClickPipeCdcScaling +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) Expect(ctx context.Context, serviceId string) *mClientMockGetClickPipeCdcScaling { + if mmGetClickPipeCdcScaling.mock.funcGetClickPipeCdcScaling != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("ClientMock.GetClickPipeCdcScaling mock is already set by Set") + } + + if mmGetClickPipeCdcScaling.defaultExpectation == nil { + mmGetClickPipeCdcScaling.defaultExpectation = &ClientMockGetClickPipeCdcScalingExpectation{} + } + + if mmGetClickPipeCdcScaling.defaultExpectation.paramPtrs != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("ClientMock.GetClickPipeCdcScaling mock is already set by ExpectParams functions") + } + + mmGetClickPipeCdcScaling.defaultExpectation.params = &ClientMockGetClickPipeCdcScalingParams{ctx, serviceId} + mmGetClickPipeCdcScaling.defaultExpectation.expectationOrigins.origin = minimock.CallerInfo(1) + for _, e := range mmGetClickPipeCdcScaling.expectations { + if minimock.Equal(e.params, mmGetClickPipeCdcScaling.defaultExpectation.params) { + mmGetClickPipeCdcScaling.mock.t.Fatalf("Expectation set by When has same params: %#v", *mmGetClickPipeCdcScaling.defaultExpectation.params) + } + } + + return mmGetClickPipeCdcScaling +} + +// ExpectCtxParam1 sets up expected param ctx for Client.GetClickPipeCdcScaling +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) ExpectCtxParam1(ctx context.Context) *mClientMockGetClickPipeCdcScaling { + if mmGetClickPipeCdcScaling.mock.funcGetClickPipeCdcScaling != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("ClientMock.GetClickPipeCdcScaling mock is already set by Set") + } + + if mmGetClickPipeCdcScaling.defaultExpectation == nil { + mmGetClickPipeCdcScaling.defaultExpectation = &ClientMockGetClickPipeCdcScalingExpectation{} + } + + if mmGetClickPipeCdcScaling.defaultExpectation.params != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("ClientMock.GetClickPipeCdcScaling mock is already set by Expect") + } + + if mmGetClickPipeCdcScaling.defaultExpectation.paramPtrs == nil { + mmGetClickPipeCdcScaling.defaultExpectation.paramPtrs = &ClientMockGetClickPipeCdcScalingParamPtrs{} + } + mmGetClickPipeCdcScaling.defaultExpectation.paramPtrs.ctx = &ctx + mmGetClickPipeCdcScaling.defaultExpectation.expectationOrigins.originCtx = minimock.CallerInfo(1) + + return mmGetClickPipeCdcScaling +} + +// ExpectServiceIdParam2 sets up expected param serviceId for Client.GetClickPipeCdcScaling +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) ExpectServiceIdParam2(serviceId string) *mClientMockGetClickPipeCdcScaling { + if mmGetClickPipeCdcScaling.mock.funcGetClickPipeCdcScaling != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("ClientMock.GetClickPipeCdcScaling mock is already set by Set") + } + + if mmGetClickPipeCdcScaling.defaultExpectation == nil { + mmGetClickPipeCdcScaling.defaultExpectation = &ClientMockGetClickPipeCdcScalingExpectation{} + } + + if mmGetClickPipeCdcScaling.defaultExpectation.params != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("ClientMock.GetClickPipeCdcScaling mock is already set by Expect") + } + + if mmGetClickPipeCdcScaling.defaultExpectation.paramPtrs == nil { + mmGetClickPipeCdcScaling.defaultExpectation.paramPtrs = &ClientMockGetClickPipeCdcScalingParamPtrs{} + } + mmGetClickPipeCdcScaling.defaultExpectation.paramPtrs.serviceId = &serviceId + mmGetClickPipeCdcScaling.defaultExpectation.expectationOrigins.originServiceId = minimock.CallerInfo(1) + + return mmGetClickPipeCdcScaling +} + +// Inspect accepts an inspector function that has same arguments as the Client.GetClickPipeCdcScaling +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) Inspect(f func(ctx context.Context, serviceId string)) *mClientMockGetClickPipeCdcScaling { + if mmGetClickPipeCdcScaling.mock.inspectFuncGetClickPipeCdcScaling != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("Inspect function is already set for ClientMock.GetClickPipeCdcScaling") + } + + mmGetClickPipeCdcScaling.mock.inspectFuncGetClickPipeCdcScaling = f + + return mmGetClickPipeCdcScaling +} + +// Return sets up results that will be returned by Client.GetClickPipeCdcScaling +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) Return(cp1 *ClickPipeCdcScaling, err error) *ClientMock { + if mmGetClickPipeCdcScaling.mock.funcGetClickPipeCdcScaling != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("ClientMock.GetClickPipeCdcScaling mock is already set by Set") + } + + if mmGetClickPipeCdcScaling.defaultExpectation == nil { + mmGetClickPipeCdcScaling.defaultExpectation = &ClientMockGetClickPipeCdcScalingExpectation{mock: mmGetClickPipeCdcScaling.mock} + } + mmGetClickPipeCdcScaling.defaultExpectation.results = &ClientMockGetClickPipeCdcScalingResults{cp1, err} + mmGetClickPipeCdcScaling.defaultExpectation.returnOrigin = minimock.CallerInfo(1) + return mmGetClickPipeCdcScaling.mock +} + +// Set uses given function f to mock the Client.GetClickPipeCdcScaling method +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) Set(f func(ctx context.Context, serviceId string) (cp1 *ClickPipeCdcScaling, err error)) *ClientMock { + if mmGetClickPipeCdcScaling.defaultExpectation != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("Default expectation is already set for the Client.GetClickPipeCdcScaling method") + } + + if len(mmGetClickPipeCdcScaling.expectations) > 0 { + mmGetClickPipeCdcScaling.mock.t.Fatalf("Some expectations are already set for the Client.GetClickPipeCdcScaling method") + } + + mmGetClickPipeCdcScaling.mock.funcGetClickPipeCdcScaling = f + mmGetClickPipeCdcScaling.mock.funcGetClickPipeCdcScalingOrigin = minimock.CallerInfo(1) + return mmGetClickPipeCdcScaling.mock +} + +// When sets expectation for the Client.GetClickPipeCdcScaling which will trigger the result defined by the following +// Then helper +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) When(ctx context.Context, serviceId string) *ClientMockGetClickPipeCdcScalingExpectation { + if mmGetClickPipeCdcScaling.mock.funcGetClickPipeCdcScaling != nil { + mmGetClickPipeCdcScaling.mock.t.Fatalf("ClientMock.GetClickPipeCdcScaling mock is already set by Set") + } + + expectation := &ClientMockGetClickPipeCdcScalingExpectation{ + mock: mmGetClickPipeCdcScaling.mock, + params: &ClientMockGetClickPipeCdcScalingParams{ctx, serviceId}, + expectationOrigins: ClientMockGetClickPipeCdcScalingExpectationOrigins{origin: minimock.CallerInfo(1)}, + } + mmGetClickPipeCdcScaling.expectations = append(mmGetClickPipeCdcScaling.expectations, expectation) + return expectation +} + +// Then sets up Client.GetClickPipeCdcScaling return parameters for the expectation previously defined by the When method +func (e *ClientMockGetClickPipeCdcScalingExpectation) Then(cp1 *ClickPipeCdcScaling, err error) *ClientMock { + e.results = &ClientMockGetClickPipeCdcScalingResults{cp1, err} + return e.mock +} + +// Times sets number of times Client.GetClickPipeCdcScaling should be invoked +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) Times(n uint64) *mClientMockGetClickPipeCdcScaling { + if n == 0 { + mmGetClickPipeCdcScaling.mock.t.Fatalf("Times of ClientMock.GetClickPipeCdcScaling mock can not be zero") + } + mm_atomic.StoreUint64(&mmGetClickPipeCdcScaling.expectedInvocations, n) + mmGetClickPipeCdcScaling.expectedInvocationsOrigin = minimock.CallerInfo(1) + return mmGetClickPipeCdcScaling +} + +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) invocationsDone() bool { + if len(mmGetClickPipeCdcScaling.expectations) == 0 && mmGetClickPipeCdcScaling.defaultExpectation == nil && mmGetClickPipeCdcScaling.mock.funcGetClickPipeCdcScaling == nil { + return true + } + + totalInvocations := mm_atomic.LoadUint64(&mmGetClickPipeCdcScaling.mock.afterGetClickPipeCdcScalingCounter) + expectedInvocations := mm_atomic.LoadUint64(&mmGetClickPipeCdcScaling.expectedInvocations) + + return totalInvocations > 0 && (expectedInvocations == 0 || expectedInvocations == totalInvocations) +} + +// GetClickPipeCdcScaling implements Client +func (mmGetClickPipeCdcScaling *ClientMock) GetClickPipeCdcScaling(ctx context.Context, serviceId string) (cp1 *ClickPipeCdcScaling, err error) { + mm_atomic.AddUint64(&mmGetClickPipeCdcScaling.beforeGetClickPipeCdcScalingCounter, 1) + defer mm_atomic.AddUint64(&mmGetClickPipeCdcScaling.afterGetClickPipeCdcScalingCounter, 1) + + mmGetClickPipeCdcScaling.t.Helper() + + if mmGetClickPipeCdcScaling.inspectFuncGetClickPipeCdcScaling != nil { + mmGetClickPipeCdcScaling.inspectFuncGetClickPipeCdcScaling(ctx, serviceId) + } + + mm_params := ClientMockGetClickPipeCdcScalingParams{ctx, serviceId} + + // Record call args + mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.mutex.Lock() + mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.callArgs = append(mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.callArgs, &mm_params) + mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.mutex.Unlock() + + for _, e := range mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.expectations { + if minimock.Equal(*e.params, mm_params) { + mm_atomic.AddUint64(&e.Counter, 1) + return e.results.cp1, e.results.err + } + } + + if mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.defaultExpectation != nil { + mm_atomic.AddUint64(&mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.defaultExpectation.Counter, 1) + mm_want := mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.defaultExpectation.params + mm_want_ptrs := mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.defaultExpectation.paramPtrs + + mm_got := ClientMockGetClickPipeCdcScalingParams{ctx, serviceId} + + if mm_want_ptrs != nil { + + if mm_want_ptrs.ctx != nil && !minimock.Equal(*mm_want_ptrs.ctx, mm_got.ctx) { + mmGetClickPipeCdcScaling.t.Errorf("ClientMock.GetClickPipeCdcScaling got unexpected parameter ctx, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.originCtx, *mm_want_ptrs.ctx, mm_got.ctx, minimock.Diff(*mm_want_ptrs.ctx, mm_got.ctx)) + } + + if mm_want_ptrs.serviceId != nil && !minimock.Equal(*mm_want_ptrs.serviceId, mm_got.serviceId) { + mmGetClickPipeCdcScaling.t.Errorf("ClientMock.GetClickPipeCdcScaling got unexpected parameter serviceId, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.originServiceId, *mm_want_ptrs.serviceId, mm_got.serviceId, minimock.Diff(*mm_want_ptrs.serviceId, mm_got.serviceId)) + } + + } else if mm_want != nil && !minimock.Equal(*mm_want, mm_got) { + mmGetClickPipeCdcScaling.t.Errorf("ClientMock.GetClickPipeCdcScaling got unexpected parameters, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.origin, *mm_want, mm_got, minimock.Diff(*mm_want, mm_got)) + } + + mm_results := mmGetClickPipeCdcScaling.GetClickPipeCdcScalingMock.defaultExpectation.results + if mm_results == nil { + mmGetClickPipeCdcScaling.t.Fatal("No results are set for the ClientMock.GetClickPipeCdcScaling") + } + return (*mm_results).cp1, (*mm_results).err + } + if mmGetClickPipeCdcScaling.funcGetClickPipeCdcScaling != nil { + return mmGetClickPipeCdcScaling.funcGetClickPipeCdcScaling(ctx, serviceId) + } + mmGetClickPipeCdcScaling.t.Fatalf("Unexpected call to ClientMock.GetClickPipeCdcScaling. %v %v", ctx, serviceId) + return +} + +// GetClickPipeCdcScalingAfterCounter returns a count of finished ClientMock.GetClickPipeCdcScaling invocations +func (mmGetClickPipeCdcScaling *ClientMock) GetClickPipeCdcScalingAfterCounter() uint64 { + return mm_atomic.LoadUint64(&mmGetClickPipeCdcScaling.afterGetClickPipeCdcScalingCounter) +} + +// GetClickPipeCdcScalingBeforeCounter returns a count of ClientMock.GetClickPipeCdcScaling invocations +func (mmGetClickPipeCdcScaling *ClientMock) GetClickPipeCdcScalingBeforeCounter() uint64 { + return mm_atomic.LoadUint64(&mmGetClickPipeCdcScaling.beforeGetClickPipeCdcScalingCounter) +} + +// Calls returns a list of arguments used in each call to ClientMock.GetClickPipeCdcScaling. +// The list is in the same order as the calls were made (i.e. recent calls have a higher index) +func (mmGetClickPipeCdcScaling *mClientMockGetClickPipeCdcScaling) Calls() []*ClientMockGetClickPipeCdcScalingParams { + mmGetClickPipeCdcScaling.mutex.RLock() + + argCopy := make([]*ClientMockGetClickPipeCdcScalingParams, len(mmGetClickPipeCdcScaling.callArgs)) + copy(argCopy, mmGetClickPipeCdcScaling.callArgs) + + mmGetClickPipeCdcScaling.mutex.RUnlock() + + return argCopy +} + +// MinimockGetClickPipeCdcScalingDone returns true if the count of the GetClickPipeCdcScaling invocations corresponds +// the number of defined expectations +func (m *ClientMock) MinimockGetClickPipeCdcScalingDone() bool { + if m.GetClickPipeCdcScalingMock.optional { + // Optional methods provide '0 or more' call count restriction. + return true + } + + for _, e := range m.GetClickPipeCdcScalingMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + return false + } + } + + return m.GetClickPipeCdcScalingMock.invocationsDone() +} + +// MinimockGetClickPipeCdcScalingInspect logs each unmet expectation +func (m *ClientMock) MinimockGetClickPipeCdcScalingInspect() { + for _, e := range m.GetClickPipeCdcScalingMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + m.t.Errorf("Expected call to ClientMock.GetClickPipeCdcScaling at\n%s with params: %#v", e.expectationOrigins.origin, *e.params) + } + } + + afterGetClickPipeCdcScalingCounter := mm_atomic.LoadUint64(&m.afterGetClickPipeCdcScalingCounter) + // if default expectation was set then invocations count should be greater than zero + if m.GetClickPipeCdcScalingMock.defaultExpectation != nil && afterGetClickPipeCdcScalingCounter < 1 { + if m.GetClickPipeCdcScalingMock.defaultExpectation.params == nil { + m.t.Errorf("Expected call to ClientMock.GetClickPipeCdcScaling at\n%s", m.GetClickPipeCdcScalingMock.defaultExpectation.returnOrigin) + } else { + m.t.Errorf("Expected call to ClientMock.GetClickPipeCdcScaling at\n%s with params: %#v", m.GetClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.origin, *m.GetClickPipeCdcScalingMock.defaultExpectation.params) + } + } + // if func was set then invocations count should be greater than zero + if m.funcGetClickPipeCdcScaling != nil && afterGetClickPipeCdcScalingCounter < 1 { + m.t.Errorf("Expected call to ClientMock.GetClickPipeCdcScaling at\n%s", m.funcGetClickPipeCdcScalingOrigin) + } + + if !m.GetClickPipeCdcScalingMock.invocationsDone() && afterGetClickPipeCdcScalingCounter > 0 { + m.t.Errorf("Expected %d calls to ClientMock.GetClickPipeCdcScaling at\n%s but found %d calls", + mm_atomic.LoadUint64(&m.GetClickPipeCdcScalingMock.expectedInvocations), m.GetClickPipeCdcScalingMock.expectedInvocationsOrigin, afterGetClickPipeCdcScalingCounter) + } +} + type mClientMockGetClickPipeSettings struct { optional bool mock *ClientMock @@ -9077,6 +9440,380 @@ func (m *ClientMock) MinimockUpdateClickPipeInspect() { } } +type mClientMockUpdateClickPipeCdcScaling struct { + optional bool + mock *ClientMock + defaultExpectation *ClientMockUpdateClickPipeCdcScalingExpectation + expectations []*ClientMockUpdateClickPipeCdcScalingExpectation + + callArgs []*ClientMockUpdateClickPipeCdcScalingParams + mutex sync.RWMutex + + expectedInvocations uint64 + expectedInvocationsOrigin string +} + +// ClientMockUpdateClickPipeCdcScalingExpectation specifies expectation struct of the Client.UpdateClickPipeCdcScaling +type ClientMockUpdateClickPipeCdcScalingExpectation struct { + mock *ClientMock + params *ClientMockUpdateClickPipeCdcScalingParams + paramPtrs *ClientMockUpdateClickPipeCdcScalingParamPtrs + expectationOrigins ClientMockUpdateClickPipeCdcScalingExpectationOrigins + results *ClientMockUpdateClickPipeCdcScalingResults + returnOrigin string + Counter uint64 +} + +// ClientMockUpdateClickPipeCdcScalingParams contains parameters of the Client.UpdateClickPipeCdcScaling +type ClientMockUpdateClickPipeCdcScalingParams struct { + ctx context.Context + serviceId string + request ClickPipeCdcScalingRequest +} + +// ClientMockUpdateClickPipeCdcScalingParamPtrs contains pointers to parameters of the Client.UpdateClickPipeCdcScaling +type ClientMockUpdateClickPipeCdcScalingParamPtrs struct { + ctx *context.Context + serviceId *string + request *ClickPipeCdcScalingRequest +} + +// ClientMockUpdateClickPipeCdcScalingResults contains results of the Client.UpdateClickPipeCdcScaling +type ClientMockUpdateClickPipeCdcScalingResults struct { + cp1 *ClickPipeCdcScaling + err error +} + +// ClientMockUpdateClickPipeCdcScalingOrigins contains origins of expectations of the Client.UpdateClickPipeCdcScaling +type ClientMockUpdateClickPipeCdcScalingExpectationOrigins struct { + origin string + originCtx string + originServiceId string + originRequest string +} + +// Marks this method to be optional. The default behavior of any method with Return() is '1 or more', meaning +// the test will fail minimock's automatic final call check if the mocked method was not called at least once. +// Optional() makes method check to work in '0 or more' mode. +// It is NOT RECOMMENDED to use this option unless you really need it, as default behaviour helps to +// catch the problems when the expected method call is totally skipped during test run. +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) Optional() *mClientMockUpdateClickPipeCdcScaling { + mmUpdateClickPipeCdcScaling.optional = true + return mmUpdateClickPipeCdcScaling +} + +// Expect sets up expected params for Client.UpdateClickPipeCdcScaling +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) Expect(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) *mClientMockUpdateClickPipeCdcScaling { + if mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScaling != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Set") + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation == nil { + mmUpdateClickPipeCdcScaling.defaultExpectation = &ClientMockUpdateClickPipeCdcScalingExpectation{} + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by ExpectParams functions") + } + + mmUpdateClickPipeCdcScaling.defaultExpectation.params = &ClientMockUpdateClickPipeCdcScalingParams{ctx, serviceId, request} + mmUpdateClickPipeCdcScaling.defaultExpectation.expectationOrigins.origin = minimock.CallerInfo(1) + for _, e := range mmUpdateClickPipeCdcScaling.expectations { + if minimock.Equal(e.params, mmUpdateClickPipeCdcScaling.defaultExpectation.params) { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("Expectation set by When has same params: %#v", *mmUpdateClickPipeCdcScaling.defaultExpectation.params) + } + } + + return mmUpdateClickPipeCdcScaling +} + +// ExpectCtxParam1 sets up expected param ctx for Client.UpdateClickPipeCdcScaling +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) ExpectCtxParam1(ctx context.Context) *mClientMockUpdateClickPipeCdcScaling { + if mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScaling != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Set") + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation == nil { + mmUpdateClickPipeCdcScaling.defaultExpectation = &ClientMockUpdateClickPipeCdcScalingExpectation{} + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation.params != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Expect") + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs == nil { + mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs = &ClientMockUpdateClickPipeCdcScalingParamPtrs{} + } + mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs.ctx = &ctx + mmUpdateClickPipeCdcScaling.defaultExpectation.expectationOrigins.originCtx = minimock.CallerInfo(1) + + return mmUpdateClickPipeCdcScaling +} + +// ExpectServiceIdParam2 sets up expected param serviceId for Client.UpdateClickPipeCdcScaling +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) ExpectServiceIdParam2(serviceId string) *mClientMockUpdateClickPipeCdcScaling { + if mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScaling != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Set") + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation == nil { + mmUpdateClickPipeCdcScaling.defaultExpectation = &ClientMockUpdateClickPipeCdcScalingExpectation{} + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation.params != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Expect") + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs == nil { + mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs = &ClientMockUpdateClickPipeCdcScalingParamPtrs{} + } + mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs.serviceId = &serviceId + mmUpdateClickPipeCdcScaling.defaultExpectation.expectationOrigins.originServiceId = minimock.CallerInfo(1) + + return mmUpdateClickPipeCdcScaling +} + +// ExpectRequestParam3 sets up expected param request for Client.UpdateClickPipeCdcScaling +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) ExpectRequestParam3(request ClickPipeCdcScalingRequest) *mClientMockUpdateClickPipeCdcScaling { + if mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScaling != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Set") + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation == nil { + mmUpdateClickPipeCdcScaling.defaultExpectation = &ClientMockUpdateClickPipeCdcScalingExpectation{} + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation.params != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Expect") + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs == nil { + mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs = &ClientMockUpdateClickPipeCdcScalingParamPtrs{} + } + mmUpdateClickPipeCdcScaling.defaultExpectation.paramPtrs.request = &request + mmUpdateClickPipeCdcScaling.defaultExpectation.expectationOrigins.originRequest = minimock.CallerInfo(1) + + return mmUpdateClickPipeCdcScaling +} + +// Inspect accepts an inspector function that has same arguments as the Client.UpdateClickPipeCdcScaling +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) Inspect(f func(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest)) *mClientMockUpdateClickPipeCdcScaling { + if mmUpdateClickPipeCdcScaling.mock.inspectFuncUpdateClickPipeCdcScaling != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("Inspect function is already set for ClientMock.UpdateClickPipeCdcScaling") + } + + mmUpdateClickPipeCdcScaling.mock.inspectFuncUpdateClickPipeCdcScaling = f + + return mmUpdateClickPipeCdcScaling +} + +// Return sets up results that will be returned by Client.UpdateClickPipeCdcScaling +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) Return(cp1 *ClickPipeCdcScaling, err error) *ClientMock { + if mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScaling != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Set") + } + + if mmUpdateClickPipeCdcScaling.defaultExpectation == nil { + mmUpdateClickPipeCdcScaling.defaultExpectation = &ClientMockUpdateClickPipeCdcScalingExpectation{mock: mmUpdateClickPipeCdcScaling.mock} + } + mmUpdateClickPipeCdcScaling.defaultExpectation.results = &ClientMockUpdateClickPipeCdcScalingResults{cp1, err} + mmUpdateClickPipeCdcScaling.defaultExpectation.returnOrigin = minimock.CallerInfo(1) + return mmUpdateClickPipeCdcScaling.mock +} + +// Set uses given function f to mock the Client.UpdateClickPipeCdcScaling method +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) Set(f func(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) (cp1 *ClickPipeCdcScaling, err error)) *ClientMock { + if mmUpdateClickPipeCdcScaling.defaultExpectation != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("Default expectation is already set for the Client.UpdateClickPipeCdcScaling method") + } + + if len(mmUpdateClickPipeCdcScaling.expectations) > 0 { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("Some expectations are already set for the Client.UpdateClickPipeCdcScaling method") + } + + mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScaling = f + mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScalingOrigin = minimock.CallerInfo(1) + return mmUpdateClickPipeCdcScaling.mock +} + +// When sets expectation for the Client.UpdateClickPipeCdcScaling which will trigger the result defined by the following +// Then helper +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) When(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) *ClientMockUpdateClickPipeCdcScalingExpectation { + if mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScaling != nil { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("ClientMock.UpdateClickPipeCdcScaling mock is already set by Set") + } + + expectation := &ClientMockUpdateClickPipeCdcScalingExpectation{ + mock: mmUpdateClickPipeCdcScaling.mock, + params: &ClientMockUpdateClickPipeCdcScalingParams{ctx, serviceId, request}, + expectationOrigins: ClientMockUpdateClickPipeCdcScalingExpectationOrigins{origin: minimock.CallerInfo(1)}, + } + mmUpdateClickPipeCdcScaling.expectations = append(mmUpdateClickPipeCdcScaling.expectations, expectation) + return expectation +} + +// Then sets up Client.UpdateClickPipeCdcScaling return parameters for the expectation previously defined by the When method +func (e *ClientMockUpdateClickPipeCdcScalingExpectation) Then(cp1 *ClickPipeCdcScaling, err error) *ClientMock { + e.results = &ClientMockUpdateClickPipeCdcScalingResults{cp1, err} + return e.mock +} + +// Times sets number of times Client.UpdateClickPipeCdcScaling should be invoked +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) Times(n uint64) *mClientMockUpdateClickPipeCdcScaling { + if n == 0 { + mmUpdateClickPipeCdcScaling.mock.t.Fatalf("Times of ClientMock.UpdateClickPipeCdcScaling mock can not be zero") + } + mm_atomic.StoreUint64(&mmUpdateClickPipeCdcScaling.expectedInvocations, n) + mmUpdateClickPipeCdcScaling.expectedInvocationsOrigin = minimock.CallerInfo(1) + return mmUpdateClickPipeCdcScaling +} + +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) invocationsDone() bool { + if len(mmUpdateClickPipeCdcScaling.expectations) == 0 && mmUpdateClickPipeCdcScaling.defaultExpectation == nil && mmUpdateClickPipeCdcScaling.mock.funcUpdateClickPipeCdcScaling == nil { + return true + } + + totalInvocations := mm_atomic.LoadUint64(&mmUpdateClickPipeCdcScaling.mock.afterUpdateClickPipeCdcScalingCounter) + expectedInvocations := mm_atomic.LoadUint64(&mmUpdateClickPipeCdcScaling.expectedInvocations) + + return totalInvocations > 0 && (expectedInvocations == 0 || expectedInvocations == totalInvocations) +} + +// UpdateClickPipeCdcScaling implements Client +func (mmUpdateClickPipeCdcScaling *ClientMock) UpdateClickPipeCdcScaling(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) (cp1 *ClickPipeCdcScaling, err error) { + mm_atomic.AddUint64(&mmUpdateClickPipeCdcScaling.beforeUpdateClickPipeCdcScalingCounter, 1) + defer mm_atomic.AddUint64(&mmUpdateClickPipeCdcScaling.afterUpdateClickPipeCdcScalingCounter, 1) + + mmUpdateClickPipeCdcScaling.t.Helper() + + if mmUpdateClickPipeCdcScaling.inspectFuncUpdateClickPipeCdcScaling != nil { + mmUpdateClickPipeCdcScaling.inspectFuncUpdateClickPipeCdcScaling(ctx, serviceId, request) + } + + mm_params := ClientMockUpdateClickPipeCdcScalingParams{ctx, serviceId, request} + + // Record call args + mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.mutex.Lock() + mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.callArgs = append(mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.callArgs, &mm_params) + mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.mutex.Unlock() + + for _, e := range mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.expectations { + if minimock.Equal(*e.params, mm_params) { + mm_atomic.AddUint64(&e.Counter, 1) + return e.results.cp1, e.results.err + } + } + + if mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation != nil { + mm_atomic.AddUint64(&mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation.Counter, 1) + mm_want := mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation.params + mm_want_ptrs := mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation.paramPtrs + + mm_got := ClientMockUpdateClickPipeCdcScalingParams{ctx, serviceId, request} + + if mm_want_ptrs != nil { + + if mm_want_ptrs.ctx != nil && !minimock.Equal(*mm_want_ptrs.ctx, mm_got.ctx) { + mmUpdateClickPipeCdcScaling.t.Errorf("ClientMock.UpdateClickPipeCdcScaling got unexpected parameter ctx, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.originCtx, *mm_want_ptrs.ctx, mm_got.ctx, minimock.Diff(*mm_want_ptrs.ctx, mm_got.ctx)) + } + + if mm_want_ptrs.serviceId != nil && !minimock.Equal(*mm_want_ptrs.serviceId, mm_got.serviceId) { + mmUpdateClickPipeCdcScaling.t.Errorf("ClientMock.UpdateClickPipeCdcScaling got unexpected parameter serviceId, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.originServiceId, *mm_want_ptrs.serviceId, mm_got.serviceId, minimock.Diff(*mm_want_ptrs.serviceId, mm_got.serviceId)) + } + + if mm_want_ptrs.request != nil && !minimock.Equal(*mm_want_ptrs.request, mm_got.request) { + mmUpdateClickPipeCdcScaling.t.Errorf("ClientMock.UpdateClickPipeCdcScaling got unexpected parameter request, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.originRequest, *mm_want_ptrs.request, mm_got.request, minimock.Diff(*mm_want_ptrs.request, mm_got.request)) + } + + } else if mm_want != nil && !minimock.Equal(*mm_want, mm_got) { + mmUpdateClickPipeCdcScaling.t.Errorf("ClientMock.UpdateClickPipeCdcScaling got unexpected parameters, expected at\n%s:\nwant: %#v\n got: %#v%s\n", + mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.origin, *mm_want, mm_got, minimock.Diff(*mm_want, mm_got)) + } + + mm_results := mmUpdateClickPipeCdcScaling.UpdateClickPipeCdcScalingMock.defaultExpectation.results + if mm_results == nil { + mmUpdateClickPipeCdcScaling.t.Fatal("No results are set for the ClientMock.UpdateClickPipeCdcScaling") + } + return (*mm_results).cp1, (*mm_results).err + } + if mmUpdateClickPipeCdcScaling.funcUpdateClickPipeCdcScaling != nil { + return mmUpdateClickPipeCdcScaling.funcUpdateClickPipeCdcScaling(ctx, serviceId, request) + } + mmUpdateClickPipeCdcScaling.t.Fatalf("Unexpected call to ClientMock.UpdateClickPipeCdcScaling. %v %v %v", ctx, serviceId, request) + return +} + +// UpdateClickPipeCdcScalingAfterCounter returns a count of finished ClientMock.UpdateClickPipeCdcScaling invocations +func (mmUpdateClickPipeCdcScaling *ClientMock) UpdateClickPipeCdcScalingAfterCounter() uint64 { + return mm_atomic.LoadUint64(&mmUpdateClickPipeCdcScaling.afterUpdateClickPipeCdcScalingCounter) +} + +// UpdateClickPipeCdcScalingBeforeCounter returns a count of ClientMock.UpdateClickPipeCdcScaling invocations +func (mmUpdateClickPipeCdcScaling *ClientMock) UpdateClickPipeCdcScalingBeforeCounter() uint64 { + return mm_atomic.LoadUint64(&mmUpdateClickPipeCdcScaling.beforeUpdateClickPipeCdcScalingCounter) +} + +// Calls returns a list of arguments used in each call to ClientMock.UpdateClickPipeCdcScaling. +// The list is in the same order as the calls were made (i.e. recent calls have a higher index) +func (mmUpdateClickPipeCdcScaling *mClientMockUpdateClickPipeCdcScaling) Calls() []*ClientMockUpdateClickPipeCdcScalingParams { + mmUpdateClickPipeCdcScaling.mutex.RLock() + + argCopy := make([]*ClientMockUpdateClickPipeCdcScalingParams, len(mmUpdateClickPipeCdcScaling.callArgs)) + copy(argCopy, mmUpdateClickPipeCdcScaling.callArgs) + + mmUpdateClickPipeCdcScaling.mutex.RUnlock() + + return argCopy +} + +// MinimockUpdateClickPipeCdcScalingDone returns true if the count of the UpdateClickPipeCdcScaling invocations corresponds +// the number of defined expectations +func (m *ClientMock) MinimockUpdateClickPipeCdcScalingDone() bool { + if m.UpdateClickPipeCdcScalingMock.optional { + // Optional methods provide '0 or more' call count restriction. + return true + } + + for _, e := range m.UpdateClickPipeCdcScalingMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + return false + } + } + + return m.UpdateClickPipeCdcScalingMock.invocationsDone() +} + +// MinimockUpdateClickPipeCdcScalingInspect logs each unmet expectation +func (m *ClientMock) MinimockUpdateClickPipeCdcScalingInspect() { + for _, e := range m.UpdateClickPipeCdcScalingMock.expectations { + if mm_atomic.LoadUint64(&e.Counter) < 1 { + m.t.Errorf("Expected call to ClientMock.UpdateClickPipeCdcScaling at\n%s with params: %#v", e.expectationOrigins.origin, *e.params) + } + } + + afterUpdateClickPipeCdcScalingCounter := mm_atomic.LoadUint64(&m.afterUpdateClickPipeCdcScalingCounter) + // if default expectation was set then invocations count should be greater than zero + if m.UpdateClickPipeCdcScalingMock.defaultExpectation != nil && afterUpdateClickPipeCdcScalingCounter < 1 { + if m.UpdateClickPipeCdcScalingMock.defaultExpectation.params == nil { + m.t.Errorf("Expected call to ClientMock.UpdateClickPipeCdcScaling at\n%s", m.UpdateClickPipeCdcScalingMock.defaultExpectation.returnOrigin) + } else { + m.t.Errorf("Expected call to ClientMock.UpdateClickPipeCdcScaling at\n%s with params: %#v", m.UpdateClickPipeCdcScalingMock.defaultExpectation.expectationOrigins.origin, *m.UpdateClickPipeCdcScalingMock.defaultExpectation.params) + } + } + // if func was set then invocations count should be greater than zero + if m.funcUpdateClickPipeCdcScaling != nil && afterUpdateClickPipeCdcScalingCounter < 1 { + m.t.Errorf("Expected call to ClientMock.UpdateClickPipeCdcScaling at\n%s", m.funcUpdateClickPipeCdcScalingOrigin) + } + + if !m.UpdateClickPipeCdcScalingMock.invocationsDone() && afterUpdateClickPipeCdcScalingCounter > 0 { + m.t.Errorf("Expected %d calls to ClientMock.UpdateClickPipeCdcScaling at\n%s but found %d calls", + mm_atomic.LoadUint64(&m.UpdateClickPipeCdcScalingMock.expectedInvocations), m.UpdateClickPipeCdcScalingMock.expectedInvocationsOrigin, afterUpdateClickPipeCdcScalingCounter) + } +} + type mClientMockUpdateClickPipeSettings struct { optional bool mock *ClientMock @@ -12251,6 +12988,8 @@ func (m *ClientMock) MinimockFinish() { m.MinimockGetClickPipeInspect() + m.MinimockGetClickPipeCdcScalingInspect() + m.MinimockGetClickPipeSettingsInspect() m.MinimockGetOrgPrivateEndpointConfigInspect() @@ -12275,6 +13014,8 @@ func (m *ClientMock) MinimockFinish() { m.MinimockUpdateClickPipeInspect() + m.MinimockUpdateClickPipeCdcScalingInspect() + m.MinimockUpdateClickPipeSettingsInspect() m.MinimockUpdateOrganizationPrivateEndpointsInspect() @@ -12325,6 +13066,7 @@ func (m *ClientMock) minimockDone() bool { m.MinimockGetApiKeyIDDone() && m.MinimockGetBackupConfigurationDone() && m.MinimockGetClickPipeDone() && + m.MinimockGetClickPipeCdcScalingDone() && m.MinimockGetClickPipeSettingsDone() && m.MinimockGetOrgPrivateEndpointConfigDone() && m.MinimockGetOrganizationPrivateEndpointsDone() && @@ -12337,6 +13079,7 @@ func (m *ClientMock) minimockDone() bool { m.MinimockScalingClickPipeDone() && m.MinimockUpdateBackupConfigurationDone() && m.MinimockUpdateClickPipeDone() && + m.MinimockUpdateClickPipeCdcScalingDone() && m.MinimockUpdateClickPipeSettingsDone() && m.MinimockUpdateOrganizationPrivateEndpointsDone() && m.MinimockUpdateReplicaScalingDone() && diff --git a/pkg/internal/api/interface.go b/pkg/internal/api/interface.go index 6955d5d5..5298d311 100644 --- a/pkg/internal/api/interface.go +++ b/pkg/internal/api/interface.go @@ -35,6 +35,8 @@ type Client interface { DeleteClickPipe(ctx context.Context, serviceId string, clickPipeId string) error GetClickPipeSettings(ctx context.Context, serviceId string, clickPipeId string) (map[string]any, error) UpdateClickPipeSettings(ctx context.Context, serviceId string, clickPipeId string, settings map[string]any) (map[string]any, error) + GetClickPipeCdcScaling(ctx context.Context, serviceId string) (*ClickPipeCdcScaling, error) + UpdateClickPipeCdcScaling(ctx context.Context, serviceId string, request ClickPipeCdcScalingRequest) (*ClickPipeCdcScaling, error) GetReversePrivateEndpointPath(serviceId, reversePrivateEndpointId string) string ListReversePrivateEndpoints(ctx context.Context, serviceId string) ([]*ReversePrivateEndpoint, error) diff --git a/pkg/resource/clickpipe.go b/pkg/resource/clickpipe.go index 5dd5b0fd..f9dbfc42 100644 --- a/pkg/resource/clickpipe.go +++ b/pkg/resource/clickpipe.go @@ -179,6 +179,9 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest, "kafka": schema.SingleNestedAttribute{ MarkdownDescription: "The Kafka source configuration for the ClickPipe.", Optional: true, + PlanModifiers: []planmodifier.Object{ + requiresReplaceIfSourceTypeChanges{}, + }, Attributes: map[string]schema.Attribute{ "type": schema.StringAttribute{ MarkdownDescription: fmt.Sprintf( @@ -359,6 +362,9 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest, "object_storage": schema.SingleNestedAttribute{ MarkdownDescription: "The compatible object storage source configuration for the ClickPipe.", Optional: true, + PlanModifiers: []planmodifier.Object{ + requiresReplaceIfSourceTypeChanges{}, + }, Attributes: map[string]schema.Attribute{ "type": schema.StringAttribute{ MarkdownDescription: fmt.Sprintf( @@ -485,13 +491,13 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest, }, }, }, - PlanModifiers: []planmodifier.Object{ - objectplanmodifier.RequiresReplace(), - }, }, "kinesis": schema.SingleNestedAttribute{ MarkdownDescription: "The Kinesis source configuration for the ClickPipe.", Optional: true, + PlanModifiers: []planmodifier.Object{ + requiresReplaceIfSourceTypeChanges{}, + }, Attributes: map[string]schema.Attribute{ "format": schema.StringAttribute{ MarkdownDescription: fmt.Sprintf( @@ -596,6 +602,9 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest, "postgres": schema.SingleNestedAttribute{ MarkdownDescription: "The Postgres CDC source configuration for the ClickPipe.", Optional: true, + PlanModifiers: []planmodifier.Object{ + requiresReplaceIfSourceTypeChanges{}, + }, Attributes: map[string]schema.Attribute{ "host": schema.StringAttribute{ Description: "The hostname of the Postgres instance.", @@ -723,12 +732,9 @@ func (c *ClickPipeResource) Schema(_ context.Context, _ resource.SchemaRequest, }, }, }, - "table_mappings": schema.ListNestedAttribute{ + "table_mappings": schema.SetNestedAttribute{ Description: "Table mappings from Postgres source to ClickHouse destination.", Required: true, - PlanModifiers: []planmodifier.List{ - listplanmodifier.RequiresReplace(), - }, NestedObject: schema.NestedAttributeObject{ Attributes: map[string]schema.Attribute{ "source_schema_name": schema.StringAttribute{ @@ -1094,6 +1100,83 @@ func (c *ClickPipeResource) ModifyPlan(ctx context.Context, request resource.Mod plan.TriggerResync = types.BoolValue(false) response.Diagnostics.Append(response.Plan.Set(ctx, plan)...) } + + // Validate Postgres table mappings changes + if !request.State.Raw.IsNull() { + var planSourceModel, stateSourceModel models.ClickPipeSourceModel + if diags := plan.Source.As(ctx, &planSourceModel, basetypes.ObjectAsOptions{}); !diags.HasError() { + if diags := state.Source.As(ctx, &stateSourceModel, basetypes.ObjectAsOptions{}); !diags.HasError() { + // Only validate if this is a Postgres source + if !planSourceModel.Postgres.IsNull() && !stateSourceModel.Postgres.IsNull() { + var planPostgres, statePostgres models.ClickPipePostgresSourceModel + if diags := planSourceModel.Postgres.As(ctx, &planPostgres, basetypes.ObjectAsOptions{}); !diags.HasError() { + if diags := stateSourceModel.Postgres.As(ctx, &statePostgres, basetypes.ObjectAsOptions{}); !diags.HasError() { + + // Get state and plan table mappings + var stateMappings, planMappings []models.ClickPipePostgresTableMappingModel + if !statePostgres.TableMappings.IsNull() { + stateMappings = make([]models.ClickPipePostgresTableMappingModel, len(statePostgres.TableMappings.Elements())) + statePostgres.TableMappings.ElementsAs(ctx, &stateMappings, false) + } + if !planPostgres.TableMappings.IsNull() { + planMappings = make([]models.ClickPipePostgresTableMappingModel, len(planPostgres.TableMappings.Elements())) + planPostgres.TableMappings.ElementsAs(ctx, &planMappings, false) + } + + // Validation 1: Cannot delete the last table mapping + if len(stateMappings) == 1 && len(planMappings) == 0 { + response.Diagnostics.AddError( + "Invalid table_mappings configuration", + "Cannot delete the last table mapping. Postgres CDC pipes require at least one table mapping.", + ) + } + + // Validation 2: Cannot delete and add a mapping for the same source table + if len(stateMappings) > 0 && len(planMappings) > 0 { + // Build sets of source tables (schema.table as key) + stateTableKeys := make(map[string]bool) + planTableKeys := make(map[string]bool) + + for _, mapping := range stateMappings { + key := fmt.Sprintf("%s.%s", mapping.SourceSchemaName.ValueString(), mapping.SourceTable.ValueString()) + stateTableKeys[key] = true + } + for _, mapping := range planMappings { + key := fmt.Sprintf("%s.%s", mapping.SourceSchemaName.ValueString(), mapping.SourceTable.ValueString()) + planTableKeys[key] = true + } + + // Find source tables being removed and added + removedTables := make(map[string]bool) + addedTables := make(map[string]bool) + + for key := range stateTableKeys { + if !planTableKeys[key] { + removedTables[key] = true + } + } + for key := range planTableKeys { + if !stateTableKeys[key] { + addedTables[key] = true + } + } + + // Check for intersection: if same source table is both removed and added, deny + for removed := range removedTables { + if addedTables[removed] { + response.Diagnostics.AddError( + "Invalid table_mappings configuration", + fmt.Sprintf("Cannot delete and add a table mapping for the same source table '%s'. To modify a table mapping, update it in place rather than deleting and recreating it.", removed), + ) + } + } + } + } + } + } + } + } + } } func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateRequest, response *resource.CreateResponse) { @@ -1313,11 +1396,15 @@ func (c *ClickPipeResource) Create(ctx context.Context, request resource.CreateR // Determine expected state(s) based on configuration stateCheckFunc := c.getStateCheckFunc(ctx, plan) - if _, err := c.client.WaitForClickPipeState(ctx, serviceID, createdClickPipe.ID, stateCheckFunc, clickPipeStateChangeMaxWaitSeconds); err != nil { - response.Diagnostics.AddWarning( - "ClickPipe didn't reach the desired state", - err.Error(), - ) + finalClickPipe, err := c.client.WaitForClickPipeState(ctx, serviceID, createdClickPipe.ID, stateCheckFunc, clickPipeStateChangeMaxWaitSeconds) + if err != nil { + // Only warn if the final state is not acceptable + if finalClickPipe == nil || !stateCheckFunc(finalClickPipe.State) { + response.Diagnostics.AddWarning( + "ClickPipe didn't reach the desired state", + err.Error(), + ) + } } plan.ID = types.StringValue(createdClickPipe.ID) @@ -1572,7 +1659,7 @@ func (c *ClickPipeResource) extractSourceFromPlan(ctx context.Context, diagnosti settingsModel := models.ClickPipePostgresSettingsModel{} diagnostics.Append(postgresModel.Settings.As(ctx, &settingsModel, basetypes.ObjectAsOptions{})...) - settings := api.ClickPipePostgresSettings{ + settings := &api.ClickPipePostgresSettings{ ReplicationMode: settingsModel.ReplicationMode.ValueString(), } @@ -1711,16 +1798,18 @@ func (c *ClickPipeResource) getStateCheckFunc(ctx context.Context, plan models.C } } - // Check if this is a snapshot-only Postgres pipe + // Check if this is a Postgres pipe and what replication mode + isPostgresPipe := false isSnapshotOnly := false var sourceModel models.ClickPipeSourceModel - if diags := plan.Source.As(ctx, &sourceModel, basetypes.ObjectAsOptions{}); diags == nil { + if diags := plan.Source.As(ctx, &sourceModel, basetypes.ObjectAsOptions{}); !diags.HasError() { if !sourceModel.Postgres.IsNull() { + isPostgresPipe = true var postgresSource models.ClickPipePostgresSourceModel - if diags := sourceModel.Postgres.As(ctx, &postgresSource, basetypes.ObjectAsOptions{}); diags == nil { + if diags := sourceModel.Postgres.As(ctx, &postgresSource, basetypes.ObjectAsOptions{}); !diags.HasError() { if !postgresSource.Settings.IsNull() { var settings models.ClickPipePostgresSettingsModel - if diags := postgresSource.Settings.As(ctx, &settings, basetypes.ObjectAsOptions{}); diags == nil { + if diags := postgresSource.Settings.As(ctx, &settings, basetypes.ObjectAsOptions{}); !diags.HasError() { isSnapshotOnly = settings.ReplicationMode.ValueString() == api.ClickPipePostgresReplicationModeSnapshot } } @@ -1728,8 +1817,8 @@ func (c *ClickPipeResource) getStateCheckFunc(ctx context.Context, plan models.C } } - // For snapshot-only pipes, accept both Running and Completed states - // Also treat terminal error states (Failed, InternalError) as complete to stop waiting + // For snapshot-only Postgres pipes, accept Completed, Snapshot, or Failed states + // Failed is included to stop waiting early (warning will be shown but apply continues) if isSnapshotOnly { return func(state string) bool { return state == api.ClickPipeCompletedState || @@ -1738,7 +1827,17 @@ func (c *ClickPipeResource) getStateCheckFunc(ctx context.Context, plan models.C } } - // For other pipes, wait for Running state or terminal error states + // For Postgres CDC pipes, accept Running, Snapshot (initial snapshot phase), or Failed states + // Snapshot state is normal during the initial snapshot before CDC starts + if isPostgresPipe { + return func(state string) bool { + return state == api.ClickPipeRunningState || + state == api.ClickPipeSnapShotState || + state == api.ClickPipeFailedState + } + } + + // For other pipes (Kafka, S3, etc.), wait for Running or Failed state return func(state string) bool { return state == api.ClickPipeRunningState || state == api.ClickPipeFailedState @@ -2047,27 +2146,32 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model settingsModel.EnableFailoverSlots = types.BoolNull() } - // Table mappings - preserve null values from state - var stateTableMappings []models.ClickPipePostgresTableMappingModel + // Table mappings - convert API response to Set (order doesn't matter) + // Get state mappings for preserving null values on optional fields + var stateTableMappingsMap map[string]models.ClickPipePostgresTableMappingModel if !statePostgresModel.TableMappings.IsNull() && len(statePostgresModel.TableMappings.Elements()) > 0 { - stateTableMappings = make([]models.ClickPipePostgresTableMappingModel, len(statePostgresModel.TableMappings.Elements())) + stateTableMappings := make([]models.ClickPipePostgresTableMappingModel, len(statePostgresModel.TableMappings.Elements())) statePostgresModel.TableMappings.ElementsAs(ctx, &stateTableMappings, false) + + stateTableMappingsMap = make(map[string]models.ClickPipePostgresTableMappingModel) + for _, stateMapping := range stateTableMappings { + key := fmt.Sprintf("%s.%s->%s", stateMapping.SourceSchemaName.ValueString(), stateMapping.SourceTable.ValueString(), stateMapping.TargetTable.ValueString()) + stateTableMappingsMap[key] = stateMapping + } } - tableMappingList := make([]attr.Value, len(clickPipe.Source.Postgres.Mappings)) - for i, mapping := range clickPipe.Source.Postgres.Mappings { + // Convert all API mappings to Set elements (order doesn't matter for Sets) + tableMappingList := make([]attr.Value, 0, len(clickPipe.Source.Postgres.Mappings)) + for _, mapping := range clickPipe.Source.Postgres.Mappings { + key := fmt.Sprintf("%s.%s->%s", mapping.SourceSchemaName, mapping.SourceTable, mapping.TargetTable) + stateMapping, hasStateMapping := stateTableMappingsMap[key] + tableMappingModel := models.ClickPipePostgresTableMappingModel{ SourceSchemaName: types.StringValue(mapping.SourceSchemaName), SourceTable: types.StringValue(mapping.SourceTable), TargetTable: types.StringValue(mapping.TargetTable), } - // Get corresponding state mapping if it exists - var stateMapping *models.ClickPipePostgresTableMappingModel - if i < len(stateTableMappings) { - stateMapping = &stateTableMappings[i] - } - if len(mapping.ExcludedColumns) > 0 { excludedColsList := make([]attr.Value, len(mapping.ExcludedColumns)) for j, col := range mapping.ExcludedColumns { @@ -2095,17 +2199,17 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model } // For table_engine, preserve null from state if it was null (API may return default) - if stateMapping != nil && stateMapping.TableEngine.IsNull() { + if hasStateMapping && stateMapping.TableEngine.IsNull() { tableMappingModel.TableEngine = types.StringNull() } else if mapping.TableEngine != nil && *mapping.TableEngine != "" { tableMappingModel.TableEngine = types.StringValue(*mapping.TableEngine) - } else if stateMapping != nil { + } else if hasStateMapping { tableMappingModel.TableEngine = stateMapping.TableEngine } else { tableMappingModel.TableEngine = types.StringNull() } - tableMappingList[i] = tableMappingModel.ObjectValue() + tableMappingList = append(tableMappingList, tableMappingModel.ObjectValue()) } postgresModel := models.ClickPipePostgresSourceModel{ @@ -2113,11 +2217,11 @@ func (c *ClickPipeResource) syncClickPipeState(ctx context.Context, state *model Port: types.Int64Value(int64(clickPipe.Source.Postgres.Port)), Database: types.StringValue(clickPipe.Source.Postgres.Database), Settings: settingsModel.ObjectValue(), - TableMappings: types.ListNull(models.ClickPipePostgresTableMappingModel{}.ObjectType()), + TableMappings: types.SetNull(models.ClickPipePostgresTableMappingModel{}.ObjectType()), } if len(tableMappingList) > 0 { - postgresModel.TableMappings, _ = types.ListValue(models.ClickPipePostgresTableMappingModel{}.ObjectType(), tableMappingList) + postgresModel.TableMappings, _ = types.SetValue(models.ClickPipePostgresTableMappingModel{}.ObjectType(), tableMappingList) } // Preserve credentials from state as API doesn't return them @@ -2610,11 +2714,15 @@ func (c *ClickPipeResource) Update(ctx context.Context, req resource.UpdateReque // Determine expected state(s) based on configuration stateCheckFunc := c.getStateCheckFunc(ctx, plan) - if _, err := c.client.WaitForClickPipeState(ctx, state.ServiceID.ValueString(), state.ID.ValueString(), stateCheckFunc, clickPipeStateChangeMaxWaitSeconds); err != nil { - response.Diagnostics.AddWarning( - "ClickPipe didn't reach the desired state", - err.Error(), - ) + finalClickPipe, err := c.client.WaitForClickPipeState(ctx, state.ServiceID.ValueString(), state.ID.ValueString(), stateCheckFunc, clickPipeStateChangeMaxWaitSeconds) + if err != nil { + // Only warn if the final state is not acceptable + if finalClickPipe == nil || !stateCheckFunc(finalClickPipe.State) { + response.Diagnostics.AddWarning( + "ClickPipe didn't reach the desired state", + err.Error(), + ) + } } if err := c.syncClickPipeState(ctx, &plan); err != nil { @@ -2637,6 +2745,17 @@ func (c *ClickPipeResource) Delete(ctx context.Context, request resource.DeleteR return } + // Check if this is a Postgres pipe - warn about manual table cleanup + var sourceModel models.ClickPipeSourceModel + if diags := state.Source.As(ctx, &sourceModel, basetypes.ObjectAsOptions{}); !diags.HasError() { + if !sourceModel.Postgres.IsNull() { + response.Diagnostics.AddWarning( + "Manual table cleanup required", + "Previous destination tables need to be deleted manually before a recreation of the CDC pipe can occur.", + ) + } + } + if err := c.client.DeleteClickPipe(ctx, state.ServiceID.ValueString(), state.ID.ValueString()); err != nil { response.Diagnostics.AddError( "Error Deleting ClickPipe", @@ -2673,3 +2792,36 @@ func (r *ClickPipeResource) ImportState(ctx context.Context, req resource.Import "Important: your configuration (in *.tf files) has to provide valid credentials.", ) } + +// requiresReplaceIfSourceTypeChanges is a custom plan modifier that requires replacement +// only when the source type changes (null → non-null or non-null → null), but allows +// updates to fields within the same source type. +type requiresReplaceIfSourceTypeChanges struct{} + +func (r requiresReplaceIfSourceTypeChanges) Description(ctx context.Context) string { + return "Requires replacement if the source type changes (e.g., switching from Kafka to Postgres)." +} + +func (r requiresReplaceIfSourceTypeChanges) MarkdownDescription(ctx context.Context) string { + return "Requires replacement if the source type changes (e.g., switching from Kafka to Postgres)." +} + +func (r requiresReplaceIfSourceTypeChanges) PlanModifyObject(ctx context.Context, req planmodifier.ObjectRequest, resp *planmodifier.ObjectResponse) { + // If we're creating or destroying the entire resource, don't need to check + if req.State.Raw.IsNull() || req.Plan.Raw.IsNull() { + return + } + + // Check if this source type attribute is transitioning between null and non-null + stateIsNull := req.StateValue.IsNull() + planIsNull := req.PlanValue.IsNull() + + // If transitioning from null to non-null or vice versa, this means the source type + // is changing (e.g., kafka → postgres), so require replacement + if stateIsNull != planIsNull { + resp.RequiresReplace = true + } + + // If both are non-null (values changing within same source type), no replacement needed + // If both are null (staying null), no replacement needed +} diff --git a/pkg/resource/clickpipe_cdc_infrastructure.go b/pkg/resource/clickpipe_cdc_infrastructure.go new file mode 100644 index 00000000..2ebcb039 --- /dev/null +++ b/pkg/resource/clickpipe_cdc_infrastructure.go @@ -0,0 +1,275 @@ +//go:build alpha + +package resource + +import ( + "context" + "fmt" + "time" + + "github.com/ClickHouse/terraform-provider-clickhouse/pkg/internal/api" + "github.com/ClickHouse/terraform-provider-clickhouse/pkg/resource/models" + "github.com/cenkalti/backoff/v4" + "github.com/hashicorp/terraform-plugin-framework-validators/float64validator" + "github.com/hashicorp/terraform-plugin-framework-validators/int64validator" + "github.com/hashicorp/terraform-plugin-framework/resource" + "github.com/hashicorp/terraform-plugin-framework/resource/schema" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/schema/validator" + "github.com/hashicorp/terraform-plugin-framework/types" +) + +var ( + _ resource.Resource = &ClickPipeCdcInfrastructureResource{} + _ resource.ResourceWithConfigure = &ClickPipeCdcInfrastructureResource{} +) + +func NewClickPipeCdcInfrastructureResource() resource.Resource { + return &ClickPipeCdcInfrastructureResource{} +} + +type ClickPipeCdcInfrastructureResource struct { + client api.Client +} + +func (r *ClickPipeCdcInfrastructureResource) Metadata(_ context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) { + resp.TypeName = req.ProviderTypeName + "_clickpipe_cdc_infrastructure" +} + +func (r *ClickPipeCdcInfrastructureResource) Schema(_ context.Context, _ resource.SchemaRequest, resp *resource.SchemaResponse) { + resp.Schema = schema.Schema{ + MarkdownDescription: "ClickPipe CDC Infrastructure resource. Manages scaling settings for CDC ClickPipes infrastructure shared across all DB ClickPipes in a service.\n\n" + + "**Important**: Only one CDC infrastructure resource per service is supported. Creating multiple instances for the same service will cause conflicts.\n\n" + + "This endpoint becomes available once at least one DB ClickPipe has been provisioned. The resource will poll for up to 10 minutes waiting for the endpoint to become available.\n\n" + + "For billing purposes, 2 CPU cores and 8 GB of RAM correspond to one compute unit.", + Attributes: map[string]schema.Attribute{ + "service_id": schema.StringAttribute{ + MarkdownDescription: "ClickHouse Cloud service ID where the CDC infrastructure is located.", + Required: true, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.RequiresReplace(), + }, + }, + "replica_cpu_millicores": schema.Int64Attribute{ + MarkdownDescription: "CPU in millicores for DB ClickPipes. Must be a multiple of 1000, between 1000 and 24000.", + Required: true, + Validators: []validator.Int64{ + int64validator.Between(1000, 24000), + }, + }, + "replica_memory_gb": schema.Float64Attribute{ + MarkdownDescription: "Memory in GiB for DB ClickPipes. Must be a multiple of 4, between 4 and 96. Must be 4× the CPU core count.", + Required: true, + Validators: []validator.Float64{ + float64validator.Between(4, 96), + }, + }, + }, + } +} + +func (r *ClickPipeCdcInfrastructureResource) Configure(_ context.Context, req resource.ConfigureRequest, resp *resource.ConfigureResponse) { + if req.ProviderData != nil { + client, ok := req.ProviderData.(api.Client) + if !ok { + resp.Diagnostics.AddError( + "Unexpected Resource Configure Type", + fmt.Sprintf("Expected api.Client, got: %T. Please report this issue to the provider developers.", req.ProviderData), + ) + return + } + r.client = client + } +} + +func (r *ClickPipeCdcInfrastructureResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) { + var plan models.ClickPipeCdcInfrastructureModel + resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...) + if resp.Diagnostics.HasError() { + return + } + + serviceID := plan.ServiceID.ValueString() + + // Validate CPU is a multiple of 1000 + cpuMillicores := plan.ReplicaCpuMillicores.ValueInt64() + if cpuMillicores%1000 != 0 { + resp.Diagnostics.AddError( + "Invalid Configuration", + fmt.Sprintf("replica_cpu_millicores must be a multiple of 1000, got %d", cpuMillicores), + ) + return + } + + // Validate memory is a multiple of 4 + memoryGb := plan.ReplicaMemoryGb.ValueFloat64() + if int(memoryGb)%4 != 0 { + resp.Diagnostics.AddError( + "Invalid Configuration", + fmt.Sprintf("replica_memory_gb must be a multiple of 4, got %.1f", memoryGb), + ) + return + } + + // Validate memory is 4x CPU cores + cpuCores := float64(cpuMillicores) / 1000.0 + expectedMemory := cpuCores * 4 + if memoryGb != expectedMemory { + resp.Diagnostics.AddError( + "Invalid Configuration", + fmt.Sprintf("replica_memory_gb must be 4× the CPU core count. With %d millicores (%.1f cores), memory must be %.1f GB, but got %.1f GB", + cpuMillicores, cpuCores, expectedMemory, memoryGb), + ) + return + } + + // Poll for CDC scaling endpoint to become available (up to 10 minutes) + maxWait := 10 * time.Minute + checkAvailable := func() error { + _, err := r.client.GetClickPipeCdcScaling(ctx, serviceID) + return err + } + + exponentialBackoff := backoff.NewExponentialBackOff( + backoff.WithMaxElapsedTime(maxWait), + backoff.WithMaxInterval(30*time.Second), + ) + + if err := backoff.Retry(checkAvailable, exponentialBackoff); err != nil { + resp.Diagnostics.AddError( + "CDC Infrastructure Not Available", + "The CDC infrastructure endpoint is not yet available after waiting 10 minutes. "+ + "This endpoint becomes available once at least one DB ClickPipe has been provisioned. "+ + "Please create a Postgres CDC ClickPipe first, then try creating this resource again.\n\n"+ + fmt.Sprintf("Error: %s", err.Error()), + ) + return + } + + // Update the scaling settings + scalingReq := api.ClickPipeCdcScalingRequest{ + ReplicaCpuMillicores: plan.ReplicaCpuMillicores.ValueInt64(), + ReplicaMemoryGb: plan.ReplicaMemoryGb.ValueFloat64(), + } + + scaling, err := r.client.UpdateClickPipeCdcScaling(ctx, serviceID, scalingReq) + if err != nil { + resp.Diagnostics.AddError( + "Error Creating CDC Infrastructure", + fmt.Sprintf("Could not create CDC infrastructure: %s", err.Error()), + ) + return + } + + // Update plan with response values + plan.ReplicaCpuMillicores = types.Int64Value(scaling.ReplicaCpuMillicores) + plan.ReplicaMemoryGb = types.Float64Value(scaling.ReplicaMemoryGb) + + resp.Diagnostics.Append(resp.State.Set(ctx, plan)...) +} + +func (r *ClickPipeCdcInfrastructureResource) Read(ctx context.Context, req resource.ReadRequest, resp *resource.ReadResponse) { + var state models.ClickPipeCdcInfrastructureModel + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) + if resp.Diagnostics.HasError() { + return + } + + serviceID := state.ServiceID.ValueString() + + scaling, err := r.client.GetClickPipeCdcScaling(ctx, serviceID) + if err != nil { + resp.Diagnostics.AddError( + "Error Reading CDC Infrastructure", + fmt.Sprintf("Could not read CDC infrastructure: %s", err.Error()), + ) + return + } + + // Update state with API values + state.ReplicaCpuMillicores = types.Int64Value(scaling.ReplicaCpuMillicores) + state.ReplicaMemoryGb = types.Float64Value(scaling.ReplicaMemoryGb) + + resp.Diagnostics.Append(resp.State.Set(ctx, state)...) +} + +func (r *ClickPipeCdcInfrastructureResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) { + var plan models.ClickPipeCdcInfrastructureModel + resp.Diagnostics.Append(req.Plan.Get(ctx, &plan)...) + if resp.Diagnostics.HasError() { + return + } + + serviceID := plan.ServiceID.ValueString() + + // Validate CPU is a multiple of 1000 + cpuMillicores := plan.ReplicaCpuMillicores.ValueInt64() + if cpuMillicores%1000 != 0 { + resp.Diagnostics.AddError( + "Invalid Configuration", + fmt.Sprintf("replica_cpu_millicores must be a multiple of 1000, got %d", cpuMillicores), + ) + return + } + + // Validate memory is a multiple of 4 + memoryGb := plan.ReplicaMemoryGb.ValueFloat64() + if int(memoryGb)%4 != 0 { + resp.Diagnostics.AddError( + "Invalid Configuration", + fmt.Sprintf("replica_memory_gb must be a multiple of 4, got %.1f", memoryGb), + ) + return + } + + // Validate memory is 4x CPU cores + cpuCores := float64(cpuMillicores) / 1000.0 + expectedMemory := cpuCores * 4 + if memoryGb != expectedMemory { + resp.Diagnostics.AddError( + "Invalid Configuration", + fmt.Sprintf("replica_memory_gb must be 4× the CPU core count. With %d millicores (%.1f cores), memory must be %.1f GB, but got %.1f GB", + cpuMillicores, cpuCores, expectedMemory, memoryGb), + ) + return + } + + // Update the scaling settings + scalingReq := api.ClickPipeCdcScalingRequest{ + ReplicaCpuMillicores: plan.ReplicaCpuMillicores.ValueInt64(), + ReplicaMemoryGb: plan.ReplicaMemoryGb.ValueFloat64(), + } + + scaling, err := r.client.UpdateClickPipeCdcScaling(ctx, serviceID, scalingReq) + if err != nil { + resp.Diagnostics.AddError( + "Error Updating CDC Infrastructure", + fmt.Sprintf("Could not update CDC infrastructure: %s", err.Error()), + ) + return + } + + // Update plan with response values + plan.ReplicaCpuMillicores = types.Int64Value(scaling.ReplicaCpuMillicores) + plan.ReplicaMemoryGb = types.Float64Value(scaling.ReplicaMemoryGb) + + resp.Diagnostics.Append(resp.State.Set(ctx, plan)...) +} + +func (r *ClickPipeCdcInfrastructureResource) Delete(ctx context.Context, req resource.DeleteRequest, resp *resource.DeleteResponse) { + var state models.ClickPipeCdcInfrastructureModel + resp.Diagnostics.Append(req.State.Get(ctx, &state)...) + if resp.Diagnostics.HasError() { + return + } + + // CDC infrastructure is shared and cannot be explicitly deleted + // Just remove from state - the infrastructure will remain until all CDC pipes are deleted + resp.Diagnostics.AddWarning( + "CDC Infrastructure Deletion", + "CDC infrastructure is shared across all DB ClickPipes in the service and cannot be explicitly deleted. "+ + "The infrastructure will automatically be removed when all DB ClickPipes are deleted. "+ + "This resource has been removed from Terraform state only.", + ) +} diff --git a/pkg/resource/clickpipe_test.go b/pkg/resource/clickpipe_test.go index ed985fd5..ced52fcf 100644 --- a/pkg/resource/clickpipe_test.go +++ b/pkg/resource/clickpipe_test.go @@ -109,7 +109,7 @@ func TestClickPipeResource_syncClickPipeState_Postgres(t *testing.T) { Host: "postgres.example.com", Port: 5432, Database: "mydb", - Settings: api.ClickPipePostgresSettings{ + Settings: &api.ClickPipePostgresSettings{ ReplicationMode: "cdc", SyncIntervalSeconds: intPtr(60), PullBatchSize: intPtr(1000), @@ -145,7 +145,7 @@ func TestClickPipeResource_syncClickPipeState_Postgres(t *testing.T) { Host: "postgres.example.com", Port: 5432, Database: "mydb", - Settings: api.ClickPipePostgresSettings{ + Settings: &api.ClickPipePostgresSettings{ ReplicationMode: "cdc", // Optional fields not set - API may return empty/defaults PublicationName: nil, @@ -181,7 +181,7 @@ func TestClickPipeResource_syncClickPipeState_Postgres(t *testing.T) { Host: "postgres.example.com", Port: 5432, Database: "mydb", - Settings: api.ClickPipePostgresSettings{ + Settings: &api.ClickPipePostgresSettings{ ReplicationMode: "cdc", }, Mappings: []api.ClickPipePostgresTableMapping{ @@ -216,7 +216,7 @@ func TestClickPipeResource_syncClickPipeState_Postgres(t *testing.T) { Host: "postgres.example.com", Port: 5432, Database: "mydb", - Settings: api.ClickPipePostgresSettings{ + Settings: &api.ClickPipePostgresSettings{ ReplicationMode: "cdc", SyncIntervalSeconds: intPtr(120), // Changed from null to 120 }, @@ -322,7 +322,7 @@ func getPostgresInitialState() models.ClickPipeResourceModel { "enable_failover_slots": types.BoolNull(), }, ), - "table_mappings": types.ListValueMust( + "table_mappings": types.SetValueMust( models.ClickPipePostgresTableMappingModel{}.ObjectType(), []attr.Value{ types.ObjectValueMust( diff --git a/pkg/resource/models/clickpipe_resource.go b/pkg/resource/models/clickpipe_resource.go index ce536572..94d68bd1 100644 --- a/pkg/resource/models/clickpipe_resource.go +++ b/pkg/resource/models/clickpipe_resource.go @@ -337,7 +337,7 @@ type ClickPipePostgresSourceModel struct { Database types.String `tfsdk:"database"` Credentials types.Object `tfsdk:"credentials"` Settings types.Object `tfsdk:"settings"` - TableMappings types.List `tfsdk:"table_mappings"` + TableMappings types.Set `tfsdk:"table_mappings"` } func (m ClickPipePostgresSourceModel) ObjectType() types.ObjectType { @@ -348,7 +348,7 @@ func (m ClickPipePostgresSourceModel) ObjectType() types.ObjectType { "database": types.StringType, "credentials": ClickPipeSourceCredentialsModel{}.ObjectType(), "settings": ClickPipePostgresSettingsModel{}.ObjectType(), - "table_mappings": types.ListType{ElemType: ClickPipePostgresTableMappingModel{}.ObjectType()}, + "table_mappings": types.SetType{ElemType: ClickPipePostgresTableMappingModel{}.ObjectType()}, }, } } @@ -586,3 +586,9 @@ type ClickPipeResourceModel struct { Settings types.Dynamic `tfsdk:"settings"` TriggerResync types.Bool `tfsdk:"trigger_resync"` } + +type ClickPipeCdcInfrastructureModel struct { + ServiceID types.String `tfsdk:"service_id"` + ReplicaCpuMillicores types.Int64 `tfsdk:"replica_cpu_millicores"` + ReplicaMemoryGb types.Float64 `tfsdk:"replica_memory_gb"` +} diff --git a/pkg/resource/register_debug.go b/pkg/resource/register_debug.go index 9c1ced8c..219084ff 100644 --- a/pkg/resource/register_debug.go +++ b/pkg/resource/register_debug.go @@ -14,5 +14,6 @@ func GetResourceFactories() []func() upstreamresource.Resource { NewServiceTransparentDataEncryptionKeyAssociationResource, NewClickPipeResource, NewClickPipeReversePrivateEndpointResource, + NewClickPipeCdcInfrastructureResource, } }