Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions agentscope-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,11 @@
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
Expand All @@ -47,12 +48,21 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* A built-in, out-of-the-box JSONL trace exporter based on the Hook event system.
Expand All @@ -65,13 +75,15 @@
* <ul>
* <li>This exporter is best-effort by default: serialization / IO errors do not break agent
* execution unless {@link Builder#failFast(boolean)} is enabled.</li>
* <li>This exporter performs blocking file IO, but it runs on Reactor boundedElastic to avoid
* blocking agent execution threads.</li>
* <li>This exporter performs blocking file IO on an internal single-threaded queue to keep file
* order, step IDs, and run IDs consistent.</li>
* </ul>
*/
public final class JsonlTraceExporter implements Hook, AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(JsonlTraceExporter.class);
private static final long CLOSE_TIMEOUT_SECONDS = 30L;
private static final OpenTelemetryAccess OPEN_TELEMETRY_ACCESS = OpenTelemetryAccess.create();

private final Path outputFile;
private final boolean flushEveryLine;
Expand All @@ -81,8 +93,13 @@ public final class JsonlTraceExporter implements Hook, AutoCloseable {

private final Object lock = new Object();
private final BufferedWriter writer;
private final ExecutorService exportExecutor;
private final AtomicBoolean closed = new AtomicBoolean(false);

private final Map<String, RunState> runStates = new ConcurrentHashMap<>();
// WeakHashMap keeps per-agent run state from accumulating indefinitely after agent instances
// become unreachable. Concurrency safety does not rely on WeakHashMap itself: all access to
// this map is serialized through the exporter’s single-threaded queue.
private final Map<String, RunState> runStates = new WeakHashMap<>();

private JsonlTraceExporter(
Path outputFile,
Expand All @@ -97,6 +114,7 @@ private JsonlTraceExporter(
this.priority = priority;
this.eventFilter = Objects.requireNonNull(eventFilter, "eventFilter cannot be null");
this.writer = openWriter(outputFile, append);
this.exportExecutor = createExportExecutor();
}

public static Builder builder(Path outputFile) {
Expand All @@ -110,27 +128,50 @@ public int priority() {

@Override
public <T extends HookEvent> Mono<T> onEvent(T event) {
if (event == null || !eventFilter.test(event)) {
return Mono.just(event);
T nonNullEvent = Objects.requireNonNull(event, "event cannot be null");
if (!eventFilter.test(nonNullEvent)) {
return Mono.just(nonNullEvent);
}

return Mono.fromCallable(
() -> {
writeEvent(event);
return event;
})
.subscribeOn(Schedulers.boundedElastic())
return Mono.defer(() -> enqueueWrite(nonNullEvent, OPEN_TELEMETRY_ACCESS.captureCurrent()))
.onErrorResume(
e -> {
error -> {
if (failFast) {
return Mono.error(e);
return Mono.error(error);
}
log.warn("Failed to export hook event to JSONL: {}", e.getMessage(), e);
return Mono.just(event);
log.warn(
"Failed to export hook event to JSONL: {}",
error.getMessage(),
error);
return Mono.just(nonNullEvent);
});
}

private void writeEvent(HookEvent event) throws IOException {
private <T extends HookEvent> Mono<T> enqueueWrite(T event, OpenTelemetryIds openTelemetryIds) {
if (closed.get()) {
return Mono.error(
new RejectedExecutionException(
"JSONL exporter is closed: " + outputFile.toAbsolutePath()));
}

CompletableFuture<T> future = new CompletableFuture<>();
try {
exportExecutor.execute(
() -> {
try {
writeEvent(event, openTelemetryIds);
future.complete(event);
} catch (Throwable error) {
future.completeExceptionally(error);
}
});
} catch (RejectedExecutionException error) {
future.completeExceptionally(error);
}
return Mono.fromFuture(future);
}

private void writeEvent(HookEvent event, OpenTelemetryIds openTelemetryIds) throws IOException {
RunState runState = getOrUpdateRunState(event);

Map<String, Object> record = new LinkedHashMap<>();
Expand All @@ -143,14 +184,15 @@ private void writeEvent(HookEvent event) throws IOException {
record.put("turn_id", runState.turnId);
record.put("step_id", runState.stepId);

putOpenTelemetryIdsIfPresent(record);
if (openTelemetryIds != null) {
openTelemetryIds.putIfPresent(record);
}

if (event instanceof ReasoningEvent reasoningEvent) {
record.put("model_name", reasoningEvent.getModelName());
record.put("generate_options", reasoningEvent.getGenerateOptions());
}

// Payload
if (event instanceof PreCallEvent e) {
record.put("input_messages", e.getInputMessages());
} else if (event instanceof PostCallEvent e) {
Expand All @@ -172,6 +214,7 @@ private void writeEvent(HookEvent event) throws IOException {
record.put("tool_use", e.getToolUse());
} else if (event instanceof ActingChunkEvent e) {
record.put("tool_use", e.getToolUse());
record.put("incremental_chunk", e.getChunk());
record.put("chunk", e.getChunk());
} else if (event instanceof PostActingEvent e) {
record.put("tool_use", e.getToolUse());
Expand Down Expand Up @@ -251,41 +294,50 @@ private static String stackTraceToString(Throwable error) {
return sw.toString();
}

private static void putOpenTelemetryIdsIfPresent(Map<String, Object> record) {
// Optional integration: if OpenTelemetry is on the classpath, try to attach trace/span id.
// This keeps core module free of hard dependencies on OpenTelemetry.
@Override
public void close() throws IOException {
if (!closed.compareAndSet(false, true)) {
return;
}

boolean drained = false;
try {
Class<?> spanClass = Class.forName("io.opentelemetry.api.trace.Span");
Object span = spanClass.getMethod("current").invoke(null);
if (span == null) {
return;
}
Object spanContext = spanClass.getMethod("getSpanContext").invoke(span);
if (spanContext == null) {
return;
Future<?> barrier = exportExecutor.submit(() -> {});
barrier.get(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
drained = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(
"Interrupted while waiting for JSONL exporter to finish pending writes", e);
} catch (TimeoutException e) {
throw new IOException(
"Timed out while waiting for JSONL exporter to finish pending writes", e);
} catch (ExecutionException e) {
throw new IOException(
"Failed while waiting for JSONL exporter to finish pending writes",
e.getCause());
} finally {
if (drained) {
exportExecutor.shutdown();
} else {
exportExecutor.shutdownNow();
}

Class<?> spanContextClass = Class.forName("io.opentelemetry.api.trace.SpanContext");
boolean valid = (boolean) spanContextClass.getMethod("isValid").invoke(spanContext);
if (!valid) {
return;
runStates.clear();
synchronized (lock) {
writer.flush();
writer.close();
}

String traceId = (String) spanContextClass.getMethod("getTraceId").invoke(spanContext);
String spanId = (String) spanContextClass.getMethod("getSpanId").invoke(spanContext);
record.put("trace_id", traceId);
record.put("span_id", spanId);
} catch (Throwable ignored) {
// Ignore all reflection failures.
}
}

@Override
public void close() throws IOException {
synchronized (lock) {
writer.flush();
writer.close();
}
private static ExecutorService createExportExecutor() {
ThreadFactory threadFactory =
runnable -> {
Thread thread = new Thread(runnable, "agentscope-jsonl-trace-exporter");
thread.setDaemon(true);
return thread;
};
return Executors.newSingleThreadExecutor(threadFactory);
}

private static final class RunState {
Expand All @@ -294,6 +346,93 @@ private static final class RunState {
private long stepId = 0;
}

private static final class OpenTelemetryAccess {
private final Method currentMethod;
private final Method getSpanContextMethod;
private final Method isValidMethod;
private final Method getTraceIdMethod;
private final Method getSpanIdMethod;

private OpenTelemetryAccess(
Method currentMethod,
Method getSpanContextMethod,
Method isValidMethod,
Method getTraceIdMethod,
Method getSpanIdMethod) {
this.currentMethod = currentMethod;
this.getSpanContextMethod = getSpanContextMethod;
this.isValidMethod = isValidMethod;
this.getTraceIdMethod = getTraceIdMethod;
this.getSpanIdMethod = getSpanIdMethod;
}

private static OpenTelemetryAccess create() {
try {
ClassLoader classLoader = JsonlTraceExporter.class.getClassLoader();
Class<?> spanClass =
Class.forName("io.opentelemetry.api.trace.Span", false, classLoader);
Class<?> spanContextClass =
Class.forName("io.opentelemetry.api.trace.SpanContext", false, classLoader);
return new OpenTelemetryAccess(
spanClass.getMethod("current"),
spanClass.getMethod("getSpanContext"),
spanContextClass.getMethod("isValid"),
spanContextClass.getMethod("getTraceId"),
spanContextClass.getMethod("getSpanId"));
} catch (Throwable ignored) {
return new OpenTelemetryAccess(null, null, null, null, null);
}
}

private void putIfPresent(Map<String, Object> record) {
if (currentMethod == null) {
return;
}
OpenTelemetryIds openTelemetryIds = captureCurrent();
if (openTelemetryIds != null) {
openTelemetryIds.putIfPresent(record);
}
}

private OpenTelemetryIds captureCurrent() {
if (currentMethod == null) {
return null;
}
try {
Object span = currentMethod.invoke(null);
if (span == null) {
return null;
}
Object spanContext = getSpanContextMethod.invoke(span);
if (spanContext == null || !(boolean) isValidMethod.invoke(spanContext)) {
return null;
}
return new OpenTelemetryIds(
(String) getTraceIdMethod.invoke(spanContext),
(String) getSpanIdMethod.invoke(spanContext));
} catch (Throwable ignored) {
return null;
}
}
}

private static final class OpenTelemetryIds {
private final String traceId;
private final String spanId;

private OpenTelemetryIds(String traceId, String spanId) {
this.traceId = traceId;
this.spanId = spanId;
}

private void putIfPresent(Map<String, Object> record) {
if (traceId != null && spanId != null) {
record.put("trace_id", traceId);
record.put("span_id", spanId);
}
}
}

public static final class Builder {
private final Path outputFile;

Expand Down
Loading
Loading