Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/smart-files-brake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink-deployments-framework": minor
---

feat(cld/runtime): add support to execute registered changesets from YAML input
141 changes: 84 additions & 57 deletions engine/cld/changeset/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Configurations struct {
type internalChangeSet interface {
noop() // unexported function to prevent arbitrary structs from implementing ChangeSet.
Apply(env fdeployment.Environment) (fdeployment.ChangesetOutput, error)
applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error)
Configurations() (Configurations, error)
}

Expand Down Expand Up @@ -77,6 +78,34 @@ type TypedJSON struct {
ChainOverrides []uint64 `json:"chainOverrides"` // Optional field for chain overrides
}

func parseTypedInput(inputStr string) (TypedJSON, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nothing changed here, i just refactor these logic out into common methods for reuse.

if inputStr == "" {
return TypedJSON{}, errors.New("input is empty")
}

var inputObject TypedJSON
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
return TypedJSON{}, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
}
if len(inputObject.Payload) == 0 {
return TypedJSON{}, errors.New("'payload' field is required")
}

return inputObject, nil
}

func decodePayload[C any](payload json.RawMessage) (C, error) {
var config C

payloadDecoder := json.NewDecoder(strings.NewReader(string(payload)))
payloadDecoder.DisallowUnknownFields()
if err := payloadDecoder.Decode(&config); err != nil {
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
}

return config, nil
}

// WithJSON returns a fully configured changeset, which pairs a [fdeployment.ChangeSet] with its configuration based
// a JSON input. It also allows extensions, such as a PostProcessing function.
// InputStr must be a JSON object with a "payload" field that contains the actual input data for a Durable Pipeline.
Expand All @@ -92,30 +121,13 @@ type TypedJSON struct {
// Note: Prefer WithEnvInput for durable_pipelines.go
func (f WrappedChangeSet[C]) WithJSON(_ C, inputStr string) ConfiguredChangeSet {
return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) {
var config C

if inputStr == "" {
return config, errors.New("input is empty")
}

var inputObject TypedJSON
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
}

// If payload is null, decode it as null (which will give zero value)
// If payload is missing, return an error
if len(inputObject.Payload) == 0 {
return config, errors.New("'payload' field is required")
}

payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload)))
payloadDecoder.DisallowUnknownFields()
if err := payloadDecoder.Decode(&config); err != nil {
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
inputObject, err := parseTypedInput(inputStr)
if err != nil {
var zero C
return zero, err
}

return config, nil
return decodePayload[C](inputObject.Payload)
},
inputChainOverrides: func() ([]uint64, error) {
return loadInputChainOverrides(inputStr)
Expand Down Expand Up @@ -151,41 +163,36 @@ func (f WrappedChangeSet[C]) WithEnvInput(opts ...EnvInputOption[C]) ConfiguredC

inputStr := os.Getenv("DURABLE_PIPELINE_INPUT")

return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) {
var config C

if inputStr == "" {
return config, errors.New("input is empty")
}

var inputObject TypedJSON
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
}
providerFromInput := func(rawInput string) (C, error) {
var zero C

// If payload is null, decode it as null (which will give zero value)
// If payload is missing, return an error
if len(inputObject.Payload) == 0 {
return config, errors.New("'payload' field is required")
inputObject, err := parseTypedInput(rawInput)
if err != nil {
return zero, err
}

payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload)))
payloadDecoder.DisallowUnknownFields()
if err := payloadDecoder.Decode(&config); err != nil {
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
config, err := decodePayload[C](inputObject.Payload)
if err != nil {
return zero, err
}

if options.inputModifier != nil {
conf, err := options.inputModifier(config)
if err != nil {
return conf, fmt.Errorf("failed to apply input modifier: %w", err)
conf, modifierErr := options.inputModifier(config)
if modifierErr != nil {
return conf, fmt.Errorf("failed to apply input modifier: %w", modifierErr)
}

return conf, nil
}

return config, nil
},
}

