From c19f9078ec6f1cbe09a22719ebe67af4f6704d70 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 11:05:15 -0800 Subject: [PATCH 01/12] send headers --- .../java/dev/dbos/transact/conductor/Conductor.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 2dd5c236..b76d1d6d 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -49,7 +49,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.MessageToMessageDecoder; -import io.netty.handler.codec.http.EmptyHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; @@ -606,7 +605,7 @@ protected void initChannel(SocketChannel ch) { .version(WebSocketVersion.V13) .subprotocol(null) .allowExtensions(false) - .customHeaders(EmptyHttpHeaders.INSTANCE) + .customHeaders(createWebSocketHeaders()) .dropPongFrames(false) .handleCloseFrames(false) .maxFramePayloadLength(256 * 1024 * 1024) @@ -651,6 +650,16 @@ protected void decode( } } + private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { + io.netty.handler.codec.http.DefaultHttpHeaders headers = + new io.netty.handler.codec.http.DefaultHttpHeaders(); + + // Add standard HTTP headers that Java's HttpClient would automatically include + headers.add("User-Agent", "Java/" + System.getProperty("java.version")); + + return headers; + } + CompletableFuture getResponseAsync(BaseMessage message) { logger.debug("getResponseAsync {}", message.type); MessageType messageType = MessageType.fromValue(message.type); From cfa693db13071f3165e45ab6c539f960fa73af0d Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:12:16 -0800 Subject: [PATCH 02/12] Moar Changes --- .../dbos/transact/conductor/Conductor.java | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index b76d1d6d..e37ecf0f 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -155,6 +155,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc } else if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) { logger.error("Websocket handshake timeout with conductor at {}", url); + } else if (evt instanceof WebSocketClientProtocolHandler.ClientHandshakeStateEvent) { + logger.error("Websocket handshake failed with event: {}", evt); } super.userEventTriggered(ctx, evt); } @@ -343,10 +345,12 @@ public void close() throws IOException { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - logger.warn( - "Unexpected exception in websocket connection to conductor. Channel active: {}, writable: {}", + logger.error( + "Exception in websocket connection to conductor at {}. Channel active: {}, writable: {}, Error: {}", + url, ctx.channel().isActive(), ctx.channel().isWritable(), + cause.getMessage(), cause); resetWebSocket(); } @@ -656,7 +660,28 @@ private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { // Add standard HTTP headers that Java's HttpClient would automatically include headers.add("User-Agent", "Java/" + System.getProperty("java.version")); - + + // Add more standard headers that HttpClient would include + headers.add("Accept", "*/*"); + headers.add("Accept-Language", "en-US,en;q=0.9"); + headers.add("Accept-Encoding", "gzip, deflate, br"); + + // Add Origin if connecting to cloud.dbos.dev (common WebSocket requirement) + if (url.contains("cloud.dbos.dev")) { + headers.add("Origin", "https://cloud.dbos.dev"); + logger.debug("Adding Origin header for cloud.dbos.dev connection"); + } + + // Check for DBOS-specific tokens in environment variables + String cloudToken = System.getenv("DBOS_CLOUD_TOKEN"); + if (cloudToken != null && !cloudToken.isEmpty()) { + headers.add("Authorization", "Bearer " + cloudToken); + logger.debug("Adding Authorization header from DBOS_CLOUD_TOKEN"); + } + + // Log all headers for debugging + logger.debug("WebSocket headers: {}", headers); + return headers; } From f5c7d66769b25c4d5d5c66a981d7a98c88add976 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:16:27 -0800 Subject: [PATCH 03/12] still moar --- .../dbos/transact/conductor/Conductor.java | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index e37ecf0f..d93cac47 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -666,12 +666,43 @@ private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { headers.add("Accept-Language", "en-US,en;q=0.9"); headers.add("Accept-Encoding", "gzip, deflate, br"); + // Add proper Host header + try { + URI uri = new URI(url); + String host = uri.getHost(); + if (uri.getPort() != -1 && uri.getPort() != 443 && uri.getPort() != 80) { + host += ":" + uri.getPort(); + } + headers.add("Host", host); + } catch (Exception e) { + logger.debug("Failed to extract host from URL {}", url); + } + // Add Origin if connecting to cloud.dbos.dev (common WebSocket requirement) if (url.contains("cloud.dbos.dev")) { headers.add("Origin", "https://cloud.dbos.dev"); logger.debug("Adding Origin header for cloud.dbos.dev connection"); } + // Try using the conductor key itself as authorization + // Extract conductor key from the URL path + try { + URI uri = new URI(url); + String path = uri.getPath(); + if (path != null) { + String[] pathParts = path.split("/"); + if (pathParts.length > 0) { + String conductorKey = pathParts[pathParts.length - 1]; // Last part of path + if (conductorKey.startsWith("dbos_")) { + headers.add("Authorization", "Bearer " + conductorKey); + logger.debug("Adding Authorization header with conductor key"); + } + } + } + } catch (Exception e) { + logger.debug("Failed to extract conductor key from URL {}", url); + } + // Check for DBOS-specific tokens in environment variables String cloudToken = System.getenv("DBOS_CLOUD_TOKEN"); if (cloudToken != null && !cloudToken.isEmpty()) { @@ -679,8 +710,11 @@ private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { logger.debug("Adding Authorization header from DBOS_CLOUD_TOKEN"); } - // Log all headers for debugging - logger.debug("WebSocket headers: {}", headers); + // Log all headers for debugging (but hide auth values) + logger.debug("WebSocket headers: {}", + headers.names().stream() + .map(name -> name + "=" + (name.toLowerCase().contains("auth") ? "[REDACTED]" : headers.get(name))) + .collect(java.util.stream.Collectors.joining(", "))); return headers; } From 500b79d45370aac8e71041576a85c569d8657b98 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:18:42 -0800 Subject: [PATCH 04/12] remove auth --- .../dbos/transact/conductor/Conductor.java | 26 +++++-------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index d93cac47..94c91741 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -684,32 +684,18 @@ private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { logger.debug("Adding Origin header for cloud.dbos.dev connection"); } - // Try using the conductor key itself as authorization - // Extract conductor key from the URL path - try { - URI uri = new URI(url); - String path = uri.getPath(); - if (path != null) { - String[] pathParts = path.split("/"); - if (pathParts.length > 0) { - String conductorKey = pathParts[pathParts.length - 1]; // Last part of path - if (conductorKey.startsWith("dbos_")) { - headers.add("Authorization", "Bearer " + conductorKey); - logger.debug("Adding Authorization header with conductor key"); - } - } - } - } catch (Exception e) { - logger.debug("Failed to extract conductor key from URL {}", url); - } - - // Check for DBOS-specific tokens in environment variables + // Check for explicit DBOS cloud tokens in environment variables only String cloudToken = System.getenv("DBOS_CLOUD_TOKEN"); if (cloudToken != null && !cloudToken.isEmpty()) { headers.add("Authorization", "Bearer " + cloudToken); logger.debug("Adding Authorization header from DBOS_CLOUD_TOKEN"); } + // Add Connection and Upgrade headers that browsers typically send + headers.add("Connection", "Upgrade"); + headers.add("Upgrade", "websocket"); + headers.add("Sec-WebSocket-Version", "13"); + // Log all headers for debugging (but hide auth values) logger.debug("WebSocket headers: {}", headers.names().stream() From e40f48a0859e7534de8595e3a04a5789a81ae687 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:21:21 -0800 Subject: [PATCH 05/12] moar --- .../dbos/transact/conductor/Conductor.java | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 94c91741..c22b16d0 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -603,6 +603,23 @@ protected void initChannel(SocketChannel ch) { p.addLast( new HttpClientCodec(), new HttpObjectAggregator(256 * 1024 * 1024), // 256MB max message size + new io.netty.channel.ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof io.netty.handler.codec.http.FullHttpResponse) { + io.netty.handler.codec.http.FullHttpResponse response = + (io.netty.handler.codec.http.FullHttpResponse) msg; + logger.debug("HTTP Response: {} {}", + response.status().code(), response.status().reasonPhrase()); + logger.debug("HTTP Response headers: {}", response.headers()); + if (response.content().readableBytes() > 0) { + String body = response.content().toString(java.nio.charset.StandardCharsets.UTF_8); + logger.debug("HTTP Response body: {}", body); + } + } + super.channelRead(ctx, msg); + } + }, new WebSocketClientProtocolHandler( WebSocketClientProtocolConfig.newBuilder() .webSocketUri(uri) @@ -658,15 +675,10 @@ private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { io.netty.handler.codec.http.DefaultHttpHeaders headers = new io.netty.handler.codec.http.DefaultHttpHeaders(); - // Add standard HTTP headers that Java's HttpClient would automatically include + // Add only the most basic headers that Java's HttpClient would send headers.add("User-Agent", "Java/" + System.getProperty("java.version")); - // Add more standard headers that HttpClient would include - headers.add("Accept", "*/*"); - headers.add("Accept-Language", "en-US,en;q=0.9"); - headers.add("Accept-Encoding", "gzip, deflate, br"); - - // Add proper Host header + // Add proper Host header (this is critical) try { URI uri = new URI(url); String host = uri.getHost(); @@ -678,12 +690,6 @@ private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { logger.debug("Failed to extract host from URL {}", url); } - // Add Origin if connecting to cloud.dbos.dev (common WebSocket requirement) - if (url.contains("cloud.dbos.dev")) { - headers.add("Origin", "https://cloud.dbos.dev"); - logger.debug("Adding Origin header for cloud.dbos.dev connection"); - } - // Check for explicit DBOS cloud tokens in environment variables only String cloudToken = System.getenv("DBOS_CLOUD_TOKEN"); if (cloudToken != null && !cloudToken.isEmpty()) { @@ -691,11 +697,6 @@ private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { logger.debug("Adding Authorization header from DBOS_CLOUD_TOKEN"); } - // Add Connection and Upgrade headers that browsers typically send - headers.add("Connection", "Upgrade"); - headers.add("Upgrade", "websocket"); - headers.add("Sec-WebSocket-Version", "13"); - // Log all headers for debugging (but hide auth values) logger.debug("WebSocket headers: {}", headers.names().stream() From 9a4d9022c2c7452be83346ae204e155f6c75afec Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:25:19 -0800 Subject: [PATCH 06/12] revert conductor for debugging --- .../dbos/transact/conductor/Conductor.java | 1116 +++++------------ 1 file changed, 341 insertions(+), 775 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index c22b16d0..74c3b7dc 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -4,7 +4,6 @@ import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.execution.DBOSExecutor; import dev.dbos.transact.json.JSONUtil; -import dev.dbos.transact.workflow.ExportedWorkflow; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.StepInfo; @@ -12,19 +11,19 @@ import dev.dbos.transact.workflow.WorkflowStatus; import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; -import java.util.Base64; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.net.http.WebSocket.Listener; +import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -32,55 +31,22 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.stream.Collectors; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - -import com.fasterxml.jackson.core.type.TypeReference; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.MessageToMessageDecoder; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; -import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; -import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; -import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolConfig; -import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketVersion; -import io.netty.handler.codec.json.JsonObjectDecoder; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Conductor implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(Conductor.class); - private static final Map< - MessageType, BiFunction>> + private static final Map> dispatchMap; static { - Map>> map = + Map> map = new java.util.EnumMap<>(MessageType.class); map.put(MessageType.EXECUTOR_INFO, Conductor::handleExecutorInfo); map.put(MessageType.RECOVERY, Conductor::handleRecovery); map.put(MessageType.CANCEL, Conductor::handleCancel); - map.put(MessageType.DELETE, Conductor::handleDelete); map.put(MessageType.RESUME, Conductor::handleResume); map.put(MessageType.RESTART, Conductor::handleRestart); map.put(MessageType.FORK_WORKFLOW, Conductor::handleFork); @@ -91,15 +57,13 @@ public class Conductor implements AutoCloseable { map.put(MessageType.GET_WORKFLOW, Conductor::handleGetWorkflow); map.put(MessageType.RETENTION, Conductor::handleRetention); map.put(MessageType.GET_METRICS, Conductor::handleGetMetrics); - map.put(MessageType.IMPORT_WORKFLOW, Conductor::handleImportWorkflow); - map.put(MessageType.EXPORT_WORKFLOW, Conductor::handleExportWorkflow); - dispatchMap = Collections.unmodifiableMap(map); } private final int pingPeriodMs; private final int pingTimeoutMs; private final int reconnectDelayMs; + private final int connectTimeoutMs; private final String url; private final SystemDatabase systemDatabase; @@ -107,9 +71,7 @@ public class Conductor implements AutoCloseable { private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicBoolean isShutdown = new AtomicBoolean(false); - private Channel channel; - private EventLoopGroup group; - private NettyWebSocketHandler handler; + private WebSocket webSocket; private ScheduledFuture pingInterval; private ScheduledFuture pingTimeout; private ScheduledFuture reconnectTimeout; @@ -144,228 +106,7 @@ private Conductor(Builder builder) { this.pingPeriodMs = builder.pingPeriodMs; this.pingTimeoutMs = builder.pingTimeoutMs; this.reconnectDelayMs = builder.reconnectDelayMs; - } - - private class NettyWebSocketHandler extends SimpleChannelInboundHandler { - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { - logger.info("Successfully established websocket connection to DBOS conductor at {}", url); - setPingInterval(ctx.channel()); - } else if (evt - == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) { - logger.error("Websocket handshake timeout with conductor at {}", url); - } else if (evt instanceof WebSocketClientProtocolHandler.ClientHandshakeStateEvent) { - logger.error("Websocket handshake failed with event: {}", evt); - } - super.userEventTriggered(ctx, evt); - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof PingWebSocketFrame ping) { - logger.debug("Received ping from conductor"); - ctx.channel().writeAndFlush(new PongWebSocketFrame(ping.content().retain())); - } else if (msg instanceof PongWebSocketFrame) { - logger.debug("Received pong from conductor"); - if (pingTimeout != null) { - pingTimeout.cancel(false); - pingTimeout = null; - logger.debug("Cancelled ping timeout - connection is healthy"); - } else { - logger.debug("Received pong but no ping timeout was active"); - } - } else if (msg instanceof CloseWebSocketFrame closeFrame) { - logger.warn( - "Received close frame from conductor: status={}, reason='{}'", - closeFrame.statusCode(), - closeFrame.reasonText()); - if (isShutdown.get()) { - logger.debug("Shutdown Conductor connection"); - } else if (reconnectTimeout == null) { - logger.warn("onClose: Connection to conductor lost. Reconnecting"); - resetWebSocket(); - } - } else if (msg instanceof ByteBuf content) { - int messageSize = content.readableBytes(); - logger.debug("Received {} bytes from Conductor {}", messageSize, msg.getClass().getName()); - - BaseMessage request; - try (InputStream is = new ByteBufInputStream(content)) { - request = JSONUtil.fromJson(is, BaseMessage.class); - } catch (Exception e) { - logger.error("Conductor JSON Parsing error for {} byte message", messageSize, e); - return; - } - - try { - long startTime = System.currentTimeMillis(); - logger.info( - "Processing conductor request: type={}, id={}", request.type, request.request_id); - - getResponseAsync(request) - .whenComplete( - (response, throwable) -> { - try { - long processingTime = System.currentTimeMillis() - startTime; - if (throwable != null) { - logger.error( - "Error processing request: type={}, id={}, duration={}ms", - request.type, - request.request_id, - processingTime, - throwable); - - // Create an error response - BaseResponse errorResponse = - new BaseResponse( - request.type, request.request_id, throwable.getMessage()); - writeFragmentedResponse(ctx, errorResponse); - } else { - logger.info( - "Completed processing request: type={}, id={}, duration={}ms", - request.type, - request.request_id, - processingTime); - writeFragmentedResponse(ctx, response); - } - } catch (Exception e) { - logger.error( - "Error writing response for request type={}, id={}", - request.type, - request.request_id, - e); - } - }); - } catch (Exception e) { - logger.error( - "Conductor Response error for request type={}, id={}", - request.type, - request.request_id, - e); - } - } - } - - private static void writeFragmentedResponse(ChannelHandlerContext ctx, BaseResponse response) - throws Exception { - int fragmentSize = 128 * 1024; // 128k - logger.debug( - "Starting to write fragmented response: type={}, id={}", - response.type, - response.request_id); - try (OutputStream out = new FragmentingOutputStream(ctx, fragmentSize)) { - JSONUtil.toJsonStream(response, out); - } - logger.debug( - "Completed writing fragmented response: type={}, id={}", - response.type, - response.request_id); - } - - private static class FragmentingOutputStream extends OutputStream { - private final ChannelHandlerContext ctx; - private final int fragmentSize; - private ByteBuf currentBuffer; - private boolean firstFrame = true; - private boolean closed = false; - - public FragmentingOutputStream(ChannelHandlerContext ctx, int fragmentSize) { - this.ctx = ctx; - this.fragmentSize = fragmentSize; - this.currentBuffer = ctx.alloc().buffer(fragmentSize); - logger.debug("Created FragmentingOutputStream with fragment size: {}", fragmentSize); - } - - @Override - public void write(int b) throws IOException { - currentBuffer.writeByte(b); - if (currentBuffer.readableBytes() == fragmentSize) { - flushBuffer(false); - } - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - while (len > 0) { - int toCopy = Math.min(len, fragmentSize - currentBuffer.readableBytes()); - currentBuffer.writeBytes(b, off, toCopy); - off += toCopy; - len -= toCopy; - if (currentBuffer.readableBytes() == fragmentSize) { - flushBuffer(false); - } - } - } - - private void flushBuffer(boolean last) { - if (currentBuffer.readableBytes() == 0 && !last) { - return; - } - - int frameSize = currentBuffer.readableBytes(); - WebSocketFrame frame; - if (firstFrame) { - frame = new TextWebSocketFrame(last, 0, currentBuffer); - firstFrame = false; - } else { - frame = new ContinuationWebSocketFrame(last, 0, currentBuffer); - } - - try { - ctx.channel() - .writeAndFlush(frame) - .addListener( - future -> { - if (!future.isSuccess()) { - logger.error( - "Failed to send websocket frame: {} bytes", frameSize, future.cause()); - } - }); - } catch (Exception e) { - logger.error("Exception while sending websocket frame: {} bytes", frameSize, e); - throw e; - } - - if (!last) { - currentBuffer = ctx.alloc().buffer(fragmentSize); - } else { - currentBuffer = null; - } - } - - @Override - public void close() throws IOException { - if (!closed) { - flushBuffer(true); - closed = true; - } - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - logger.error( - "Exception in websocket connection to conductor at {}. Channel active: {}, writable: {}, Error: {}", - url, - ctx.channel().isActive(), - ctx.channel().isWritable(), - cause.getMessage(), - cause); - resetWebSocket(); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) { - logger.warn( - "Websocket channel became inactive. Shutdown: {}, reconnect pending: {}", - isShutdown.get(), - reconnectTimeout != null); - if (!isShutdown.get() && reconnectTimeout == null) { - logger.warn("Channel inactive: Connection to conductor lost. Reconnecting"); - resetWebSocket(); - } - } + this.connectTimeoutMs = builder.connectTimeoutMs; } public static class Builder { @@ -376,6 +117,7 @@ public static class Builder { private int pingPeriodMs = 20000; private int pingTimeoutMs = 15000; private int reconnectDelayMs = 1000; + private int connectTimeoutMs = 5000; public Builder(DBOSExecutor e, SystemDatabase s, String key) { systemDatabase = s; @@ -404,6 +146,11 @@ Builder reconnectDelayMs(int reconnectDelayMs) { return this; } + Builder connectTimeoutMs(int connectTimeoutMs) { + this.connectTimeoutMs = connectTimeoutMs; + return this; + } + public Conductor build() { return new Conductor(this); } @@ -416,7 +163,7 @@ public void close() { public void start() { logger.debug("start"); - connectWebSocket(); + dispatchLoop(); } public void stop() { @@ -434,18 +181,14 @@ public void stop() { scheduler.shutdownNow(); - if (channel != null) { - channel.close(); - channel = null; - } - if (group != null) { - group.shutdownGracefully(); - group = null; + if (webSocket != null) { + webSocket.sendClose(WebSocket.NORMAL_CLOSURE, ""); + webSocket = null; } } } - void setPingInterval(Channel channel) { + void setPingInterval(WebSocket webSocket) { logger.debug("setPingInterval"); if (pingInterval != null) { @@ -458,29 +201,32 @@ void setPingInterval(Channel channel) { return; } try { - if (channel == null || !channel.isActive()) { - logger.debug("channel not active, NOT sending ping to conductor"); + // Check for null in case webSocket connects before webSocket variable is assigned + if (webSocket == null) { + logger.debug("webSocket null, NOT sending ping to conductor"); return; } - logger.debug("Sending ping to conductor (timeout in {}ms)", pingTimeoutMs); - channel - .writeAndFlush(new PingWebSocketFrame()) - .addListener( - future -> { - if (!future.isSuccess()) { - logger.error("Failed to send ping to conductor", future.cause()); - resetWebSocket(); - } + if (webSocket.isOutputClosed()) { + logger.debug("webSocket closed, NOT sending ping to conductor"); + return; + } + + logger.debug("Sending ping to conductor"); + webSocket + .sendPing(ByteBuffer.allocate(0)) + .exceptionally( + ex -> { + logger.error("Failed to send ping to conductor", ex); + resetWebSocket(); + return null; }); pingTimeout = scheduler.schedule( () -> { if (!isShutdown.get()) { - logger.error( - "Ping timeout after {}ms - no pong received from conductor. Connection lost, reconnecting.", - pingTimeoutMs); + logger.warn("pingTimeout: Connection to conductor lost. Reconnecting."); resetWebSocket(); } }, @@ -496,10 +242,6 @@ void setPingInterval(Channel channel) { } void resetWebSocket() { - logger.info( - "Resetting websocket connection. Channel active: {}", - channel != null ? channel.isActive() : "null"); - if (pingInterval != null) { pingInterval.cancel(false); pingInterval = null; @@ -510,556 +252,380 @@ void resetWebSocket() { pingTimeout = null; } - if (channel != null) { - channel.close(); - channel = null; - } - - if (group != null) { - group.shutdownGracefully(); - group = null; + if (webSocket != null) { + webSocket.abort(); + webSocket = null; } if (isShutdown.get()) { - logger.debug("Not scheduling reconnection - conductor is shutting down"); return; } if (reconnectTimeout == null) { - logger.info("Scheduling websocket reconnection in {}ms", reconnectDelayMs); reconnectTimeout = scheduler.schedule( () -> { reconnectTimeout = null; - logger.info("Attempting websocket reconnection"); - connectWebSocket(); + dispatchLoop(); }, reconnectDelayMs, TimeUnit.MILLISECONDS); - } else { - logger.debug("Reconnection already scheduled"); } } - void connectWebSocket() { - if (channel != null) { - logger.warn("Conductor channel already exists"); + void dispatchLoop() { + if (webSocket != null) { + logger.warn("Conductor websocket already exists"); return; } if (isShutdown.get()) { - logger.debug("Not connecting web socket as conductor is shutting down"); + logger.debug("Not starting dispatch loop as conductor is shutting down"); return; } + // Log environment variables that might affect authentication + logger.debug("===== Environment Check ====="); + logger.debug("DBOS_DOMAIN: {}", System.getenv("DBOS_DOMAIN")); + logger.debug("DBOS__CLOUD: {}", System.getenv("DBOS__CLOUD")); + logger.debug("DBOS__APPID: {}", System.getenv("DBOS__APPID")); + logger.debug("DBOS__VMID: {}", System.getenv("DBOS__VMID")); + logger.debug("DBOS__APPVERSION: {}", System.getenv("DBOS__APPVERSION")); + + // Check for any DBOS or auth-related environment variables + System.getenv().entrySet().stream() + .filter(e -> e.getKey().toUpperCase().contains("DBOS") || + e.getKey().toUpperCase().contains("AUTH") || + e.getKey().toUpperCase().contains("TOKEN")) + .forEach(e -> logger.debug("Auth env: {}={}", e.getKey(), + e.getKey().toLowerCase().contains("token") || e.getKey().toLowerCase().contains("password") ? + "[REDACTED]" : e.getValue())); + logger.debug("===== Environment Check Complete ====="); + try { logger.debug("Connecting to conductor at {}", url); - URI uri = new URI(url); - String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); - final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost(); - final int port; - if (uri.getPort() == -1) { - if ("ws".equalsIgnoreCase(scheme)) { - port = 80; - } else if ("wss".equalsIgnoreCase(scheme)) { - port = 443; - } else { - port = -1; - } - } else { - port = uri.getPort(); - } - if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) { - logger.error("Only WS(S) is supported."); - return; - } + // Create HttpClient with debugging + HttpClient client = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .build(); + + // Log the URL being used + URI wsUri = URI.create(url); + logger.debug("WebSocket URI: {}", wsUri); + logger.debug("URI scheme: {}, host: {}, port: {}, path: {}", + wsUri.getScheme(), wsUri.getHost(), wsUri.getPort(), wsUri.getPath()); + + // Create a WebSocket builder and log its configuration + WebSocket.Builder wsBuilder = client.newWebSocketBuilder() + .connectTimeout(Duration.ofMillis(connectTimeoutMs)); + + logger.debug("WebSocket builder timeout: {}ms", connectTimeoutMs); + + // Add custom headers to see if we can capture what gets sent + // Try to add headers that might be automatically added + logger.debug("Adding debug headers to WebSocket request"); + + webSocket = + wsBuilder + .header("User-Agent", "DBOS-Java-Debug/" + System.getProperty("java.version")) + // Add a custom header to see if headers work at all + .header("X-Debug-Test", "test-value") + .buildAsync( + wsUri, + new WebSocket.Listener() { + @Override + public void onOpen(WebSocket webSocket) { + logger.info("===== SUCCESSFUL WebSocket Connection ====="); + logger.info("Connected to DBOS conductor at: {}", url); + logger.info("WebSocket state: {}", webSocket); + logger.info("WebSocket subprotocol: {}", webSocket.getSubprotocol()); + + // Try to capture any available connection details + logger.debug("WebSocket class: {}", webSocket.getClass().getName()); + + webSocket.request(1); + setPingInterval(webSocket); + logger.info("===== WebSocket setup complete ====="); + } - final boolean ssl = "wss".equalsIgnoreCase(scheme); - final SslContext sslCtx; - if (ssl) { - sslCtx = - SslContextBuilder.forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .build(); - } else { - sslCtx = null; - } + @Override + public CompletionStage onPong(WebSocket webSocket, ByteBuffer message) { + logger.debug("Received pong from conductor"); + webSocket.request(1); + if (pingTimeout != null) { + pingTimeout.cancel(false); + pingTimeout = null; + } + return null; + } - group = new NioEventLoopGroup(); - handler = new NettyWebSocketHandler(); - - Bootstrap b = new Bootstrap(); - b.group(group) - .channel(NioSocketChannel.class) - .handler( - new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) { - var p = ch.pipeline(); - if (sslCtx != null) { - p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); - } - p.addLast( - new HttpClientCodec(), - new HttpObjectAggregator(256 * 1024 * 1024), // 256MB max message size - new io.netty.channel.ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof io.netty.handler.codec.http.FullHttpResponse) { - io.netty.handler.codec.http.FullHttpResponse response = - (io.netty.handler.codec.http.FullHttpResponse) msg; - logger.debug("HTTP Response: {} {}", - response.status().code(), response.status().reasonPhrase()); - logger.debug("HTTP Response headers: {}", response.headers()); - if (response.content().readableBytes() > 0) { - String body = response.content().toString(java.nio.charset.StandardCharsets.UTF_8); - logger.debug("HTTP Response body: {}", body); - } - } - super.channelRead(ctx, msg); - } - }, - new WebSocketClientProtocolHandler( - WebSocketClientProtocolConfig.newBuilder() - .webSocketUri(uri) - .version(WebSocketVersion.V13) - .subprotocol(null) - .allowExtensions(false) - .customHeaders(createWebSocketHeaders()) - .dropPongFrames(false) - .handleCloseFrames(false) - .maxFramePayloadLength(256 * 1024 * 1024) - .build()), - new MessageToMessageDecoder() { - @Override - protected void decode( - ChannelHandlerContext ctx, WebSocketFrame frame, List out) { - if (frame instanceof TextWebSocketFrame - || frame instanceof ContinuationWebSocketFrame) { - out.add(frame.content().retain()); - } else { - out.add(frame.retain()); - } - } - }, - new JsonObjectDecoder(256 * 1024 * 1024) { - { - setCumulator(COMPOSITE_CUMULATOR); - } - }, - handler); - } - }); - - ChannelFuture future = b.connect(host, port); - channel = future.channel(); - future.addListener( - f -> { - if (f.isSuccess()) { - logger.info("Successfully connected to conductor at {}:{}", host, port); - } else { - logger.warn( - "Failed to connect to conductor at {}:{}. Reconnecting", host, port, f.cause()); - resetWebSocket(); - } - }); + @Override + public CompletionStage onClose( + WebSocket webSocket, int statusCode, String reason) { + if (isShutdown.get()) { + logger.debug("Shutdown Conductor connection"); + } else if (reconnectTimeout == null) { + logger.warn("onClose: Connection to conductor lost. Reconnecting"); + resetWebSocket(); + } + return Listener.super.onClose(webSocket, statusCode, reason); + } - } catch (Exception e) { - logger.warn("Error in conductor loop. Reconnecting", e); - resetWebSocket(); - } - } + @Override + public void onError(WebSocket webSocket, Throwable error) { + logger.error("===== WebSocket Connection Error ====="); + logger.error("Error type: {}", error.getClass().getName()); + logger.error("Error message: {}", error.getMessage()); + logger.error("URL: {}", url); + logger.error("WebSocket state: {}", webSocket); + logger.error("Full error:", error); + logger.error("===== End WebSocket Error ====="); + resetWebSocket(); + } - private io.netty.handler.codec.http.HttpHeaders createWebSocketHeaders() { - io.netty.handler.codec.http.DefaultHttpHeaders headers = - new io.netty.handler.codec.http.DefaultHttpHeaders(); + @Override + public CompletionStage onText( + WebSocket webSocket, CharSequence data, boolean last) { + BaseMessage request; + webSocket.request(1); + try { + request = JSONUtil.fromJson(data.toString(), BaseMessage.class); + } catch (Exception e) { + logger.error("Conductor JSON Parsing error", e); + return CompletableFuture.completedStage(null); + } - // Add only the most basic headers that Java's HttpClient would send - headers.add("User-Agent", "Java/" + System.getProperty("java.version")); - - // Add proper Host header (this is critical) - try { - URI uri = new URI(url); - String host = uri.getHost(); - if (uri.getPort() != -1 && uri.getPort() != 443 && uri.getPort() != 80) { - host += ":" + uri.getPort(); + String responseText; + try { + BaseResponse response = getResponse(request); + responseText = JSONUtil.toJson(response); + } catch (Exception e) { + logger.error("Conductor Response error", e); + return CompletableFuture.completedStage(null); + } + + return webSocket + .sendText(responseText, true) + .exceptionally( + ex -> { + logger.error("Conductor sendText error", ex); + return null; + }); + } + }) + .join(); + + logger.info("===== WebSocket buildAsync completed ====="); + logger.info("WebSocket created successfully: {}", webSocket != null); + if (webSocket != null) { + logger.info("WebSocket class: {}", webSocket.getClass().getName()); + logger.info("WebSocket toString: {}", webSocket); } - headers.add("Host", host); + logger.info("===== WebSocket creation complete ====="); + } catch (Exception e) { - logger.debug("Failed to extract host from URL {}", url); - } - - // Check for explicit DBOS cloud tokens in environment variables only - String cloudToken = System.getenv("DBOS_CLOUD_TOKEN"); - if (cloudToken != null && !cloudToken.isEmpty()) { - headers.add("Authorization", "Bearer " + cloudToken); - logger.debug("Adding Authorization header from DBOS_CLOUD_TOKEN"); + logger.error("===== Error in conductor loop ====="); + logger.error("Exception type: {}", e.getClass().getName()); + logger.error("Exception message: {}", e.getMessage()); + logger.error("Full exception:", e); + logger.error("===== End conductor loop error ====="); + resetWebSocket(); } - - // Log all headers for debugging (but hide auth values) - logger.debug("WebSocket headers: {}", - headers.names().stream() - .map(name -> name + "=" + (name.toLowerCase().contains("auth") ? "[REDACTED]" : headers.get(name))) - .collect(java.util.stream.Collectors.joining(", "))); - - return headers; } - CompletableFuture getResponseAsync(BaseMessage message) { - logger.debug("getResponseAsync {}", message.type); + BaseResponse getResponse(BaseMessage message) { + logger.debug("getResponse {}", message.type); MessageType messageType = MessageType.fromValue(message.type); - BiFunction> func = - dispatchMap.get(messageType); + BiFunction func = dispatchMap.get(messageType); if (func != null) { return func.apply(this, message); } else { logger.warn("Conductor unknown message type {}", message.type); - return CompletableFuture.completedFuture( - new BaseResponse(message.type, message.request_id, "Unknown message type")); + return new BaseResponse(message.type, message.request_id, "Unknown message type"); } } - static CompletableFuture handleExecutorInfo( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - try { - String hostname = InetAddress.getLocalHost().getHostName(); - return new ExecutorInfoResponse( - message, - conductor.dbosExecutor.executorId(), - conductor.dbosExecutor.appVersion(), - hostname); - } catch (Exception e) { - return new ExecutorInfoResponse(message, e); - } - }); - } - - static CompletableFuture handleRecovery(Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - RecoveryRequest request = (RecoveryRequest) message; - try { - conductor.dbosExecutor.recoverPendingWorkflows(request.executor_ids); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when recovering pending workflows", e); - return new SuccessResponse(request, e); - } - }); + static BaseResponse handleExecutorInfo(Conductor conductor, BaseMessage message) { + try { + String hostname = InetAddress.getLocalHost().getHostName(); + return new ExecutorInfoResponse( + message, + conductor.dbosExecutor.executorId(), + conductor.dbosExecutor.appVersion(), + hostname); + } catch (Exception e) { + return new ExecutorInfoResponse(message, e); + } } - static CompletableFuture handleCancel(Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - CancelRequest request = (CancelRequest) message; - try { - conductor.dbosExecutor.cancelWorkflow(request.workflow_id); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error( - "Exception encountered when cancelling workflow {}", request.workflow_id, e); - return new SuccessResponse(request, e); - } - }); + static BaseResponse handleRecovery(Conductor conductor, BaseMessage message) { + RecoveryRequest request = (RecoveryRequest) message; + try { + conductor.dbosExecutor.recoverPendingWorkflows(request.executor_ids); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when recovering pending workflows", e); + return new SuccessResponse(request, e); + } } - static CompletableFuture handleDelete(Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - DeleteRequest request = (DeleteRequest) message; - try { - conductor.dbosExecutor.deleteWorkflow(request.workflow_id, request.delete_children); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when deleting workflow {}", request.workflow_id, e); - return new SuccessResponse(request, e); - } - }); + static BaseResponse handleCancel(Conductor conductor, BaseMessage message) { + CancelRequest request = (CancelRequest) message; + try { + conductor.dbosExecutor.cancelWorkflow(request.workflow_id); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when cancelling workflow {}", request.workflow_id, e); + return new SuccessResponse(request, e); + } } - static CompletableFuture handleResume(Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ResumeRequest request = (ResumeRequest) message; - try { - conductor.dbosExecutor.resumeWorkflow(request.workflow_id); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when resuming workflow {}", request.workflow_id, e); - return new SuccessResponse(request, e); - } - }); + static BaseResponse handleResume(Conductor conductor, BaseMessage message) { + ResumeRequest request = (ResumeRequest) message; + try { + conductor.dbosExecutor.resumeWorkflow(request.workflow_id); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when resuming workflow {}", request.workflow_id, e); + return new SuccessResponse(request, e); + } } - static CompletableFuture handleRestart(Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - RestartRequest request = (RestartRequest) message; - try { - ForkOptions options = new ForkOptions(); - conductor.dbosExecutor.forkWorkflow(request.workflow_id, 0, options); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error( - "Exception encountered when restarting workflow {}", request.workflow_id, e); - return new SuccessResponse(request, e); - } - }); + static BaseResponse handleRestart(Conductor conductor, BaseMessage message) { + RestartRequest request = (RestartRequest) message; + try { + ForkOptions options = new ForkOptions(); + conductor.dbosExecutor.forkWorkflow(request.workflow_id, 0, options); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when restarting workflow {}", request.workflow_id, e); + return new SuccessResponse(request, e); + } } - static CompletableFuture handleFork(Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ForkWorkflowRequest request = (ForkWorkflowRequest) message; - if (request.body.workflow_id == null || request.body.start_step == null) { - return new ForkWorkflowResponse(request, null, "Invalid Fork Workflow Request"); - } - try { - var options = - new ForkOptions( - request.body.new_workflow_id, request.body.application_version, null); - WorkflowHandle handle = - conductor.dbosExecutor.forkWorkflow( - request.body.workflow_id, request.body.start_step, options); - return new ForkWorkflowResponse(request, handle.workflowId()); - } catch (Exception e) { - logger.error("Exception encountered when forking workflow {}", request, e); - return new ForkWorkflowResponse(request, e); - } - }); + static BaseResponse handleFork(Conductor conductor, BaseMessage message) { + ForkWorkflowRequest request = (ForkWorkflowRequest) message; + if (request.body.workflow_id == null || request.body.start_step == null) { + return new ForkWorkflowResponse(request, null, "Invalid Fork Workflow Request"); + } + try { + var options = + new ForkOptions(request.body.new_workflow_id, request.body.application_version, null); + WorkflowHandle handle = + conductor.dbosExecutor.forkWorkflow( + request.body.workflow_id, request.body.start_step, options); + return new ForkWorkflowResponse(request, handle.workflowId()); + } catch (Exception e) { + logger.error("Exception encountered when forking workflow {}", request, e); + return new ForkWorkflowResponse(request, e); + } } - static CompletableFuture handleListWorkflows( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ListWorkflowsRequest request = (ListWorkflowsRequest) message; - try { - ListWorkflowsInput input = request.asInput(); - List statuses = conductor.dbosExecutor.listWorkflows(input); - List output = - statuses.stream().map(s -> new WorkflowsOutput(s)).collect(Collectors.toList()); - return new WorkflowOutputsResponse(request, output); - } catch (Exception e) { - logger.error("Exception encountered when listing workflows", e); - return new WorkflowOutputsResponse(request, e); - } - }); + static BaseResponse handleListWorkflows(Conductor conductor, BaseMessage message) { + ListWorkflowsRequest request = (ListWorkflowsRequest) message; + try { + ListWorkflowsInput input = request.asInput(); + List statuses = conductor.dbosExecutor.listWorkflows(input); + List output = + statuses.stream().map(s -> new WorkflowsOutput(s)).collect(Collectors.toList()); + return new WorkflowOutputsResponse(request, output); + } catch (Exception e) { + logger.error("Exception encountered when listing workflows", e); + return new WorkflowOutputsResponse(request, e); + } } - static CompletableFuture handleListQueuedWorkflows( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ListQueuedWorkflowsRequest request = (ListQueuedWorkflowsRequest) message; - try { - ListWorkflowsInput input = request.asInput(); - List statuses = conductor.dbosExecutor.listWorkflows(input); - List output = - statuses.stream().map(s -> new WorkflowsOutput(s)).collect(Collectors.toList()); - return new WorkflowOutputsResponse(request, output); - } catch (Exception e) { - logger.error("Exception encountered when listing workflows", e); - return new WorkflowOutputsResponse(request, e); - } - }); + static BaseResponse handleListQueuedWorkflows(Conductor conductor, BaseMessage message) { + ListQueuedWorkflowsRequest request = (ListQueuedWorkflowsRequest) message; + try { + ListWorkflowsInput input = request.asInput(); + List statuses = conductor.dbosExecutor.listWorkflows(input); + List output = + statuses.stream().map(s -> new WorkflowsOutput(s)).collect(Collectors.toList()); + return new WorkflowOutputsResponse(request, output); + } catch (Exception e) { + logger.error("Exception encountered when listing workflows", e); + return new WorkflowOutputsResponse(request, e); + } } - static CompletableFuture handleListSteps(Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ListStepsRequest request = (ListStepsRequest) message; - try { - List stepInfoList = - conductor.dbosExecutor.listWorkflowSteps(request.workflow_id); - List steps = - stepInfoList.stream() - .map(i -> new ListStepsResponse.Step(i)) - .collect(Collectors.toList()); - return new ListStepsResponse(request, steps); - } catch (Exception e) { - logger.error("Exception encountered when listing steps {}", request.workflow_id, e); - return new ListStepsResponse(request, e); - } - }); + static BaseResponse handleListSteps(Conductor conductor, BaseMessage message) { + ListStepsRequest request = (ListStepsRequest) message; + try { + List stepInfoList = conductor.dbosExecutor.listWorkflowSteps(request.workflow_id); + List steps = + stepInfoList.stream() + .map(i -> new ListStepsResponse.Step(i)) + .collect(Collectors.toList()); + return new ListStepsResponse(request, steps); + } catch (Exception e) { + logger.error("Exception encountered when listing steps {}", request.workflow_id, e); + return new ListStepsResponse(request, e); + } } - static CompletableFuture handleExistPendingWorkflows( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message; - try { - List pending = - conductor.systemDatabase.getPendingWorkflows( - request.executor_id, request.application_version); - return new ExistPendingWorkflowsResponse(request, pending.size() > 0); - } catch (Exception e) { - logger.error("Exception encountered when checking for pending workflows", e); - return new ExistPendingWorkflowsResponse(request, e); - } - }); + static BaseResponse handleExistPendingWorkflows(Conductor conductor, BaseMessage message) { + ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message; + try { + List pending = + conductor.systemDatabase.getPendingWorkflows( + request.executor_id, request.application_version); + return new ExistPendingWorkflowsResponse(request, pending.size() > 0); + } catch (Exception e) { + logger.error("Exception encountered when checking for pending workflows", e); + return new ExistPendingWorkflowsResponse(request, e); + } } - static CompletableFuture handleGetWorkflow( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - GetWorkflowRequest request = (GetWorkflowRequest) message; - try { - var status = conductor.systemDatabase.getWorkflowStatus(request.workflow_id); - WorkflowsOutput output = status == null ? null : new WorkflowsOutput(status); - return new GetWorkflowResponse(request, output); - } catch (Exception e) { - logger.error("Exception encountered when getting workflow {}", request.workflow_id, e); - return new GetWorkflowResponse(request, e); - } - }); + static BaseResponse handleGetWorkflow(Conductor conductor, BaseMessage message) { + GetWorkflowRequest request = (GetWorkflowRequest) message; + try { + var status = conductor.systemDatabase.getWorkflowStatus(request.workflow_id); + WorkflowsOutput output = status == null ? null : new WorkflowsOutput(status); + return new GetWorkflowResponse(request, output); + } catch (Exception e) { + logger.error("Exception encountered when getting workflow {}", request.workflow_id, e); + return new GetWorkflowResponse(request, e); + } } - static CompletableFuture handleRetention(Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - RetentionRequest request = (RetentionRequest) message; - - try { - conductor.systemDatabase.garbageCollect( - request.body.gc_cutoff_epoch_ms, request.body.gc_rows_threshold); - } catch (Exception e) { - logger.error("Exception encountered garbage collecting system database", e); - return new SuccessResponse(request, e); - } - - try { - if (request.body.timeout_cutoff_epoch_ms != null) { - conductor.dbosExecutor.globalTimeout(request.body.timeout_cutoff_epoch_ms); - } - } catch (Exception e) { - logger.error("Exception encountered setting global timeout", e); - return new SuccessResponse(request, e); - } - - return new SuccessResponse(request, true); - }); - } + static BaseResponse handleRetention(Conductor conductor, BaseMessage message) { + RetentionRequest request = (RetentionRequest) message; - static CompletableFuture handleGetMetrics( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - GetMetricsRequest request = (GetMetricsRequest) message; - - try { - if (request.metric_class.equals("workflow_step_count")) { - var metrics = - conductor.systemDatabase.getMetrics(request.startTime(), request.endTime()); - return new GetMetricsResponse(request, metrics); - } else { - logger.warn("Unexpected metric class {}", request.metric_class); - throw new RuntimeException( - "Unexpected metric class %s".formatted(request.metric_class)); - } - } catch (Exception e) { - return new GetMetricsResponse(request, e); - } - }); - } + try { + conductor.systemDatabase.garbageCollect( + request.body.gc_cutoff_epoch_ms, request.body.gc_rows_threshold); + } catch (Exception e) { + logger.error("Exception encountered garbage collecting system database", e); + return new SuccessResponse(request, e); + } - static CompletableFuture handleImportWorkflow( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ImportWorkflowRequest request = (ImportWorkflowRequest) message; - long startTime = System.currentTimeMillis(); - logger.info("Starting import workflow"); - - try { - var exportedWorkflows = deserializeExportedWorkflows(request.serialized_workflow); - logger.info("deserialization completed workflow count={}", exportedWorkflows.size()); - conductor.systemDatabase.importWorkflow(exportedWorkflows); - long duration = System.currentTimeMillis() - startTime; - logger.info( - "Database import completed: {} workflows imported, duration={}ms", - exportedWorkflows.size(), - duration); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when importing workflow", e); - return new SuccessResponse(request, e); - } - }); - } + try { + if (request.body.timeout_cutoff_epoch_ms != null) { + conductor.dbosExecutor.globalTimeout(request.body.timeout_cutoff_epoch_ms); + } + } catch (Exception e) { + logger.error("Exception encountered setting global timeout", e); + return new SuccessResponse(request, e); + } - static CompletableFuture handleExportWorkflow( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ExportWorkflowRequest request = (ExportWorkflowRequest) message; - long startTime = System.currentTimeMillis(); - logger.info( - "Starting export workflow: id={}, export_children={}", - request.workflow_id, - request.export_children); - - try { - var workflows = - conductor.systemDatabase.exportWorkflow( - request.workflow_id, request.export_children); - - logger.info( - "Database export completed: workflow_id={}, {} workflows retrieved", - request.workflow_id, - workflows.size()); - - var serializedWorkflow = serializeExportedWorkflows(workflows); - - long duration = System.currentTimeMillis() - startTime; - logger.info( - "Export workflow completed: id={}, workflows={}, serialized_size={} bytes, duration={}ms", - request.workflow_id, - workflows.size(), - serializedWorkflow.length(), - duration); - - return new ExportWorkflowResponse(message, serializedWorkflow); - } catch (Exception e) { - long duration = System.currentTimeMillis() - startTime; - var children = request.export_children ? "with children" : ""; - logger.error( - "Exception encountered when exporting workflow {} {} after {}ms", - request.workflow_id, - children, - duration, - e); - return new ExportWorkflowResponse(request, e); - } finally { - long totalDuration = System.currentTimeMillis() - startTime; - logger.info( - "handleExportWorkflow completed: id={}, total_duration={}ms", - request.workflow_id, - totalDuration); - } - }); + return new SuccessResponse(request, true); } - static List deserializeExportedWorkflows(String serializedWorkflow) - throws IOException { - var compressed = Base64.getDecoder().decode(serializedWorkflow); - try (var gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) { - var typeRef = new TypeReference>() {}; - return JSONUtil.fromJson(gis, typeRef); - } - } + static BaseResponse handleGetMetrics(Conductor conductor, BaseMessage message) { + GetMetricsRequest request = (GetMetricsRequest) message; - static String serializeExportedWorkflows(List workflows) throws IOException { - var out = new ByteArrayOutputStream(); - try (var gOut = new GZIPOutputStream(out)) { - JSONUtil.toJson(gOut, workflows); + try { + if (request.metric_class.equals("workflow_step_count")) { + var metrics = conductor.systemDatabase.getMetrics(request.startTime(), request.endTime()); + return new GetMetricsResponse(request, metrics); + } else { + logger.warn("Unexpected metric class {}", request.metric_class); + throw new RuntimeException("Unexpected metric class %s".formatted(request.metric_class)); + } + } catch (Exception e) { + return new GetMetricsResponse(request, e); } - - return Base64.getEncoder().encodeToString(out.toByteArray()); } } From 4d46be4abb92e81816491b4c1eb1c88cc990b6a5 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:27:01 -0800 Subject: [PATCH 07/12] moar reverted --- .../src/main/java/dev/dbos/transact/conductor/Conductor.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 74c3b7dc..4f37aede 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -292,6 +292,10 @@ void dispatchLoop() { logger.debug("DBOS__VMID: {}", System.getenv("DBOS__VMID")); logger.debug("DBOS__APPVERSION: {}", System.getenv("DBOS__APPVERSION")); + // Enable Java HTTP client debugging to see exact headers sent + System.setProperty("jdk.httpclient.HttpClient.log", "all"); + System.setProperty("java.net.useSystemProxies", "true"); + // Check for any DBOS or auth-related environment variables System.getenv().entrySet().stream() .filter(e -> e.getKey().toUpperCase().contains("DBOS") || From d5e05230c47d9e21668025a83902f6739671fdcf Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:36:22 -0800 Subject: [PATCH 08/12] back to netty --- .../dbos/transact/conductor/Conductor.java | 463 ++++++++++++------ 1 file changed, 317 insertions(+), 146 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 4f37aede..c150f1ba 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -4,6 +4,7 @@ import dev.dbos.transact.database.SystemDatabase; import dev.dbos.transact.execution.DBOSExecutor; import dev.dbos.transact.json.JSONUtil; +import dev.dbos.transact.workflow.ExportedWorkflow; import dev.dbos.transact.workflow.ForkOptions; import dev.dbos.transact.workflow.ListWorkflowsInput; import dev.dbos.transact.workflow.StepInfo; @@ -11,19 +12,17 @@ import dev.dbos.transact.workflow.WorkflowStatus; import dev.dbos.transact.workflow.internal.GetPendingWorkflowsOutput; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.net.InetAddress; import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.WebSocket; -import java.net.http.WebSocket.Listener; -import java.nio.ByteBuffer; -import java.time.Duration; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -31,7 +30,35 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.stream.Collectors; - +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import com.fasterxml.jackson.core.type.TypeReference; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +98,8 @@ public class Conductor implements AutoCloseable { private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicBoolean isShutdown = new AtomicBoolean(false); - private WebSocket webSocket; + private Channel channel; + private EventLoopGroup group; private ScheduledFuture pingInterval; private ScheduledFuture pingTimeout; private ScheduledFuture reconnectTimeout; @@ -181,14 +209,20 @@ public void stop() { scheduler.shutdownNow(); - if (webSocket != null) { - webSocket.sendClose(WebSocket.NORMAL_CLOSURE, ""); - webSocket = null; + if (channel != null && channel.isActive()) { + channel.writeAndFlush(new CloseWebSocketFrame()); + channel.close(); + channel = null; + } + + if (group != null) { + group.shutdownGracefully(); + group = null; } } } - void setPingInterval(WebSocket webSocket) { + void setPingInterval(Channel channel) { logger.debug("setPingInterval"); if (pingInterval != null) { @@ -201,25 +235,26 @@ void setPingInterval(WebSocket webSocket) { return; } try { - // Check for null in case webSocket connects before webSocket variable is assigned - if (webSocket == null) { - logger.debug("webSocket null, NOT sending ping to conductor"); + // Check for null in case channel connects before channel variable is assigned + if (channel == null) { + logger.debug("channel null, NOT sending ping to conductor"); return; } - if (webSocket.isOutputClosed()) { - logger.debug("webSocket closed, NOT sending ping to conductor"); + if (!channel.isActive()) { + logger.debug("channel closed, NOT sending ping to conductor"); return; } logger.debug("Sending ping to conductor"); - webSocket - .sendPing(ByteBuffer.allocate(0)) - .exceptionally( - ex -> { - logger.error("Failed to send ping to conductor", ex); - resetWebSocket(); - return null; + channel + .writeAndFlush(new PingWebSocketFrame(Unpooled.buffer(0))) + .addListener( + future -> { + if (!future.isSuccess()) { + logger.error("Failed to send ping to conductor", future.cause()); + resetWebSocket(); + } }); pingTimeout = @@ -252,9 +287,14 @@ void resetWebSocket() { pingTimeout = null; } - if (webSocket != null) { - webSocket.abort(); - webSocket = null; + if (channel != null) { + channel.close(); + channel = null; + } + + if (group != null) { + group.shutdownGracefully(); + group = null; } if (isShutdown.get()) { @@ -274,7 +314,7 @@ void resetWebSocket() { } void dispatchLoop() { - if (webSocket != null) { + if (channel != null) { logger.warn("Conductor websocket already exists"); return; } @@ -291,146 +331,197 @@ void dispatchLoop() { logger.debug("DBOS__APPID: {}", System.getenv("DBOS__APPID")); logger.debug("DBOS__VMID: {}", System.getenv("DBOS__VMID")); logger.debug("DBOS__APPVERSION: {}", System.getenv("DBOS__APPVERSION")); - - // Enable Java HTTP client debugging to see exact headers sent - System.setProperty("jdk.httpclient.HttpClient.log", "all"); - System.setProperty("java.net.useSystemProxies", "true"); - + // Check for any DBOS or auth-related environment variables System.getenv().entrySet().stream() - .filter(e -> e.getKey().toUpperCase().contains("DBOS") || - e.getKey().toUpperCase().contains("AUTH") || - e.getKey().toUpperCase().contains("TOKEN")) - .forEach(e -> logger.debug("Auth env: {}={}", e.getKey(), - e.getKey().toLowerCase().contains("token") || e.getKey().toLowerCase().contains("password") ? - "[REDACTED]" : e.getValue())); + .filter( + e -> + e.getKey().toUpperCase().contains("DBOS") + || e.getKey().toUpperCase().contains("AUTH") + || e.getKey().toUpperCase().contains("TOKEN")) + .forEach( + e -> + logger.debug( + "Auth env: {}={}", + e.getKey(), + e.getKey().toLowerCase().contains("token") + || e.getKey().toLowerCase().contains("password") + ? "[REDACTED]" + : e.getValue())); logger.debug("===== Environment Check Complete ====="); try { logger.debug("Connecting to conductor at {}", url); - // Create HttpClient with debugging - HttpClient client = HttpClient.newBuilder() - .version(HttpClient.Version.HTTP_2) - .build(); - // Log the URL being used URI wsUri = URI.create(url); logger.debug("WebSocket URI: {}", wsUri); - logger.debug("URI scheme: {}, host: {}, port: {}, path: {}", - wsUri.getScheme(), wsUri.getHost(), wsUri.getPort(), wsUri.getPath()); - - // Create a WebSocket builder and log its configuration - WebSocket.Builder wsBuilder = client.newWebSocketBuilder() - .connectTimeout(Duration.ofMillis(connectTimeoutMs)); - - logger.debug("WebSocket builder timeout: {}ms", connectTimeoutMs); - - // Add custom headers to see if we can capture what gets sent - // Try to add headers that might be automatically added - logger.debug("Adding debug headers to WebSocket request"); - - webSocket = - wsBuilder - .header("User-Agent", "DBOS-Java-Debug/" + System.getProperty("java.version")) - // Add a custom header to see if headers work at all - .header("X-Debug-Test", "test-value") - .buildAsync( - wsUri, - new WebSocket.Listener() { - @Override - public void onOpen(WebSocket webSocket) { - logger.info("===== SUCCESSFUL WebSocket Connection ====="); - logger.info("Connected to DBOS conductor at: {}", url); - logger.info("WebSocket state: {}", webSocket); - logger.info("WebSocket subprotocol: {}", webSocket.getSubprotocol()); - - // Try to capture any available connection details - logger.debug("WebSocket class: {}", webSocket.getClass().getName()); - - webSocket.request(1); - setPingInterval(webSocket); - logger.info("===== WebSocket setup complete ====="); - } + logger.debug( + "URI scheme: {}, host: {}, port: {}, path: {}", + wsUri.getScheme(), + wsUri.getHost(), + wsUri.getPort(), + wsUri.getPath()); - @Override - public CompletionStage onPong(WebSocket webSocket, ByteBuffer message) { - logger.debug("Received pong from conductor"); - webSocket.request(1); - if (pingTimeout != null) { - pingTimeout.cancel(false); - pingTimeout = null; - } - return null; - } + // Create event loop group + group = new NioEventLoopGroup(); - @Override - public CompletionStage onClose( - WebSocket webSocket, int statusCode, String reason) { - if (isShutdown.get()) { - logger.debug("Shutdown Conductor connection"); - } else if (reconnectTimeout == null) { - logger.warn("onClose: Connection to conductor lost. Reconnecting"); - resetWebSocket(); - } - return Listener.super.onClose(webSocket, statusCode, reason); - } + // Create SSL context for WSS + SslContext sslCtx = + SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); - @Override - public void onError(WebSocket webSocket, Throwable error) { - logger.error("===== WebSocket Connection Error ====="); - logger.error("Error type: {}", error.getClass().getName()); - logger.error("Error message: {}", error.getMessage()); - logger.error("URL: {}", url); - logger.error("WebSocket state: {}", webSocket); - logger.error("Full error:", error); - logger.error("===== End WebSocket Error ====="); - resetWebSocket(); - } + // Create headers that EXACTLY match the working HttpClient implementation + DefaultHttpHeaders headers = new DefaultHttpHeaders(); + headers.add("User-Agent", "DBOS-Java-Debug/" + System.getProperty("java.version")); + headers.add("X-Debug-Test", "test-value"); + logger.debug("WebSocket builder timeout: {}ms", connectTimeoutMs); + logger.debug("Adding debug headers to WebSocket request"); + + // Set up the WebSocket handshaker with the exact same parameters + int port = + wsUri.getPort() == -1 ? (wsUri.getScheme().equals("wss") ? 443 : 80) : wsUri.getPort(); + WebSocketClientHandshaker handshaker = + WebSocketClientHandshakerFactory.newHandshaker( + wsUri, + WebSocketVersion.V13, + null, // No subprotocol + true, // Allow extensions + headers, + 65536); // Max frame payload length + + // Bootstrap the client + Bootstrap bootstrap = new Bootstrap(); + final Conductor conductor = this; + + ChannelFuture future = + bootstrap + .group(group) + .channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { @Override - public CompletionStage onText( - WebSocket webSocket, CharSequence data, boolean last) { - BaseMessage request; - webSocket.request(1); - try { - request = JSONUtil.fromJson(data.toString(), BaseMessage.class); - } catch (Exception e) { - logger.error("Conductor JSON Parsing error", e); - return CompletableFuture.completedStage(null); - } - - String responseText; - try { - BaseResponse response = getResponse(request); - responseText = JSONUtil.toJson(response); - } catch (Exception e) { - logger.error("Conductor Response error", e); - return CompletableFuture.completedStage(null); - } - - return webSocket - .sendText(responseText, true) - .exceptionally( - ex -> { - logger.error("Conductor sendText error", ex); - return null; - }); + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("ssl", sslCtx.newHandler(ch.alloc(), wsUri.getHost(), port)); + pipeline.addLast("http-codec", new HttpClientCodec()); + pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); + pipeline.addLast( + "websocket-handler", + new SimpleChannelInboundHandler() { + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + handshaker.handshake(ctx.channel()); + super.channelActive(ctx); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) + throws Exception { + Channel ch = ctx.channel(); + + if (!handshaker.isHandshakeComplete()) { + try { + handshaker.finishHandshake( + ch, (io.netty.handler.codec.http.FullHttpResponse) msg); + logger.info("===== SUCCESSFUL WebSocket Connection ====="); + logger.info("Connected to DBOS conductor at: {}", url); + logger.info("Channel state: {}", ch); + logger.debug("Channel class: {}", ch.getClass().getName()); + + setPingInterval(ch); + logger.info("===== WebSocket setup complete ====="); + return; + } catch (Exception e) { + logger.error("WebSocket handshake failed:", e); + return; + } + } + + if (msg instanceof TextWebSocketFrame) { + TextWebSocketFrame textFrame = (TextWebSocketFrame) msg; + String text = textFrame.text(); + + BaseMessage request; + try { + request = JSONUtil.fromJson(text, BaseMessage.class); + } catch (Exception e) { + logger.error("Conductor JSON Parsing error", e); + return; + } + + String responseText; + try { + BaseResponse response = getResponse(request); + responseText = JSONUtil.toJson(response); + } catch (Exception e) { + logger.error("Conductor Response error", e); + return; + } + + ch.writeAndFlush(new TextWebSocketFrame(responseText)); + } else if (msg instanceof PongWebSocketFrame) { + logger.debug("Received pong from conductor"); + if (pingTimeout != null) { + pingTimeout.cancel(false); + pingTimeout = null; + } + } else if (msg instanceof CloseWebSocketFrame) { + if (isShutdown.get()) { + logger.debug("Shutdown Conductor connection"); + } else if (reconnectTimeout == null) { + logger.warn( + "onClose: Connection to conductor lost. Reconnecting"); + resetWebSocket(); + } + ch.close(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + logger.error("===== WebSocket Connection Error ====="); + logger.error("Error type: {}", cause.getClass().getName()); + logger.error("Error message: {}", cause.getMessage()); + logger.error("URL: {}", url); + logger.error("Channel state: {}", ctx.channel()); + logger.error("Full error:", cause); + logger.error("===== End WebSocket Error ====="); + resetWebSocket(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) + throws Exception { + if (isShutdown.get()) { + logger.debug("Shutdown Conductor connection"); + } else if (reconnectTimeout == null) { + logger.warn( + "channelInactive: Connection to conductor lost. Reconnecting"); + resetWebSocket(); + } + super.channelInactive(ctx); + } + }); } }) - .join(); - + .connect(wsUri.getHost(), port); + + channel = future.sync().channel(); + logger.info("===== WebSocket buildAsync completed ====="); - logger.info("WebSocket created successfully: {}", webSocket != null); - if (webSocket != null) { - logger.info("WebSocket class: {}", webSocket.getClass().getName()); - logger.info("WebSocket toString: {}", webSocket); + logger.info("WebSocket created successfully: {}", channel != null); + if (channel != null) { + logger.info("WebSocket class: {}", channel.getClass().getName()); + logger.info("WebSocket toString: {}", channel); } logger.info("===== WebSocket creation complete ====="); - + } catch (Exception e) { logger.error("===== Error in conductor loop ====="); - logger.error("Exception type: {}", e.getClass().getName()); + logger.error("Exception type: {}", e.getClass().getName()); logger.error("Exception message: {}", e.getMessage()); logger.error("Full exception:", e); logger.error("===== End conductor loop error ====="); @@ -632,4 +723,84 @@ static BaseResponse handleGetMetrics(Conductor conductor, BaseMessage message) { return new GetMetricsResponse(request, e); } } + + static CompletableFuture handleExportWorkflow( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ExportWorkflowRequest request = (ExportWorkflowRequest) message; + long startTime = System.currentTimeMillis(); + try { + var workflows = + conductor.systemDatabase.exportWorkflow( + request.workflow_id, request.export_children); + logger.info("Queried database workflow count={}", workflows.size()); + + var serializedWorkflow = serializeExportedWorkflows(workflows); + + long duration = System.currentTimeMillis() - startTime; + logger.info( + "Export workflow completed: id={}, workflows={}, serialized_size={} bytes, duration={}ms", + request.workflow_id, + workflows.size(), + serializedWorkflow.length(), + duration); + + return new ExportWorkflowResponse(message, serializedWorkflow); + } catch (Exception e) { + long duration = System.currentTimeMillis() - startTime; + var children = request.export_children ? "with children" : ""; + logger.error( + "Exception encountered when exporting workflow {} {} after {}ms", + request.workflow_id, + children, + duration, + e); + return new ExportWorkflowResponse(request, e); + } + }); + } + + static CompletableFuture handleImportWorkflow( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ImportWorkflowRequest request = (ImportWorkflowRequest) message; + long startTime = System.currentTimeMillis(); + try { + logger.info("Starting workflow import"); + var exportedWorkflows = deserializeExportedWorkflows(request.serialized_workflow); + logger.info("deserialization completed workflow count={}", exportedWorkflows.size()); + conductor.systemDatabase.importWorkflow(exportedWorkflows); + + long duration = System.currentTimeMillis() - startTime; + logger.info( + "Database import completed: {} workflows imported, duration={}ms", + exportedWorkflows.size(), + duration); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when importing workflow", e); + return new SuccessResponse(request, e); + } + }); + } + + static List deserializeExportedWorkflows(String serializedWorkflow) + throws IOException { + var compressed = Base64.getDecoder().decode(serializedWorkflow); + try (var gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) { + var typeRef = new TypeReference>() {}; + return JSONUtil.fromJson(gis, typeRef); + } + } + + static String serializeExportedWorkflows(List workflows) throws IOException { + var out = new ByteArrayOutputStream(); + try (var gOut = new GZIPOutputStream(out)) { + JSONUtil.toJson(gOut, workflows); + } + + return Base64.getEncoder().encodeToString(out.toByteArray()); + } } From 401c1c3ea5ede37421128dd49ba57ca1c83e5e01 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:38:45 -0800 Subject: [PATCH 09/12] still moar --- .../dbos/transact/conductor/Conductor.java | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index c150f1ba..3dd995be 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -406,6 +406,25 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast("ssl", sslCtx.newHandler(ch.alloc(), wsUri.getHost(), port)); pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); + + // Add HTTP request/response logger + pipeline.addLast("http-logger", new io.netty.channel.ChannelDuplexHandler() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, io.netty.channel.ChannelPromise promise) throws Exception { + if (msg instanceof io.netty.handler.codec.http.HttpRequest) { + io.netty.handler.codec.http.HttpRequest request = (io.netty.handler.codec.http.HttpRequest) msg; + logger.error("===== HTTP Request Debug ====="); + logger.error("HTTP Request: {} {} {}", + request.method(), request.uri(), request.protocolVersion()); + logger.error("HTTP Request Headers:"); + request.headers().forEach(header -> + logger.error(" {}: {}", header.getKey(), header.getValue())); + logger.error("===== End HTTP Request Debug ====="); + } + super.write(ctx, msg, promise); + } + }); + pipeline.addLast( "websocket-handler", new SimpleChannelInboundHandler() { @@ -423,8 +442,18 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) if (!handshaker.isHandshakeComplete()) { try { - handshaker.finishHandshake( - ch, (io.netty.handler.codec.http.FullHttpResponse) msg); + // Log the HTTP response we received + io.netty.handler.codec.http.FullHttpResponse response = + (io.netty.handler.codec.http.FullHttpResponse) msg; + logger.error("===== HTTP Response Debug ====="); + logger.error("HTTP Response Status: {} {}", + response.status().code(), response.status().reasonPhrase()); + logger.error("HTTP Response Headers:"); + response.headers().forEach(header -> + logger.error(" {}: {}", header.getKey(), header.getValue())); + logger.error("===== End HTTP Response Debug ====="); + + handshaker.finishHandshake(ch, response); logger.info("===== SUCCESSFUL WebSocket Connection ====="); logger.info("Connected to DBOS conductor at: {}", url); logger.info("Channel state: {}", ch); From 8264554f66f82f520b1db2f20ded53a3e347208f Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:45:04 -0800 Subject: [PATCH 10/12] remove origin header --- .../main/java/dev/dbos/transact/conductor/Conductor.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 3dd995be..1f9e5322 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -407,12 +407,17 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast("http-codec", new HttpClientCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); - // Add HTTP request/response logger + // Add HTTP request/response logger + Origin header remover pipeline.addLast("http-logger", new io.netty.channel.ChannelDuplexHandler() { @Override public void write(ChannelHandlerContext ctx, Object msg, io.netty.channel.ChannelPromise promise) throws Exception { if (msg instanceof io.netty.handler.codec.http.HttpRequest) { io.netty.handler.codec.http.HttpRequest request = (io.netty.handler.codec.http.HttpRequest) msg; + + // Remove the Origin header that Netty automatically adds + request.headers().remove("origin"); + request.headers().remove("Origin"); // try both cases + logger.error("===== HTTP Request Debug ====="); logger.error("HTTP Request: {} {} {}", request.method(), request.uri(), request.protocolVersion()); From f7433fc2917d9b3f4405e45d140dc22d0de85c0e Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 14:49:28 -0800 Subject: [PATCH 11/12] back to original w/ just header removal --- .../dbos/transact/conductor/Conductor.java | 1055 ++++++++++------- 1 file changed, 619 insertions(+), 436 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index 1f9e5322..ac167a02 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -15,6 +15,8 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; import java.util.Base64; @@ -35,27 +37,31 @@ import com.fasterxml.jackson.core.type.TypeReference; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.handler.codec.http.EmptyHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolConfig; +import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.handler.codec.json.JsonObjectDecoder; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; @@ -65,15 +71,17 @@ public class Conductor implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(Conductor.class); - private static final Map> + private static final Map< + MessageType, BiFunction>> dispatchMap; static { - Map> map = + Map>> map = new java.util.EnumMap<>(MessageType.class); map.put(MessageType.EXECUTOR_INFO, Conductor::handleExecutorInfo); map.put(MessageType.RECOVERY, Conductor::handleRecovery); map.put(MessageType.CANCEL, Conductor::handleCancel); + map.put(MessageType.DELETE, Conductor::handleDelete); map.put(MessageType.RESUME, Conductor::handleResume); map.put(MessageType.RESTART, Conductor::handleRestart); map.put(MessageType.FORK_WORKFLOW, Conductor::handleFork); @@ -84,13 +92,15 @@ public class Conductor implements AutoCloseable { map.put(MessageType.GET_WORKFLOW, Conductor::handleGetWorkflow); map.put(MessageType.RETENTION, Conductor::handleRetention); map.put(MessageType.GET_METRICS, Conductor::handleGetMetrics); + map.put(MessageType.IMPORT_WORKFLOW, Conductor::handleImportWorkflow); + map.put(MessageType.EXPORT_WORKFLOW, Conductor::handleExportWorkflow); + dispatchMap = Collections.unmodifiableMap(map); } private final int pingPeriodMs; private final int pingTimeoutMs; private final int reconnectDelayMs; - private final int connectTimeoutMs; private final String url; private final SystemDatabase systemDatabase; @@ -100,6 +110,7 @@ public class Conductor implements AutoCloseable { private Channel channel; private EventLoopGroup group; + private NettyWebSocketHandler handler; private ScheduledFuture pingInterval; private ScheduledFuture pingTimeout; private ScheduledFuture reconnectTimeout; @@ -134,7 +145,224 @@ private Conductor(Builder builder) { this.pingPeriodMs = builder.pingPeriodMs; this.pingTimeoutMs = builder.pingTimeoutMs; this.reconnectDelayMs = builder.reconnectDelayMs; - this.connectTimeoutMs = builder.connectTimeoutMs; + } + + private class NettyWebSocketHandler extends SimpleChannelInboundHandler { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { + logger.info("Successfully established websocket connection to DBOS conductor at {}", url); + setPingInterval(ctx.channel()); + } else if (evt + == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) { + logger.error("Websocket handshake timeout with conductor at {}", url); + } + super.userEventTriggered(ctx, evt); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof PingWebSocketFrame ping) { + logger.debug("Received ping from conductor"); + ctx.channel().writeAndFlush(new PongWebSocketFrame(ping.content().retain())); + } else if (msg instanceof PongWebSocketFrame) { + logger.debug("Received pong from conductor"); + if (pingTimeout != null) { + pingTimeout.cancel(false); + pingTimeout = null; + logger.debug("Cancelled ping timeout - connection is healthy"); + } else { + logger.debug("Received pong but no ping timeout was active"); + } + } else if (msg instanceof CloseWebSocketFrame closeFrame) { + logger.warn( + "Received close frame from conductor: status={}, reason='{}'", + closeFrame.statusCode(), + closeFrame.reasonText()); + if (isShutdown.get()) { + logger.debug("Shutdown Conductor connection"); + } else if (reconnectTimeout == null) { + logger.warn("onClose: Connection to conductor lost. Reconnecting"); + resetWebSocket(); + } + } else if (msg instanceof ByteBuf content) { + int messageSize = content.readableBytes(); + logger.debug("Received {} bytes from Conductor {}", messageSize, msg.getClass().getName()); + + BaseMessage request; + try (InputStream is = new ByteBufInputStream(content)) { + request = JSONUtil.fromJson(is, BaseMessage.class); + } catch (Exception e) { + logger.error("Conductor JSON Parsing error for {} byte message", messageSize, e); + return; + } + + try { + long startTime = System.currentTimeMillis(); + logger.info( + "Processing conductor request: type={}, id={}", request.type, request.request_id); + + getResponseAsync(request) + .whenComplete( + (response, throwable) -> { + try { + long processingTime = System.currentTimeMillis() - startTime; + if (throwable != null) { + logger.error( + "Error processing request: type={}, id={}, duration={}ms", + request.type, + request.request_id, + processingTime, + throwable); + + // Create an error response + BaseResponse errorResponse = + new BaseResponse( + request.type, request.request_id, throwable.getMessage()); + writeFragmentedResponse(ctx, errorResponse); + } else { + logger.info( + "Completed processing request: type={}, id={}, duration={}ms", + request.type, + request.request_id, + processingTime); + writeFragmentedResponse(ctx, response); + } + } catch (Exception e) { + logger.error( + "Error writing response for request type={}, id={}", + request.type, + request.request_id, + e); + } + }); + } catch (Exception e) { + logger.error( + "Conductor Response error for request type={}, id={}", + request.type, + request.request_id, + e); + } + } + } + + private static void writeFragmentedResponse(ChannelHandlerContext ctx, BaseResponse response) + throws Exception { + int fragmentSize = 128 * 1024; // 128k + logger.debug( + "Starting to write fragmented response: type={}, id={}", + response.type, + response.request_id); + try (OutputStream out = new FragmentingOutputStream(ctx, fragmentSize)) { + JSONUtil.toJsonStream(response, out); + } + logger.debug( + "Completed writing fragmented response: type={}, id={}", + response.type, + response.request_id); + } + + private static class FragmentingOutputStream extends OutputStream { + private final ChannelHandlerContext ctx; + private final int fragmentSize; + private ByteBuf currentBuffer; + private boolean firstFrame = true; + private boolean closed = false; + + public FragmentingOutputStream(ChannelHandlerContext ctx, int fragmentSize) { + this.ctx = ctx; + this.fragmentSize = fragmentSize; + this.currentBuffer = ctx.alloc().buffer(fragmentSize); + logger.debug("Created FragmentingOutputStream with fragment size: {}", fragmentSize); + } + + @Override + public void write(int b) throws IOException { + currentBuffer.writeByte(b); + if (currentBuffer.readableBytes() == fragmentSize) { + flushBuffer(false); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + while (len > 0) { + int toCopy = Math.min(len, fragmentSize - currentBuffer.readableBytes()); + currentBuffer.writeBytes(b, off, toCopy); + off += toCopy; + len -= toCopy; + if (currentBuffer.readableBytes() == fragmentSize) { + flushBuffer(false); + } + } + } + + private void flushBuffer(boolean last) { + if (currentBuffer.readableBytes() == 0 && !last) { + return; + } + + int frameSize = currentBuffer.readableBytes(); + WebSocketFrame frame; + if (firstFrame) { + frame = new TextWebSocketFrame(last, 0, currentBuffer); + firstFrame = false; + } else { + frame = new ContinuationWebSocketFrame(last, 0, currentBuffer); + } + + try { + ctx.channel() + .writeAndFlush(frame) + .addListener( + future -> { + if (!future.isSuccess()) { + logger.error( + "Failed to send websocket frame: {} bytes", frameSize, future.cause()); + } + }); + } catch (Exception e) { + logger.error("Exception while sending websocket frame: {} bytes", frameSize, e); + throw e; + } + + if (!last) { + currentBuffer = ctx.alloc().buffer(fragmentSize); + } else { + currentBuffer = null; + } + } + + @Override + public void close() throws IOException { + if (!closed) { + flushBuffer(true); + closed = true; + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + logger.warn( + "Unexpected exception in websocket connection to conductor. Channel active: {}, writable: {}", + ctx.channel().isActive(), + ctx.channel().isWritable(), + cause); + resetWebSocket(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + logger.warn( + "Websocket channel became inactive. Shutdown: {}, reconnect pending: {}", + isShutdown.get(), + reconnectTimeout != null); + if (!isShutdown.get() && reconnectTimeout == null) { + logger.warn("Channel inactive: Connection to conductor lost. Reconnecting"); + resetWebSocket(); + } + } } public static class Builder { @@ -145,7 +373,6 @@ public static class Builder { private int pingPeriodMs = 20000; private int pingTimeoutMs = 15000; private int reconnectDelayMs = 1000; - private int connectTimeoutMs = 5000; public Builder(DBOSExecutor e, SystemDatabase s, String key) { systemDatabase = s; @@ -174,11 +401,6 @@ Builder reconnectDelayMs(int reconnectDelayMs) { return this; } - Builder connectTimeoutMs(int connectTimeoutMs) { - this.connectTimeoutMs = connectTimeoutMs; - return this; - } - public Conductor build() { return new Conductor(this); } @@ -191,7 +413,7 @@ public void close() { public void start() { logger.debug("start"); - dispatchLoop(); + connectWebSocket(); } public void stop() { @@ -209,12 +431,10 @@ public void stop() { scheduler.shutdownNow(); - if (channel != null && channel.isActive()) { - channel.writeAndFlush(new CloseWebSocketFrame()); + if (channel != null) { channel.close(); channel = null; } - if (group != null) { group.shutdownGracefully(); group = null; @@ -235,20 +455,14 @@ void setPingInterval(Channel channel) { return; } try { - // Check for null in case channel connects before channel variable is assigned - if (channel == null) { - logger.debug("channel null, NOT sending ping to conductor"); + if (channel == null || !channel.isActive()) { + logger.debug("channel not active, NOT sending ping to conductor"); return; } - if (!channel.isActive()) { - logger.debug("channel closed, NOT sending ping to conductor"); - return; - } - - logger.debug("Sending ping to conductor"); + logger.debug("Sending ping to conductor (timeout in {}ms)", pingTimeoutMs); channel - .writeAndFlush(new PingWebSocketFrame(Unpooled.buffer(0))) + .writeAndFlush(new PingWebSocketFrame()) .addListener( future -> { if (!future.isSuccess()) { @@ -261,7 +475,9 @@ void setPingInterval(Channel channel) { scheduler.schedule( () -> { if (!isShutdown.get()) { - logger.warn("pingTimeout: Connection to conductor lost. Reconnecting."); + logger.error( + "Ping timeout after {}ms - no pong received from conductor. Connection lost, reconnecting.", + pingTimeoutMs); resetWebSocket(); } }, @@ -277,6 +493,10 @@ void setPingInterval(Channel channel) { } void resetWebSocket() { + logger.info( + "Resetting websocket connection. Channel active: {}", + channel != null ? channel.isActive() : "null"); + if (pingInterval != null) { pingInterval.cancel(false); pingInterval = null; @@ -298,464 +518,437 @@ void resetWebSocket() { } if (isShutdown.get()) { + logger.debug("Not scheduling reconnection - conductor is shutting down"); return; } if (reconnectTimeout == null) { + logger.info("Scheduling websocket reconnection in {}ms", reconnectDelayMs); reconnectTimeout = scheduler.schedule( () -> { reconnectTimeout = null; - dispatchLoop(); + logger.info("Attempting websocket reconnection"); + connectWebSocket(); }, reconnectDelayMs, TimeUnit.MILLISECONDS); + } else { + logger.debug("Reconnection already scheduled"); } } - void dispatchLoop() { + void connectWebSocket() { if (channel != null) { - logger.warn("Conductor websocket already exists"); + logger.warn("Conductor channel already exists"); return; } if (isShutdown.get()) { - logger.debug("Not starting dispatch loop as conductor is shutting down"); + logger.debug("Not connecting web socket as conductor is shutting down"); return; } - // Log environment variables that might affect authentication - logger.debug("===== Environment Check ====="); - logger.debug("DBOS_DOMAIN: {}", System.getenv("DBOS_DOMAIN")); - logger.debug("DBOS__CLOUD: {}", System.getenv("DBOS__CLOUD")); - logger.debug("DBOS__APPID: {}", System.getenv("DBOS__APPID")); - logger.debug("DBOS__VMID: {}", System.getenv("DBOS__VMID")); - logger.debug("DBOS__APPVERSION: {}", System.getenv("DBOS__APPVERSION")); - - // Check for any DBOS or auth-related environment variables - System.getenv().entrySet().stream() - .filter( - e -> - e.getKey().toUpperCase().contains("DBOS") - || e.getKey().toUpperCase().contains("AUTH") - || e.getKey().toUpperCase().contains("TOKEN")) - .forEach( - e -> - logger.debug( - "Auth env: {}={}", - e.getKey(), - e.getKey().toLowerCase().contains("token") - || e.getKey().toLowerCase().contains("password") - ? "[REDACTED]" - : e.getValue())); - logger.debug("===== Environment Check Complete ====="); - try { logger.debug("Connecting to conductor at {}", url); + URI uri = new URI(url); + String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); + final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost(); + final int port; + if (uri.getPort() == -1) { + if ("ws".equalsIgnoreCase(scheme)) { + port = 80; + } else if ("wss".equalsIgnoreCase(scheme)) { + port = 443; + } else { + port = -1; + } + } else { + port = uri.getPort(); + } - // Log the URL being used - URI wsUri = URI.create(url); - logger.debug("WebSocket URI: {}", wsUri); - logger.debug( - "URI scheme: {}, host: {}, port: {}, path: {}", - wsUri.getScheme(), - wsUri.getHost(), - wsUri.getPort(), - wsUri.getPath()); + if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) { + logger.error("Only WS(S) is supported."); + return; + } - // Create event loop group - group = new NioEventLoopGroup(); + final boolean ssl = "wss".equalsIgnoreCase(scheme); + final SslContext sslCtx; + if (ssl) { + sslCtx = + SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + } else { + sslCtx = null; + } - // Create SSL context for WSS - SslContext sslCtx = - SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); - - // Create headers that EXACTLY match the working HttpClient implementation - DefaultHttpHeaders headers = new DefaultHttpHeaders(); - headers.add("User-Agent", "DBOS-Java-Debug/" + System.getProperty("java.version")); - headers.add("X-Debug-Test", "test-value"); - - logger.debug("WebSocket builder timeout: {}ms", connectTimeoutMs); - logger.debug("Adding debug headers to WebSocket request"); - - // Set up the WebSocket handshaker with the exact same parameters - int port = - wsUri.getPort() == -1 ? (wsUri.getScheme().equals("wss") ? 443 : 80) : wsUri.getPort(); - WebSocketClientHandshaker handshaker = - WebSocketClientHandshakerFactory.newHandshaker( - wsUri, - WebSocketVersion.V13, - null, // No subprotocol - true, // Allow extensions - headers, - 65536); // Max frame payload length - - // Bootstrap the client - Bootstrap bootstrap = new Bootstrap(); - final Conductor conductor = this; - - ChannelFuture future = - bootstrap - .group(group) - .channel(NioSocketChannel.class) - .handler( - new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("ssl", sslCtx.newHandler(ch.alloc(), wsUri.getHost(), port)); - pipeline.addLast("http-codec", new HttpClientCodec()); - pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); - - // Add HTTP request/response logger + Origin header remover - pipeline.addLast("http-logger", new io.netty.channel.ChannelDuplexHandler() { + group = new NioEventLoopGroup(); + handler = new NettyWebSocketHandler(); + + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + var p = ch.pipeline(); + if (sslCtx != null) { + p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); + } + p.addLast( + new HttpClientCodec(), + new HttpObjectAggregator(256 * 1024 * 1024), // 256MB max message size + // Remove Origin header that Netty automatically adds to fix 403 errors + new io.netty.channel.ChannelDuplexHandler() { @Override public void write(ChannelHandlerContext ctx, Object msg, io.netty.channel.ChannelPromise promise) throws Exception { if (msg instanceof io.netty.handler.codec.http.HttpRequest) { io.netty.handler.codec.http.HttpRequest request = (io.netty.handler.codec.http.HttpRequest) msg; - - // Remove the Origin header that Netty automatically adds + // Remove the Origin header that causes 403 errors request.headers().remove("origin"); - request.headers().remove("Origin"); // try both cases - - logger.error("===== HTTP Request Debug ====="); - logger.error("HTTP Request: {} {} {}", - request.method(), request.uri(), request.protocolVersion()); - logger.error("HTTP Request Headers:"); - request.headers().forEach(header -> - logger.error(" {}: {}", header.getKey(), header.getValue())); - logger.error("===== End HTTP Request Debug ====="); + request.headers().remove("Origin"); } super.write(ctx, msg, promise); } - }); - - pipeline.addLast( - "websocket-handler", - new SimpleChannelInboundHandler() { - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - handshaker.handshake(ctx.channel()); - super.channelActive(ctx); - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) - throws Exception { - Channel ch = ctx.channel(); - - if (!handshaker.isHandshakeComplete()) { - try { - // Log the HTTP response we received - io.netty.handler.codec.http.FullHttpResponse response = - (io.netty.handler.codec.http.FullHttpResponse) msg; - logger.error("===== HTTP Response Debug ====="); - logger.error("HTTP Response Status: {} {}", - response.status().code(), response.status().reasonPhrase()); - logger.error("HTTP Response Headers:"); - response.headers().forEach(header -> - logger.error(" {}: {}", header.getKey(), header.getValue())); - logger.error("===== End HTTP Response Debug ====="); - - handshaker.finishHandshake(ch, response); - logger.info("===== SUCCESSFUL WebSocket Connection ====="); - logger.info("Connected to DBOS conductor at: {}", url); - logger.info("Channel state: {}", ch); - logger.debug("Channel class: {}", ch.getClass().getName()); - - setPingInterval(ch); - logger.info("===== WebSocket setup complete ====="); - return; - } catch (Exception e) { - logger.error("WebSocket handshake failed:", e); - return; - } - } - - if (msg instanceof TextWebSocketFrame) { - TextWebSocketFrame textFrame = (TextWebSocketFrame) msg; - String text = textFrame.text(); - - BaseMessage request; - try { - request = JSONUtil.fromJson(text, BaseMessage.class); - } catch (Exception e) { - logger.error("Conductor JSON Parsing error", e); - return; - } - - String responseText; - try { - BaseResponse response = getResponse(request); - responseText = JSONUtil.toJson(response); - } catch (Exception e) { - logger.error("Conductor Response error", e); - return; - } - - ch.writeAndFlush(new TextWebSocketFrame(responseText)); - } else if (msg instanceof PongWebSocketFrame) { - logger.debug("Received pong from conductor"); - if (pingTimeout != null) { - pingTimeout.cancel(false); - pingTimeout = null; - } - } else if (msg instanceof CloseWebSocketFrame) { - if (isShutdown.get()) { - logger.debug("Shutdown Conductor connection"); - } else if (reconnectTimeout == null) { - logger.warn( - "onClose: Connection to conductor lost. Reconnecting"); - resetWebSocket(); - } - ch.close(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - logger.error("===== WebSocket Connection Error ====="); - logger.error("Error type: {}", cause.getClass().getName()); - logger.error("Error message: {}", cause.getMessage()); - logger.error("URL: {}", url); - logger.error("Channel state: {}", ctx.channel()); - logger.error("Full error:", cause); - logger.error("===== End WebSocket Error ====="); - resetWebSocket(); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) - throws Exception { - if (isShutdown.get()) { - logger.debug("Shutdown Conductor connection"); - } else if (reconnectTimeout == null) { - logger.warn( - "channelInactive: Connection to conductor lost. Reconnecting"); - resetWebSocket(); - } - super.channelInactive(ctx); - } - }); - } - }) - .connect(wsUri.getHost(), port); - - channel = future.sync().channel(); - - logger.info("===== WebSocket buildAsync completed ====="); - logger.info("WebSocket created successfully: {}", channel != null); - if (channel != null) { - logger.info("WebSocket class: {}", channel.getClass().getName()); - logger.info("WebSocket toString: {}", channel); - } - logger.info("===== WebSocket creation complete ====="); + }, + new WebSocketClientProtocolHandler( + WebSocketClientProtocolConfig.newBuilder() + .webSocketUri(uri) + .version(WebSocketVersion.V13) + .subprotocol(null) + .allowExtensions(false) + .customHeaders(EmptyHttpHeaders.INSTANCE) + .dropPongFrames(false) + .handleCloseFrames(false) + .maxFramePayloadLength(256 * 1024 * 1024) + .build()), + new MessageToMessageDecoder() { + @Override + protected void decode( + ChannelHandlerContext ctx, WebSocketFrame frame, List out) { + if (frame instanceof TextWebSocketFrame + || frame instanceof ContinuationWebSocketFrame) { + out.add(frame.content().retain()); + } else { + out.add(frame.retain()); + } + } + }, + new JsonObjectDecoder(256 * 1024 * 1024) { + { + setCumulator(COMPOSITE_CUMULATOR); + } + }, + handler); + } + }); + + ChannelFuture future = b.connect(host, port); + channel = future.channel(); + future.addListener( + f -> { + if (f.isSuccess()) { + logger.info("Successfully connected to conductor at {}:{}", host, port); + } else { + logger.warn( + "Failed to connect to conductor at {}:{}. Reconnecting", host, port, f.cause()); + resetWebSocket(); + } + }); } catch (Exception e) { - logger.error("===== Error in conductor loop ====="); - logger.error("Exception type: {}", e.getClass().getName()); - logger.error("Exception message: {}", e.getMessage()); - logger.error("Full exception:", e); - logger.error("===== End conductor loop error ====="); + logger.warn("Error in conductor loop. Reconnecting", e); resetWebSocket(); } } - BaseResponse getResponse(BaseMessage message) { - logger.debug("getResponse {}", message.type); + CompletableFuture getResponseAsync(BaseMessage message) { + logger.debug("getResponseAsync {}", message.type); MessageType messageType = MessageType.fromValue(message.type); - BiFunction func = dispatchMap.get(messageType); + BiFunction> func = + dispatchMap.get(messageType); if (func != null) { return func.apply(this, message); } else { logger.warn("Conductor unknown message type {}", message.type); - return new BaseResponse(message.type, message.request_id, "Unknown message type"); + return CompletableFuture.completedFuture( + new BaseResponse(message.type, message.request_id, "Unknown message type")); } } - static BaseResponse handleExecutorInfo(Conductor conductor, BaseMessage message) { - try { - String hostname = InetAddress.getLocalHost().getHostName(); - return new ExecutorInfoResponse( - message, - conductor.dbosExecutor.executorId(), - conductor.dbosExecutor.appVersion(), - hostname); - } catch (Exception e) { - return new ExecutorInfoResponse(message, e); - } + static CompletableFuture handleExecutorInfo( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + try { + String hostname = InetAddress.getLocalHost().getHostName(); + return new ExecutorInfoResponse( + message, + conductor.dbosExecutor.executorId(), + conductor.dbosExecutor.appVersion(), + hostname); + } catch (Exception e) { + return new ExecutorInfoResponse(message, e); + } + }); } - static BaseResponse handleRecovery(Conductor conductor, BaseMessage message) { - RecoveryRequest request = (RecoveryRequest) message; - try { - conductor.dbosExecutor.recoverPendingWorkflows(request.executor_ids); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when recovering pending workflows", e); - return new SuccessResponse(request, e); - } + static CompletableFuture handleRecovery(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + RecoveryRequest request = (RecoveryRequest) message; + try { + conductor.dbosExecutor.recoverPendingWorkflows(request.executor_ids); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when recovering pending workflows", e); + return new SuccessResponse(request, e); + } + }); } - static BaseResponse handleCancel(Conductor conductor, BaseMessage message) { - CancelRequest request = (CancelRequest) message; - try { - conductor.dbosExecutor.cancelWorkflow(request.workflow_id); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when cancelling workflow {}", request.workflow_id, e); - return new SuccessResponse(request, e); - } + static CompletableFuture handleCancel(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + CancelRequest request = (CancelRequest) message; + try { + conductor.dbosExecutor.cancelWorkflow(request.workflow_id); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error( + "Exception encountered when cancelling workflow {}", request.workflow_id, e); + return new SuccessResponse(request, e); + } + }); } - static BaseResponse handleResume(Conductor conductor, BaseMessage message) { - ResumeRequest request = (ResumeRequest) message; - try { - conductor.dbosExecutor.resumeWorkflow(request.workflow_id); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when resuming workflow {}", request.workflow_id, e); - return new SuccessResponse(request, e); - } + static CompletableFuture handleDelete(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + DeleteRequest request = (DeleteRequest) message; + try { + conductor.dbosExecutor.deleteWorkflow(request.workflow_id, request.delete_children); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when deleting workflow {}", request.workflow_id, e); + return new SuccessResponse(request, e); + } + }); } - static BaseResponse handleRestart(Conductor conductor, BaseMessage message) { - RestartRequest request = (RestartRequest) message; - try { - ForkOptions options = new ForkOptions(); - conductor.dbosExecutor.forkWorkflow(request.workflow_id, 0, options); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when restarting workflow {}", request.workflow_id, e); - return new SuccessResponse(request, e); - } + static CompletableFuture handleResume(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ResumeRequest request = (ResumeRequest) message; + try { + conductor.dbosExecutor.resumeWorkflow(request.workflow_id); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when resuming workflow {}", request.workflow_id, e); + return new SuccessResponse(request, e); + } + }); } - static BaseResponse handleFork(Conductor conductor, BaseMessage message) { - ForkWorkflowRequest request = (ForkWorkflowRequest) message; - if (request.body.workflow_id == null || request.body.start_step == null) { - return new ForkWorkflowResponse(request, null, "Invalid Fork Workflow Request"); - } - try { - var options = - new ForkOptions(request.body.new_workflow_id, request.body.application_version, null); - WorkflowHandle handle = - conductor.dbosExecutor.forkWorkflow( - request.body.workflow_id, request.body.start_step, options); - return new ForkWorkflowResponse(request, handle.workflowId()); - } catch (Exception e) { - logger.error("Exception encountered when forking workflow {}", request, e); - return new ForkWorkflowResponse(request, e); - } + static CompletableFuture handleRestart(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + RestartRequest request = (RestartRequest) message; + try { + ForkOptions options = new ForkOptions(); + conductor.dbosExecutor.forkWorkflow(request.workflow_id, 0, options); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error( + "Exception encountered when restarting workflow {}", request.workflow_id, e); + return new SuccessResponse(request, e); + } + }); } - static BaseResponse handleListWorkflows(Conductor conductor, BaseMessage message) { - ListWorkflowsRequest request = (ListWorkflowsRequest) message; - try { - ListWorkflowsInput input = request.asInput(); - List statuses = conductor.dbosExecutor.listWorkflows(input); - List output = - statuses.stream().map(s -> new WorkflowsOutput(s)).collect(Collectors.toList()); - return new WorkflowOutputsResponse(request, output); - } catch (Exception e) { - logger.error("Exception encountered when listing workflows", e); - return new WorkflowOutputsResponse(request, e); - } + static CompletableFuture handleFork(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ForkWorkflowRequest request = (ForkWorkflowRequest) message; + if (request.body.workflow_id == null || request.body.start_step == null) { + return new ForkWorkflowResponse(request, null, "Invalid Fork Workflow Request"); + } + try { + var options = + new ForkOptions( + request.body.new_workflow_id, request.body.application_version, null); + WorkflowHandle handle = + conductor.dbosExecutor.forkWorkflow( + request.body.workflow_id, request.body.start_step, options); + return new ForkWorkflowResponse(request, handle.workflowId()); + } catch (Exception e) { + logger.error("Exception encountered when forking workflow {}", request, e); + return new ForkWorkflowResponse(request, e); + } + }); } - static BaseResponse handleListQueuedWorkflows(Conductor conductor, BaseMessage message) { - ListQueuedWorkflowsRequest request = (ListQueuedWorkflowsRequest) message; - try { - ListWorkflowsInput input = request.asInput(); - List statuses = conductor.dbosExecutor.listWorkflows(input); - List output = - statuses.stream().map(s -> new WorkflowsOutput(s)).collect(Collectors.toList()); - return new WorkflowOutputsResponse(request, output); - } catch (Exception e) { - logger.error("Exception encountered when listing workflows", e); - return new WorkflowOutputsResponse(request, e); - } + static CompletableFuture handleListWorkflows( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ListWorkflowsRequest request = (ListWorkflowsRequest) message; + try { + ListWorkflowsInput input = request.asInput(); + List statuses = conductor.dbosExecutor.listWorkflows(input); + List output = + statuses.stream().map(s -> new WorkflowsOutput(s)).collect(Collectors.toList()); + return new WorkflowOutputsResponse(request, output); + } catch (Exception e) { + logger.error("Exception encountered when listing workflows", e); + return new WorkflowOutputsResponse(request, e); + } + }); } - static BaseResponse handleListSteps(Conductor conductor, BaseMessage message) { - ListStepsRequest request = (ListStepsRequest) message; - try { - List stepInfoList = conductor.dbosExecutor.listWorkflowSteps(request.workflow_id); - List steps = - stepInfoList.stream() - .map(i -> new ListStepsResponse.Step(i)) - .collect(Collectors.toList()); - return new ListStepsResponse(request, steps); - } catch (Exception e) { - logger.error("Exception encountered when listing steps {}", request.workflow_id, e); - return new ListStepsResponse(request, e); - } + static CompletableFuture handleListQueuedWorkflows( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ListQueuedWorkflowsRequest request = (ListQueuedWorkflowsRequest) message; + try { + ListWorkflowsInput input = request.asInput(); + List statuses = conductor.dbosExecutor.listWorkflows(input); + List output = + statuses.stream().map(s -> new WorkflowsOutput(s)).collect(Collectors.toList()); + return new WorkflowOutputsResponse(request, output); + } catch (Exception e) { + logger.error("Exception encountered when listing workflows", e); + return new WorkflowOutputsResponse(request, e); + } + }); } - static BaseResponse handleExistPendingWorkflows(Conductor conductor, BaseMessage message) { - ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message; - try { - List pending = - conductor.systemDatabase.getPendingWorkflows( - request.executor_id, request.application_version); - return new ExistPendingWorkflowsResponse(request, pending.size() > 0); - } catch (Exception e) { - logger.error("Exception encountered when checking for pending workflows", e); - return new ExistPendingWorkflowsResponse(request, e); - } + static CompletableFuture handleListSteps(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ListStepsRequest request = (ListStepsRequest) message; + try { + List stepInfoList = + conductor.dbosExecutor.listWorkflowSteps(request.workflow_id); + List steps = + stepInfoList.stream() + .map(i -> new ListStepsResponse.Step(i)) + .collect(Collectors.toList()); + return new ListStepsResponse(request, steps); + } catch (Exception e) { + logger.error("Exception encountered when listing steps {}", request.workflow_id, e); + return new ListStepsResponse(request, e); + } + }); } - static BaseResponse handleGetWorkflow(Conductor conductor, BaseMessage message) { - GetWorkflowRequest request = (GetWorkflowRequest) message; - try { - var status = conductor.systemDatabase.getWorkflowStatus(request.workflow_id); - WorkflowsOutput output = status == null ? null : new WorkflowsOutput(status); - return new GetWorkflowResponse(request, output); - } catch (Exception e) { - logger.error("Exception encountered when getting workflow {}", request.workflow_id, e); - return new GetWorkflowResponse(request, e); - } + static CompletableFuture handleExistPendingWorkflows( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ExistPendingWorkflowsRequest request = (ExistPendingWorkflowsRequest) message; + try { + List pending = + conductor.systemDatabase.getPendingWorkflows( + request.executor_id, request.application_version); + return new ExistPendingWorkflowsResponse(request, pending.size() > 0); + } catch (Exception e) { + logger.error("Exception encountered when checking for pending workflows", e); + return new ExistPendingWorkflowsResponse(request, e); + } + }); } - static BaseResponse handleRetention(Conductor conductor, BaseMessage message) { - RetentionRequest request = (RetentionRequest) message; + static CompletableFuture handleGetWorkflow( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + GetWorkflowRequest request = (GetWorkflowRequest) message; + try { + var status = conductor.systemDatabase.getWorkflowStatus(request.workflow_id); + WorkflowsOutput output = status == null ? null : new WorkflowsOutput(status); + return new GetWorkflowResponse(request, output); + } catch (Exception e) { + logger.error("Exception encountered when getting workflow {}", request.workflow_id, e); + return new GetWorkflowResponse(request, e); + } + }); + } - try { - conductor.systemDatabase.garbageCollect( - request.body.gc_cutoff_epoch_ms, request.body.gc_rows_threshold); - } catch (Exception e) { - logger.error("Exception encountered garbage collecting system database", e); - return new SuccessResponse(request, e); - } + static CompletableFuture handleRetention(Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + RetentionRequest request = (RetentionRequest) message; - try { - if (request.body.timeout_cutoff_epoch_ms != null) { - conductor.dbosExecutor.globalTimeout(request.body.timeout_cutoff_epoch_ms); - } - } catch (Exception e) { - logger.error("Exception encountered setting global timeout", e); - return new SuccessResponse(request, e); - } + try { + conductor.systemDatabase.garbageCollect( + request.body.gc_cutoff_epoch_ms, request.body.gc_rows_threshold); + } catch (Exception e) { + logger.error("Exception encountered garbage collecting system database", e); + return new SuccessResponse(request, e); + } - return new SuccessResponse(request, true); + try { + if (request.body.timeout_cutoff_epoch_ms != null) { + conductor.dbosExecutor.globalTimeout(request.body.timeout_cutoff_epoch_ms); + } + } catch (Exception e) { + logger.error("Exception encountered setting global timeout", e); + return new SuccessResponse(request, e); + } + + return new SuccessResponse(request, true); + }); } - static BaseResponse handleGetMetrics(Conductor conductor, BaseMessage message) { - GetMetricsRequest request = (GetMetricsRequest) message; + static CompletableFuture handleGetMetrics( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + GetMetricsRequest request = (GetMetricsRequest) message; - try { - if (request.metric_class.equals("workflow_step_count")) { - var metrics = conductor.systemDatabase.getMetrics(request.startTime(), request.endTime()); - return new GetMetricsResponse(request, metrics); - } else { - logger.warn("Unexpected metric class {}", request.metric_class); - throw new RuntimeException("Unexpected metric class %s".formatted(request.metric_class)); - } - } catch (Exception e) { - return new GetMetricsResponse(request, e); - } + try { + if (request.metric_class.equals("workflow_step_count")) { + var metrics = + conductor.systemDatabase.getMetrics(request.startTime(), request.endTime()); + return new GetMetricsResponse(request, metrics); + } else { + logger.warn("Unexpected metric class {}", request.metric_class); + throw new RuntimeException( + "Unexpected metric class %s".formatted(request.metric_class)); + } + } catch (Exception e) { + return new GetMetricsResponse(request, e); + } + }); + } + + static CompletableFuture handleImportWorkflow( + Conductor conductor, BaseMessage message) { + return CompletableFuture.supplyAsync( + () -> { + ImportWorkflowRequest request = (ImportWorkflowRequest) message; + long startTime = System.currentTimeMillis(); + logger.info("Starting import workflow"); + + try { + var exportedWorkflows = deserializeExportedWorkflows(request.serialized_workflow); + logger.info("deserialization completed workflow count={}", exportedWorkflows.size()); + conductor.systemDatabase.importWorkflow(exportedWorkflows); + long duration = System.currentTimeMillis() - startTime; + logger.info( + "Database import completed: {} workflows imported, duration={}ms", + exportedWorkflows.size(), + duration); + return new SuccessResponse(request, true); + } catch (Exception e) { + logger.error("Exception encountered when importing workflow", e); + return new SuccessResponse(request, e); + } + }); } static CompletableFuture handleExportWorkflow( @@ -764,11 +957,20 @@ static CompletableFuture handleExportWorkflow( () -> { ExportWorkflowRequest request = (ExportWorkflowRequest) message; long startTime = System.currentTimeMillis(); + logger.info( + "Starting export workflow: id={}, export_children={}", + request.workflow_id, + request.export_children); + try { var workflows = conductor.systemDatabase.exportWorkflow( request.workflow_id, request.export_children); - logger.info("Queried database workflow count={}", workflows.size()); + + logger.info( + "Database export completed: workflow_id={}, {} workflows retrieved", + request.workflow_id, + workflows.size()); var serializedWorkflow = serializeExportedWorkflows(workflows); @@ -791,31 +993,12 @@ static CompletableFuture handleExportWorkflow( duration, e); return new ExportWorkflowResponse(request, e); - } - }); - } - - static CompletableFuture handleImportWorkflow( - Conductor conductor, BaseMessage message) { - return CompletableFuture.supplyAsync( - () -> { - ImportWorkflowRequest request = (ImportWorkflowRequest) message; - long startTime = System.currentTimeMillis(); - try { - logger.info("Starting workflow import"); - var exportedWorkflows = deserializeExportedWorkflows(request.serialized_workflow); - logger.info("deserialization completed workflow count={}", exportedWorkflows.size()); - conductor.systemDatabase.importWorkflow(exportedWorkflows); - - long duration = System.currentTimeMillis() - startTime; + } finally { + long totalDuration = System.currentTimeMillis() - startTime; logger.info( - "Database import completed: {} workflows imported, duration={}ms", - exportedWorkflows.size(), - duration); - return new SuccessResponse(request, true); - } catch (Exception e) { - logger.error("Exception encountered when importing workflow", e); - return new SuccessResponse(request, e); + "handleExportWorkflow completed: id={}, total_duration={}ms", + request.workflow_id, + totalDuration); } }); } From dd87a5d3621c5a370200dbd505d74eea1a8f2281 Mon Sep 17 00:00:00 2001 From: Harry Pierson Date: Wed, 11 Feb 2026 15:02:44 -0800 Subject: [PATCH 12/12] generateOriginHeader(false) --- .../dev/dbos/transact/conductor/Conductor.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java index ac167a02..8b4f76f1 100644 --- a/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java +++ b/transact/src/main/java/dev/dbos/transact/conductor/Conductor.java @@ -600,19 +600,6 @@ protected void initChannel(SocketChannel ch) { p.addLast( new HttpClientCodec(), new HttpObjectAggregator(256 * 1024 * 1024), // 256MB max message size - // Remove Origin header that Netty automatically adds to fix 403 errors - new io.netty.channel.ChannelDuplexHandler() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, io.netty.channel.ChannelPromise promise) throws Exception { - if (msg instanceof io.netty.handler.codec.http.HttpRequest) { - io.netty.handler.codec.http.HttpRequest request = (io.netty.handler.codec.http.HttpRequest) msg; - // Remove the Origin header that causes 403 errors - request.headers().remove("origin"); - request.headers().remove("Origin"); - } - super.write(ctx, msg, promise); - } - }, new WebSocketClientProtocolHandler( WebSocketClientProtocolConfig.newBuilder() .webSocketUri(uri) @@ -622,6 +609,7 @@ public void write(ChannelHandlerContext ctx, Object msg, io.netty.channel.Channe .customHeaders(EmptyHttpHeaders.INSTANCE) .dropPongFrames(false) .handleCloseFrames(false) + .generateOriginHeader(false) .maxFramePayloadLength(256 * 1024 * 1024) .build()), new MessageToMessageDecoder() {