Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2025 Flyte Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.flytekit.testing;

import java.util.Objects;
import org.flyte.flytekit.SdkDynamicWorkflowTask;
import org.flyte.flytekit.SdkType;
import org.flyte.flytekit.SdkTypes;
import org.flyte.flytekit.SdkWorkflow;
import org.flyte.flytekit.SdkWorkflowBuilder;

public class SdkDynamicWorkflowTaskDelegatingWorkflow<InputT, OutputT>
extends SdkWorkflow<Void, OutputT> {
private final SdkDynamicWorkflowTask<InputT, OutputT> delegate;
private final InputT input;

public SdkDynamicWorkflowTaskDelegatingWorkflow(
SdkDynamicWorkflowTask<InputT, OutputT> delegate,
InputT input,
SdkType<OutputT> outputSdkType) {
super(SdkTypes.nulls(), outputSdkType);
this.delegate = Objects.requireNonNull(delegate, "delegate cannot be null");
this.input = input;
}

@Override
protected OutputT expand(SdkWorkflowBuilder builder, Void ignored) {
return delegate.run(builder, this.input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,41 @@ public static SdkTestingExecutor of(
.build();
}

/**
* Creates a new {@link SdkTestingExecutor} for testing a {@link SdkDynamicWorkflowTask}. This
* method wraps the given dynamic workflow task and its input into a delegating workflow, allowing
* the task to be executed and tested in isolation.
*
* @param task the dynamic workflow task to test
* @param input the input to the dynamic workflow task
* @param outputType the expected output type of the dynamic workflow task
* @param <InputT> the type of the input
* @param <OutputT> the type of the output
* @return a new {@link SdkTestingExecutor} instance
* <p>Example usage:
* <pre>{@code
* int expected = 6;
*
* SumIfEvenDynamicWorkflowTask.Output output =
* SdkTestingExecutor.of(
* new SumIfEvenDynamicWorkflowTask(),
* SumIfEvenDynamicWorkflowTask.Input.create(of(2), of(4)),
* JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class))
* .withTaskOutput(
* new SumTask(),
* SumTask.SumInput.create(of(2), of(4)),
* SumTask.SumOutput.create(of(expected)))
* .execute()
* .getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class));
*
* assertEquals(expected, output.c().get());
* }</pre>
*/
public static <InputT, OutputT> SdkTestingExecutor of(
SdkDynamicWorkflowTask<InputT, OutputT> task, InputT input, SdkType<OutputT> outputType) {
return of(new SdkDynamicWorkflowTaskDelegatingWorkflow<>(task, input, outputType));
}

@AutoValue
public abstract static class Result {
abstract Map<String, Literal> literalMap();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2025 Flyte Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.flytekit.testing;

import static org.flyte.flytekit.SdkBindingDataFactory.of;
import static org.junit.jupiter.api.Assertions.*;

import org.flyte.flytekit.jackson.JacksonSdkType;
import org.junit.jupiter.api.Test;

public class SdkDynamicWorkflowTaskDelegatingWorkflowTest {
@Test
public void testDelegatingWorkflow_EvenValues() {
int expected = 6;

SumIfEvenDynamicWorkflowTask.Output output =
SdkTestingExecutor.of(
new SumIfEvenDynamicWorkflowTask(),
SumIfEvenDynamicWorkflowTask.Input.create(of(2), of(4)),
JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class))
.withTaskOutput(
new SumTask(),
SumTask.SumInput.create(of(2), of(4)),
SumTask.SumOutput.create(of(expected)))
.execute()
.getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class));
assertEquals(expected, output.c().get());
}

@Test
public void testDelegatingWorkflow_Odd() {
int expected = 0;

SumIfEvenDynamicWorkflowTask.Output output =
SdkTestingExecutor.of(
new SumIfEvenDynamicWorkflowTask(),
SumIfEvenDynamicWorkflowTask.Input.create(of(1), of(4)),
JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class))
.withTaskOutput(
new SumTask(),
SumTask.SumInput.create(of(0), of(0)),
SumTask.SumOutput.create(of(expected)))
.execute()
.getOutputAs(JacksonSdkType.of(SumIfEvenDynamicWorkflowTask.Output.class));
assertEquals(expected, output.c().get());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2025 Flyte Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.flytekit.testing;

import static org.flyte.flytekit.SdkBindingDataFactory.of;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import org.flyte.flytekit.*;
import org.flyte.flytekit.jackson.JacksonSdkType;

@AutoService(SumIfEvenDynamicWorkflowTask.class)
public class SumIfEvenDynamicWorkflowTask
extends SdkDynamicWorkflowTask<
SumIfEvenDynamicWorkflowTask.Input, SumIfEvenDynamicWorkflowTask.Output> {
@AutoValue
public abstract static class Input {

abstract SdkBindingData<Long> a();

abstract SdkBindingData<Long> b();

static Input create(SdkBindingData<Long> a, SdkBindingData<Long> b) {
return new AutoValue_SumIfEvenDynamicWorkflowTask_Input(a, b);
}
}

@AutoValue
public abstract static class Output {

abstract SdkBindingData<Long> c();

static Output create(SdkBindingData<Long> c) {
return new AutoValue_SumIfEvenDynamicWorkflowTask_Output(c);
}
}

public SumIfEvenDynamicWorkflowTask() {
super(JacksonSdkType.of(Input.class), JacksonSdkType.of(Output.class));
}

@Override
public Output run(SdkWorkflowBuilder builder, Input input) {
/*
* This is to demonstrate that we can use concrete values in the dynamic workflow task
*/
long aConcreteValue = input.a().get();
long bConcreteValue = input.b().get();

SumTask.SumOutput outputs =
builder
.apply(
SdkConditions.when(
"is-even",
SdkConditions.isTrue(
of(aConcreteValue % 2 == 0 && bConcreteValue % 2 == 0)),
new SumTask(),
SumTask.SumInput.create(input.a(), input.b()))
.otherwise("is-odd", new SumTask(), SumTask.SumInput.create(of(0L), of(0L))))
.getOutputs();

return Output.create(outputs.c());
}
}