return ChangeSetImpl[C]{changeset: f,
configProvider: func() (C, error) {
return providerFromInput(inputStr)
},
configProviderWithInput: providerFromInput,
inputChainOverrides: func() ([]uint64, error) {
return loadInputChainOverrides(inputStr)
},
Expand Down Expand Up @@ -221,21 +228,17 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
// Read input from environment variable
inputStr := os.Getenv("DURABLE_PIPELINE_INPUT")

configProvider := func() (C, error) {
configProviderFromInput := func(rawInput string) (C, error) {
var zero C

if inputStr == "" {
if rawInput == "" {
return zero, errors.New("input is empty")
}

// Parse JSON input
var inputObject TypedJSON
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
if err := json.Unmarshal([]byte(rawInput), &inputObject); err != nil {
return zero, fmt.Errorf("failed to parse resolver input as JSON: %w", err)
}

// If payload is null, pass it to the resolver (which will receive null)
// If payload field is missing, return an error
if len(inputObject.Payload) == 0 {
return zero, errors.New("'payload' field is required")
}
Expand All @@ -249,8 +252,12 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
return typedConfig, nil
}

return ChangeSetImpl[C]{changeset: f, configProvider: configProvider,
ConfigResolver: resolver,
return ChangeSetImpl[C]{changeset: f,
configProvider: func() (C, error) {
return configProviderFromInput(inputStr)
},
configProviderWithInput: configProviderFromInput,
ConfigResolver: resolver,
inputChainOverrides: func() ([]uint64, error) {
return loadInputChainOverrides(inputStr)
},
Expand All @@ -260,9 +267,10 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
var _ ConfiguredChangeSet = ChangeSetImpl[any]{}

type ChangeSetImpl[C any] struct {
changeset WrappedChangeSet[C]
configProvider func() (C, error)
inputChainOverrides func() ([]uint64, error)
changeset WrappedChangeSet[C]
configProvider func() (C, error)
configProviderWithInput func(inputStr string) (C, error)
inputChainOverrides func() ([]uint64, error)

// Present only when the changeset was wired with
// Configure(...).WithConfigResolver(...)
Expand All @@ -287,6 +295,25 @@ func (ccs ChangeSetImpl[C]) Apply(env fdeployment.Environment) (fdeployment.Chan
return ccs.changeset.operation.Apply(env, c)
}

func (ccs ChangeSetImpl[C]) applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error) {
if inputStr == "" {
return ccs.Apply(env)
}
if ccs.configProviderWithInput == nil {
return ccs.Apply(env)
}

c, err := ccs.configProviderWithInput(inputStr)
if err != nil {
return fdeployment.ChangesetOutput{}, err
}
if err := ccs.changeset.operation.VerifyPreconditions(env, c); err != nil {
return fdeployment.ChangesetOutput{}, err
}

return ccs.changeset.operation.Apply(env, c)
}

func (ccs ChangeSetImpl[C]) Configurations() (Configurations, error) {
var chainOverrides []uint64
var err error
Expand Down
12 changes: 12 additions & 0 deletions engine/cld/changeset/postprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ func (ccs PostProcessingChangeSetImpl[C]) Apply(env fdeployment.Environment) (fd
return ccs.postProcessor(env, output)
}

func (ccs PostProcessingChangeSetImpl[C]) applyWithInput(
env fdeployment.Environment, inputStr string,
) (fdeployment.ChangesetOutput, error) {
env.Logger.Debugf("Post-processing ChangesetOutput from %T", ccs.changeset.changeset.operation)
output, err := ccs.changeset.applyWithInput(env, inputStr)
if err != nil {
return output, err
}

return ccs.postProcessor(env, output)
}

func (ccs PostProcessingChangeSetImpl[C]) Configurations() (Configurations, error) {
return ccs.changeset.Configurations()
}
Expand Down
19 changes: 18 additions & 1 deletion engine/cld/changeset/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,21 @@ func (r *ChangesetsRegistry) AddGlobalPostHooks(hooks ...PostHook) {
// a failed Apply are logged but never mask the Apply error.
func (r *ChangesetsRegistry) Apply(
key string, e fdeployment.Environment,
) (fdeployment.ChangesetOutput, error) {
return r.applyWithInput(key, e, "")
}

// ApplyWithInput applies a changeset with explicit input JSON for this apply invocation.
// Changesets configured to read durable pipeline input from environment variables
// consume this value directly when they support input-aware execution.
func (r *ChangesetsRegistry) ApplyWithInput(
key string, e fdeployment.Environment, inputStr string,
) (fdeployment.ChangesetOutput, error) {
return r.applyWithInput(key, e, inputStr)
}

func (r *ChangesetsRegistry) applyWithInput(
key string, e fdeployment.Environment, inputStr string,
) (fdeployment.ChangesetOutput, error) {
entry, globalPre, globalPost, err := r.getApplySnapshot(key)
if err != nil {
Expand Down Expand Up @@ -204,7 +219,9 @@ func (r *ChangesetsRegistry) Apply(
}
}

output, applyErr := entry.changeset.Apply(e)
var output fdeployment.ChangesetOutput
var applyErr error
output, applyErr = entry.changeset.applyWithInput(e, inputStr)

postParams := PostHookParams{
Env: hookEnv,
Expand Down
69 changes: 69 additions & 0 deletions engine/cld/changeset/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (noopChangeset) Apply(e fdeployment.Environment) (fdeployment.ChangesetOutp
return fdeployment.ChangesetOutput{}, nil
}

func (n noopChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
return n.Apply(e)
}

func (n noopChangeset) Configurations() (Configurations, error) {
return Configurations{
InputChainOverrides: n.chainOverrides,
Expand All @@ -45,6 +49,10 @@ func (r *recordingChangeset) Apply(_ fdeployment.Environment) (fdeployment.Chang
return r.output, r.err
}

func (r *recordingChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
return r.Apply(e)
}

func (*recordingChangeset) Configurations() (Configurations, error) {
return Configurations{}, nil
}
Expand All @@ -61,6 +69,10 @@ func (o *orderRecordingChangeset) Apply(_ fdeployment.Environment) (fdeployment.
return fdeployment.ChangesetOutput{}, nil
}

func (o *orderRecordingChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
return o.Apply(e)
}

func (*orderRecordingChangeset) Configurations() (Configurations, error) {
return Configurations{}, nil
}
Expand Down Expand Up @@ -148,6 +160,63 @@ func Test_Changesets_Apply(t *testing.T) {
}
}

//nolint:paralleltest // Uses process environment for fallback behavior assertions.
func Test_Changesets_ApplyWithInput_WithEnvConfiguredChangeset(t *testing.T) {
type inputConfig struct {
Value int `json:"value"`
}

t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"value":999}}`)

var received int
cs := fdeployment.CreateChangeSet(
func(_ fdeployment.Environment, cfg inputConfig) (fdeployment.ChangesetOutput, error) {
received = cfg.Value
return fdeployment.ChangesetOutput{}, nil
},
func(_ fdeployment.Environment, _ inputConfig) error { return nil },
)

r := NewChangesetsRegistry()
r.Add("0001_test", Configure(cs).WithEnvInput())

_, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"value":1}}`)
require.NoError(t, err)
require.Equal(t, 1, received)
}

//nolint:paralleltest // Uses process environment for fallback behavior assertions.
func Test_Changesets_ApplyWithInput_WithResolverConfiguredChangeset(t *testing.T) {
type resolverInput struct {
Base int `json:"base"`
}
type resolverOutput struct {
Value int `json:"value"`
}

t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"base":100}}`)

resolver := func(input resolverInput) (resolverOutput, error) {
return resolverOutput{Value: input.Base + 10}, nil
}

var received int
cs := fdeployment.CreateChangeSet(
func(_ fdeployment.Environment, cfg resolverOutput) (fdeployment.ChangesetOutput, error) {
received = cfg.Value
return fdeployment.ChangesetOutput{}, nil
},
func(_ fdeployment.Environment, _ resolverOutput) error { return nil },
)

r := NewChangesetsRegistry()
r.Add("0001_test", Configure(cs).WithConfigResolver(resolver))

_, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"base":7}}`)
require.NoError(t, err)
require.Equal(t, 17, received)
}

func Test_Changesets_Add(t *testing.T) {
t.Parallel()

Expand Down
Loading