diff --git a/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkDynamicWorkflowTaskDelegatingWorkflow.java b/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkDynamicWorkflowTaskDelegatingWorkflow.java new file mode 100644 index 00000000..a5117d8e --- /dev/null +++ b/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkDynamicWorkflowTaskDelegatingWorkflow.java @@ -0,0 +1,44 @@ +/* + * Copyright 2025 Flyte Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.flyte.flytekit.testing; + +import java.util.Objects; +import org.flyte.flytekit.SdkDynamicWorkflowTask; +import org.flyte.flytekit.SdkType; +import org.flyte.flytekit.SdkTypes; +import org.flyte.flytekit.SdkWorkflow; +import org.flyte.flytekit.SdkWorkflowBuilder; + +public class SdkDynamicWorkflowTaskDelegatingWorkflow + extends SdkWorkflow { + private final SdkDynamicWorkflowTask delegate; + private final InputT input; + + public SdkDynamicWorkflowTaskDelegatingWorkflow( + SdkDynamicWorkflowTask delegate, + InputT input, + SdkType outputSdkType) { + super(SdkTypes.nulls(), outputSdkType); + this.delegate = Objects.requireNonNull(delegate, "delegate cannot be null"); + this.input = input; + } + + @Override + protected OutputT expand(SdkWorkflowBuilder builder, Void ignored) { + return delegate.run(builder, this.input); + } +} diff --git a/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java b/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java index 0b7fccb7..57e038f4 100644 --- a/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java +++ b/flytekit-testing/src/main/java/org/flyte/flytekit/testing/SdkTestingExecutor.java @@ -116,6 +116,41 @@ public static SdkTestingExecutor of( .build(); } + /** + * Creates a new {@link SdkTestingExecutor} for testing a {@link SdkDynamicWorkflowTask}. This + * method wraps the given dynamic workflow task and its input into a delegating workflow, allowing + * the task to be executed and tested in isolation. + * + * @param task the dynamic workflow task to test + * @param input the input to the dynamic workflow task + * @param outputType the expected output type of the dynamic workflow task + * @param the type of the input + * @param the type of the output + * @return a new {@link SdkTestingExecutor} instance + *

Example usage: + *

{@code
+   * int expected = 6;
+   *
+   * SumIfEvenDynamicWorkflowTask.Output output =
+   *     SdkTestingExecutor.of(
+   *             new SumIfEvenDynamicWorkflowTask(),
+   *             SumIfEvenDynamicWorkflowTask.Input.create(of(2), of(4)),
+   *             JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class))
+   *         .withTaskOutput(
+   *             new SumTask(),
+   *             SumTask.SumInput.create(of(2), of(4)),
+   *             SumTask.SumOutput.create(of(expected)))
+   *         .execute()
+   *         .getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class));
+   *
+   * assertEquals(expected, output.c().get());
+   * }
+ */ + public static SdkTestingExecutor of( + SdkDynamicWorkflowTask task, InputT input, SdkType outputType) { + return of(new SdkDynamicWorkflowTaskDelegatingWorkflow<>(task, input, outputType)); + } + @AutoValue public abstract static class Result { abstract Map literalMap(); diff --git a/flytekit-testing/src/test/java/org/flyte/flytekit/testing/SdkDynamicWorkflowTaskDelegatingWorkflowTest.java b/flytekit-testing/src/test/java/org/flyte/flytekit/testing/SdkDynamicWorkflowTaskDelegatingWorkflowTest.java new file mode 100644 index 00000000..36049853 --- /dev/null +++ b/flytekit-testing/src/test/java/org/flyte/flytekit/testing/SdkDynamicWorkflowTaskDelegatingWorkflowTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2025 Flyte Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.flyte.flytekit.testing; + +import static org.flyte.flytekit.SdkBindingDataFactory.of; +import static org.junit.jupiter.api.Assertions.*; + +import org.flyte.flytekit.jackson.JacksonSdkType; +import org.junit.jupiter.api.Test; + +public class SdkDynamicWorkflowTaskDelegatingWorkflowTest { + @Test + public void testDelegatingWorkflow_EvenValues() { + int expected = 6; + + SumIfEvenDynamicWorkflowTask.Output output = + SdkTestingExecutor.of( + new SumIfEvenDynamicWorkflowTask(), + SumIfEvenDynamicWorkflowTask.Input.create(of(2), of(4)), + JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class)) + .withTaskOutput( + new SumTask(), + SumTask.SumInput.create(of(2), of(4)), + SumTask.SumOutput.create(of(expected))) + .execute() + .getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class)); + assertEquals(expected, output.c().get()); + } + + @Test + public void testDelegatingWorkflow_Odd() { + int expected = 0; + + SumIfEvenDynamicWorkflowTask.Output output = + SdkTestingExecutor.of( + new SumIfEvenDynamicWorkflowTask(), + SumIfEvenDynamicWorkflowTask.Input.create(of(1), of(4)), + JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class)) + .withTaskOutput( + new SumTask(), + SumTask.SumInput.create(of(0), of(0)), + SumTask.SumOutput.create(of(expected))) + .execute() + .getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class)); + assertEquals(expected, output.c().get()); + } +} diff --git a/flytekit-testing/src/test/java/org/flyte/flytekit/testing/SumIfEvenDynamicWorkflowTask.java b/flytekit-testing/src/test/java/org/flyte/flytekit/testing/SumIfEvenDynamicWorkflowTask.java new file mode 100644 index 00000000..7c08a4fa --- /dev/null +++ b/flytekit-testing/src/test/java/org/flyte/flytekit/testing/SumIfEvenDynamicWorkflowTask.java @@ -0,0 +1,78 @@ +/* + * Copyright 2025 Flyte Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.flyte.flytekit.testing; + +import static org.flyte.flytekit.SdkBindingDataFactory.of; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import org.flyte.flytekit.*; +import org.flyte.flytekit.jackson.JacksonSdkType; + +@AutoService(SumIfEvenDynamicWorkflowTask.class) +public class SumIfEvenDynamicWorkflowTask + extends SdkDynamicWorkflowTask< + SumIfEvenDynamicWorkflowTask.Input, SumIfEvenDynamicWorkflowTask.Output> { + @AutoValue + public abstract static class Input { + + abstract SdkBindingData a(); + + abstract SdkBindingData b(); + + static Input create(SdkBindingData a, SdkBindingData b) { + return new AutoValue_SumIfEvenDynamicWorkflowTask_Input(a, b); + } + } + + @AutoValue + public abstract static class Output { + + abstract SdkBindingData c(); + + static Output create(SdkBindingData c) { + return new AutoValue_SumIfEvenDynamicWorkflowTask_Output(c); + } + } + + public SumIfEvenDynamicWorkflowTask() { + super(JacksonSdkType.of(Input.class), JacksonSdkType.of(Output.class)); + } + + @Override + public Output run(SdkWorkflowBuilder builder, Input input) { + /* + * This is to demonstrate that we can use concrete values in the dynamic workflow task + */ + long aConcreteValue = input.a().get(); + long bConcreteValue = input.b().get(); + + SumTask.SumOutput outputs = + builder + .apply( + SdkConditions.when( + "is-even", + SdkConditions.isTrue( + of(aConcreteValue % 2 == 0 && bConcreteValue % 2 == 0)), + new SumTask(), + SumTask.SumInput.create(input.a(), input.b())) + .otherwise("is-odd", new SumTask(), SumTask.SumInput.create(of(0L), of(0L)))) + .getOutputs(); + + return Output.create(outputs.c()); + } +}