Skip to content
12 changes: 9 additions & 3 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ aspectj = "1.9.22.1"
assertj = "3.27.3"
cron-utils = "9.2.1"
hikaricp = "7.0.2"
kryo = "5.6.2"
jackson = "2.21.2"
java-websocket = "1.6.0"
jaxb-api = "4.0.2"
jdbi = "3.47.0"
jooq = "3.19.15"
jspecify = "1.0.0"
junit = "6.0.3"
junit-pioneer = "2.3.0"
kotlin = "2.3.10"
kryo = "5.6.2"
logback = "1.5.32"
maven-artifact = "3.9.13"
maven-publish = "0.36.0"
Expand All @@ -20,10 +23,10 @@ postgresql = "42.7.10"
rest-assured = "6.0.0"
shadow = "9.4.1"
slf4j = "2.0.17"
sqlite-jdbc = "3.49.1.0"
spotless = "8.4.0"
spring-boot = "3.4.4"
spring-framework = "6.2.5"
sqlite-jdbc = "3.49.1.0"
system-stubs = "2.1.8"
testcontainers = "2.0.4"
versions = "0.53.0"
Expand All @@ -37,6 +40,9 @@ hikaricp = { module = "com.zaxxer:HikariCP", version.ref = "hikaricp" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
jackson-jsr310 = { module = "com.fasterxml.jackson.datatype:jackson-datatype-jsr310", version.ref = "jackson" }
java-websocket = { module = "org.java-websocket:Java-WebSocket", version.ref = "java-websocket" }
jaxb-api = { module = "jakarta.xml.bind:jakarta.xml.bind-api", version.ref = "jaxb-api" }
jdbi-core = { module = "org.jdbi:jdbi3-core", version.ref = "jdbi" }
jooq = { module = "org.jooq:jooq", version.ref = "jooq" }
jspecify = { module = "org.jspecify:jspecify", version.ref = "jspecify" }
junit-bom = { module = "org.junit:junit-bom", version.ref = "junit" }
junit-jupiter = { module = "org.junit.jupiter:junit-jupiter" }
Expand All @@ -51,11 +57,11 @@ postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql"
rest-assured = { module = "io.rest-assured:rest-assured", version.ref = "rest-assured" }
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" }
spring-aop = { module = "org.springframework:spring-aop", version.ref = "spring-framework" }
spring-boot-autoconfigure = { module = "org.springframework.boot:spring-boot-autoconfigure", version.ref = "spring-boot" }
spring-boot-configuration-processor = { module = "org.springframework.boot:spring-boot-configuration-processor", version.ref = "spring-boot" }
spring-boot-test = { module = "org.springframework.boot:spring-boot-test", version.ref = "spring-boot" }
sqlite-jdbc = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite-jdbc" }
system-stubs-jupiter = { module = "uk.org.webcompere:system-stubs-jupiter", version.ref = "system-stubs" }
testcontainers-postgresql = { module = "org.testcontainers:testcontainers-postgresql", version.ref = "testcontainers" }

Expand Down
8 changes: 7 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
rootProject.name = "dbos-transact-java"

include("transact", "transact-cli", "transact-spring-boot-starter")
include(
"transact",
"transact-cli",
"transact-spring-boot-starter",
"transact-jdbi-step-factory",
"transact-jooq-step-factory",
)

plugins { id("org.gradle.toolchains.foojay-resolver") version "1.0.0" }

Expand Down
22 changes: 22 additions & 0 deletions transact-jdbi-step-factory/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
plugins { id("java-library") }

tasks.withType<JavaCompile> {
options.compilerArgs.add("-Xlint:unchecked")
options.compilerArgs.add("-Xlint:deprecation")
options.compilerArgs.add("-Xlint:rawtypes")
options.compilerArgs.add("-Werror")
}

dependencies {
api(project(":transact"))
api(libs.jdbi.core)

testImplementation(platform(libs.junit.bom))
testImplementation(libs.junit.jupiter)
testRuntimeOnly(libs.junit.platform.launcher)

testRuntimeOnly(libs.logback.classic)
testImplementation(libs.testcontainers.postgresql)
testImplementation(libs.postgresql)
testImplementation(libs.hikaricp)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
package dev.dbos.transact.jdbi;

import dev.dbos.transact.DBOS;
import dev.dbos.transact.database.SystemDatabase;
import dev.dbos.transact.json.DBOSSerializer;
import dev.dbos.transact.json.SerializationUtil;
import dev.dbos.transact.txstep.PostgresStepFactoryHelpers;
import dev.dbos.transact.workflow.internal.StepResult;

import java.util.Objects;
import java.util.Optional;

import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.HandleCallback;
import org.jdbi.v3.core.HandleConsumer;
import org.jdbi.v3.core.Jdbi;

/**
* Runs idempotent transactional steps inside DBOS workflows using Jdbi3 {@link Handle} objects.
*
* <p>Construct one with a {@link Jdbi} instance pointing at a PostgreSQL database. The constructor
* verifies the datasource is PostgreSQL and creates the {@code tx_step_outputs} table if needed.
* Lambdas passed to {@link #inStep} or {@link #useStep} receive a {@link Handle} with a transaction
* already open; they must not call {@code commit} or {@code close} themselves.
*
* <pre>{@code
* JdbiStepFactory factory = new JdbiStepFactory(dbos, Jdbi.create(dataSource));
*
* // inside a @Workflow method:
* int count = factory.inStep(handle -> {
* return handle.createUpdate("INSERT INTO ...").execute();
* }, "myStep");
* }</pre>
*/
public class JdbiStepFactory {

private final DBOS dbos;
private final Jdbi jdbi;
private final String schema;
private final DBOSSerializer serializer;

/**
* Creates a factory using the schema and serializer from {@code dbos} configuration.
*
* @param dbos the DBOS runtime instance
* @param jdbi a Jdbi instance connected to a PostgreSQL database
*/
public JdbiStepFactory(DBOS dbos, Jdbi jdbi) {
this(dbos, jdbi, null, null);
}

/**
* Creates a factory using the given schema and the serializer from {@code dbos} configuration.
*
* @param dbos the DBOS runtime instance
* @param jdbi a Jdbi instance connected to a PostgreSQL database
* @param schema the PostgreSQL schema to use for {@code tx_step_outputs}; {@code null} uses the
* schema from {@code dbos} configuration
*/
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema) {
this(dbos, jdbi, schema, null);
}

/**
* Creates a factory using the given serializer and the schema from {@code dbos} configuration.
*
* @param dbos the DBOS runtime instance
* @param jdbi a Jdbi instance connected to a PostgreSQL database
* @param serializer the serializer to use for step outputs; {@code null} uses the serializer from
* {@code dbos} configuration
*/
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, DBOSSerializer serializer) {
this(dbos, jdbi, null, serializer);
}

/**
* Creates a factory with explicit schema and serializer overrides.
*
* <p>Connects to the database immediately to verify it is PostgreSQL and to create the {@code
* tx_step_outputs} table in the given schema if it does not already exist.
*
* @param dbos the DBOS runtime instance
* @param jdbi a Jdbi instance connected to a PostgreSQL database
* @param schema the PostgreSQL schema to use for {@code tx_step_outputs}; {@code null} uses the
* schema from {@code dbos} configuration
* @param serializer the serializer to use for step outputs; {@code null} uses the serializer from
* {@code dbos} configuration
* @throws RuntimeException if the datasource is not PostgreSQL or the schema setup fails
*/
public JdbiStepFactory(DBOS dbos, Jdbi jdbi, String schema, DBOSSerializer serializer) {
this.dbos = dbos;
this.jdbi = jdbi;
var config = dbos.integration().config();
this.schema = SystemDatabase.sanitizeSchema(schema == null ? config.databaseSchema() : schema);
this.serializer = serializer == null ? config.serializer() : serializer;

try {
jdbi.useHandle(
handle -> {
PostgresStepFactoryHelpers.ensurePostgres(handle.getConnection());
PostgresStepFactoryHelpers.ensureSchema(handle.getConnection(), this.schema);
PostgresStepFactoryHelpers.ensureTxOutputTable(handle.getConnection(), this.schema);
});
} catch (Exception e) {
if (e instanceof RuntimeException re) {
throw re;
}
throw new RuntimeException(e.getMessage(), e);
}
}

/**
* Executes {@code callback} as an idempotent DBOS step inside a Jdbi transaction.
*
* <p>If a result for this step is already recorded (e.g. on workflow retry), the callback is
* skipped and the cached result is returned. Otherwise the callback runs inside an open
* transaction; the output is recorded atomically with the database work so the step is
* exactly-once on success.
*
* @param <R> the return type of the callback
* @param <X> the checked exception type the callback may throw
* @param callback the database work to perform; receives an open {@link Handle} and must not
* commit or close it
* @param stepName a stable name that identifies this step within the workflow
* @return the value returned by {@code callback}
* @throws X if the callback throws
*/
@SuppressWarnings("unchecked")
public <R, X extends Exception> R inStep(final HandleCallback<R, X> callback, String stepName)
throws X {

return dbos.<R, X>runStep(
() -> {
var workflowId = Objects.requireNonNull(DBOS.workflowId());
int stepId = Objects.requireNonNull(DBOS.stepId());

var prevResult = checkExecution(workflowId, stepId, stepName);
if (prevResult.isPresent()) {
return prevResult.get().<R, X>toResult(serializer);
}

try {
return jdbi.inTransaction(
h -> {
var result = callback.withHandle(h);
recordOutput(h, workflowId, stepId, result);
return result;
});
} catch (Exception e) {
recordError(workflowId, stepId, e);
throw (X) e;
}
},
stepName);
}

/**
* Executes {@code callback} as an idempotent DBOS step inside a Jdbi transaction, with no return
* value.
*
* <p>Behaves identically to {@link #inStep} but accepts a {@link HandleConsumer} for callers that
* do not need to return a result.
*
* @param <X> the checked exception type the callback may throw
* @param callback the database work to perform; receives an open {@link Handle} and must not
* commit or close it
* @param stepName a stable name that identifies this step within the workflow
* @throws X if the callback throws
*/
public <X extends Exception> void useStep(final HandleConsumer<X> callback, String stepName)
throws X {
inStep(
handle -> {
callback.useHandle(handle);
return null;
},
stepName);
}

private Optional<StepResult> checkExecution(String workflowId, int stepId, String stepName) {
var sql = PostgresStepFactoryHelpers.CHECK_SQL_TEMPLATE.formatted(schema);
return jdbi.withHandle(
h ->
h.createQuery(sql)
.bind(0, workflowId)
.bind(1, stepId)
.map(
(rs, ctx) ->
new StepResult(
workflowId,
stepId,
stepName,
rs.getString("output"),
rs.getString("error"),
null,
rs.getString("serialization")))
.findOne());
}

private <R> void recordOutput(Handle handle, String workflowId, int stepId, R result) {
var value = SerializationUtil.serializeValue(result, null, serializer);
recordResult(handle, workflowId, stepId, value.serializedValue(), null, value.serialization());
}

private <X extends Exception> void recordError(String workflowId, int stepId, X exception) {
var value = SerializationUtil.serializeError(exception, null, serializer);
jdbi.useTransaction(
h -> {
recordResult(h, workflowId, stepId, null, value.serializedValue(), value.serialization());
});
}

private void recordResult(
Handle handle,
String workflowId,
int stepId,
String output,
String error,
String serialization) {
if (output != null && error != null) {
throw new IllegalArgumentException("attempted to record non null output and error result");
}
var sql = PostgresStepFactoryHelpers.UPSERT_SQL_TEMPLATE.formatted(schema);
handle
.createUpdate(sql)
.bind(0, workflowId)
.bind(1, stepId)
.bind(2, output)
.bind(3, error)
.bind(4, serialization)
.execute();
}
}
Loading
Loading