diff --git a/agentscope-core/pom.xml b/agentscope-core/pom.xml index 54bf500a7..b5bb9a12b 100644 --- a/agentscope-core/pom.xml +++ b/agentscope-core/pom.xml @@ -140,5 +140,11 @@ com.networknt json-schema-validator + + + io.opentelemetry + opentelemetry-api + test + diff --git a/agentscope-core/src/main/java/io/agentscope/core/hook/recorder/JsonlTraceExporter.java b/agentscope-core/src/main/java/io/agentscope/core/hook/recorder/JsonlTraceExporter.java index 9054e5d50..9d60df268 100644 --- a/agentscope-core/src/main/java/io/agentscope/core/hook/recorder/JsonlTraceExporter.java +++ b/agentscope-core/src/main/java/io/agentscope/core/hook/recorder/JsonlTraceExporter.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.OpenOption; @@ -47,12 +48,21 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; +import java.util.WeakHashMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; /** * A built-in, out-of-the-box JSONL trace exporter based on the Hook event system. @@ -65,13 +75,15 @@ * */ public final class JsonlTraceExporter implements Hook, AutoCloseable { private static final Logger log = LoggerFactory.getLogger(JsonlTraceExporter.class); + private static final long CLOSE_TIMEOUT_SECONDS = 30L; + private static final OpenTelemetryAccess OPEN_TELEMETRY_ACCESS = OpenTelemetryAccess.create(); private final Path outputFile; private final boolean flushEveryLine; @@ -81,8 +93,13 @@ public final class JsonlTraceExporter implements Hook, AutoCloseable { private final Object lock = new Object(); private final BufferedWriter writer; + private final ExecutorService exportExecutor; + private final AtomicBoolean closed = new AtomicBoolean(false); - private final Map runStates = new ConcurrentHashMap<>(); + // WeakHashMap keeps per-agent run state from accumulating indefinitely after agent instances + // become unreachable. Concurrency safety does not rely on WeakHashMap itself: all access to + // this map is serialized through the exporter’s single-threaded queue. + private final Map runStates = new WeakHashMap<>(); private JsonlTraceExporter( Path outputFile, @@ -97,6 +114,7 @@ private JsonlTraceExporter( this.priority = priority; this.eventFilter = Objects.requireNonNull(eventFilter, "eventFilter cannot be null"); this.writer = openWriter(outputFile, append); + this.exportExecutor = createExportExecutor(); } public static Builder builder(Path outputFile) { @@ -110,27 +128,50 @@ public int priority() { @Override public Mono onEvent(T event) { - if (event == null || !eventFilter.test(event)) { - return Mono.just(event); + T nonNullEvent = Objects.requireNonNull(event, "event cannot be null"); + if (!eventFilter.test(nonNullEvent)) { + return Mono.just(nonNullEvent); } - return Mono.fromCallable( - () -> { - writeEvent(event); - return event; - }) - .subscribeOn(Schedulers.boundedElastic()) + return Mono.defer(() -> enqueueWrite(nonNullEvent, OPEN_TELEMETRY_ACCESS.captureCurrent())) .onErrorResume( - e -> { + error -> { if (failFast) { - return Mono.error(e); + return Mono.error(error); } - log.warn("Failed to export hook event to JSONL: {}", e.getMessage(), e); - return Mono.just(event); + log.warn( + "Failed to export hook event to JSONL: {}", + error.getMessage(), + error); + return Mono.just(nonNullEvent); }); } - private void writeEvent(HookEvent event) throws IOException { + private Mono enqueueWrite(T event, OpenTelemetryIds openTelemetryIds) { + if (closed.get()) { + return Mono.error( + new RejectedExecutionException( + "JSONL exporter is closed: " + outputFile.toAbsolutePath())); + } + + CompletableFuture future = new CompletableFuture<>(); + try { + exportExecutor.execute( + () -> { + try { + writeEvent(event, openTelemetryIds); + future.complete(event); + } catch (Throwable error) { + future.completeExceptionally(error); + } + }); + } catch (RejectedExecutionException error) { + future.completeExceptionally(error); + } + return Mono.fromFuture(future); + } + + private void writeEvent(HookEvent event, OpenTelemetryIds openTelemetryIds) throws IOException { RunState runState = getOrUpdateRunState(event); Map record = new LinkedHashMap<>(); @@ -143,14 +184,15 @@ private void writeEvent(HookEvent event) throws IOException { record.put("turn_id", runState.turnId); record.put("step_id", runState.stepId); - putOpenTelemetryIdsIfPresent(record); + if (openTelemetryIds != null) { + openTelemetryIds.putIfPresent(record); + } if (event instanceof ReasoningEvent reasoningEvent) { record.put("model_name", reasoningEvent.getModelName()); record.put("generate_options", reasoningEvent.getGenerateOptions()); } - // Payload if (event instanceof PreCallEvent e) { record.put("input_messages", e.getInputMessages()); } else if (event instanceof PostCallEvent e) { @@ -172,6 +214,7 @@ private void writeEvent(HookEvent event) throws IOException { record.put("tool_use", e.getToolUse()); } else if (event instanceof ActingChunkEvent e) { record.put("tool_use", e.getToolUse()); + record.put("incremental_chunk", e.getChunk()); record.put("chunk", e.getChunk()); } else if (event instanceof PostActingEvent e) { record.put("tool_use", e.getToolUse()); @@ -251,41 +294,50 @@ private static String stackTraceToString(Throwable error) { return sw.toString(); } - private static void putOpenTelemetryIdsIfPresent(Map record) { - // Optional integration: if OpenTelemetry is on the classpath, try to attach trace/span id. - // This keeps core module free of hard dependencies on OpenTelemetry. + @Override + public void close() throws IOException { + if (!closed.compareAndSet(false, true)) { + return; + } + + boolean drained = false; try { - Class spanClass = Class.forName("io.opentelemetry.api.trace.Span"); - Object span = spanClass.getMethod("current").invoke(null); - if (span == null) { - return; - } - Object spanContext = spanClass.getMethod("getSpanContext").invoke(span); - if (spanContext == null) { - return; + Future barrier = exportExecutor.submit(() -> {}); + barrier.get(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + drained = true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + "Interrupted while waiting for JSONL exporter to finish pending writes", e); + } catch (TimeoutException e) { + throw new IOException( + "Timed out while waiting for JSONL exporter to finish pending writes", e); + } catch (ExecutionException e) { + throw new IOException( + "Failed while waiting for JSONL exporter to finish pending writes", + e.getCause()); + } finally { + if (drained) { + exportExecutor.shutdown(); + } else { + exportExecutor.shutdownNow(); } - - Class spanContextClass = Class.forName("io.opentelemetry.api.trace.SpanContext"); - boolean valid = (boolean) spanContextClass.getMethod("isValid").invoke(spanContext); - if (!valid) { - return; + runStates.clear(); + synchronized (lock) { + writer.flush(); + writer.close(); } - - String traceId = (String) spanContextClass.getMethod("getTraceId").invoke(spanContext); - String spanId = (String) spanContextClass.getMethod("getSpanId").invoke(spanContext); - record.put("trace_id", traceId); - record.put("span_id", spanId); - } catch (Throwable ignored) { - // Ignore all reflection failures. } } - @Override - public void close() throws IOException { - synchronized (lock) { - writer.flush(); - writer.close(); - } + private static ExecutorService createExportExecutor() { + ThreadFactory threadFactory = + runnable -> { + Thread thread = new Thread(runnable, "agentscope-jsonl-trace-exporter"); + thread.setDaemon(true); + return thread; + }; + return Executors.newSingleThreadExecutor(threadFactory); } private static final class RunState { @@ -294,6 +346,93 @@ private static final class RunState { private long stepId = 0; } + private static final class OpenTelemetryAccess { + private final Method currentMethod; + private final Method getSpanContextMethod; + private final Method isValidMethod; + private final Method getTraceIdMethod; + private final Method getSpanIdMethod; + + private OpenTelemetryAccess( + Method currentMethod, + Method getSpanContextMethod, + Method isValidMethod, + Method getTraceIdMethod, + Method getSpanIdMethod) { + this.currentMethod = currentMethod; + this.getSpanContextMethod = getSpanContextMethod; + this.isValidMethod = isValidMethod; + this.getTraceIdMethod = getTraceIdMethod; + this.getSpanIdMethod = getSpanIdMethod; + } + + private static OpenTelemetryAccess create() { + try { + ClassLoader classLoader = JsonlTraceExporter.class.getClassLoader(); + Class spanClass = + Class.forName("io.opentelemetry.api.trace.Span", false, classLoader); + Class spanContextClass = + Class.forName("io.opentelemetry.api.trace.SpanContext", false, classLoader); + return new OpenTelemetryAccess( + spanClass.getMethod("current"), + spanClass.getMethod("getSpanContext"), + spanContextClass.getMethod("isValid"), + spanContextClass.getMethod("getTraceId"), + spanContextClass.getMethod("getSpanId")); + } catch (Throwable ignored) { + return new OpenTelemetryAccess(null, null, null, null, null); + } + } + + private void putIfPresent(Map record) { + if (currentMethod == null) { + return; + } + OpenTelemetryIds openTelemetryIds = captureCurrent(); + if (openTelemetryIds != null) { + openTelemetryIds.putIfPresent(record); + } + } + + private OpenTelemetryIds captureCurrent() { + if (currentMethod == null) { + return null; + } + try { + Object span = currentMethod.invoke(null); + if (span == null) { + return null; + } + Object spanContext = getSpanContextMethod.invoke(span); + if (spanContext == null || !(boolean) isValidMethod.invoke(spanContext)) { + return null; + } + return new OpenTelemetryIds( + (String) getTraceIdMethod.invoke(spanContext), + (String) getSpanIdMethod.invoke(spanContext)); + } catch (Throwable ignored) { + return null; + } + } + } + + private static final class OpenTelemetryIds { + private final String traceId; + private final String spanId; + + private OpenTelemetryIds(String traceId, String spanId) { + this.traceId = traceId; + this.spanId = spanId; + } + + private void putIfPresent(Map record) { + if (traceId != null && spanId != null) { + record.put("trace_id", traceId); + record.put("span_id", spanId); + } + } + } + public static final class Builder { private final Path outputFile; diff --git a/agentscope-core/src/test/java/io/agentscope/core/hook/recorder/JsonlTraceExporterTest.java b/agentscope-core/src/test/java/io/agentscope/core/hook/recorder/JsonlTraceExporterTest.java index b70293740..68b805621 100644 --- a/agentscope-core/src/test/java/io/agentscope/core/hook/recorder/JsonlTraceExporterTest.java +++ b/agentscope-core/src/test/java/io/agentscope/core/hook/recorder/JsonlTraceExporterTest.java @@ -47,12 +47,24 @@ import io.agentscope.core.model.GenerateOptions; import io.agentscope.core.tool.Toolkit; import io.agentscope.core.util.JsonUtils; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import reactor.core.publisher.Flux; @@ -150,6 +162,13 @@ void writesJsonlLinesForEnabledEvents() throws Exception { assertTrue(containsKeyValue(records, "event_type", "POST_SUMMARY")); assertTrue(containsKeyValue(records, "event_type", "ERROR")); assertTrue(containsKeyValue(records, "event_type", "POST_CALL")); + + Map actingChunkRecord = findByEventType(records, "ACTING_CHUNK"); + assertNotNull(actingChunkRecord.get("incremental_chunk")); + assertNotNull(actingChunkRecord.get("chunk")); + assertEquals( + JsonUtils.getJsonCodec().toJson(actingChunkRecord.get("incremental_chunk")), + JsonUtils.getJsonCodec().toJson(actingChunkRecord.get("chunk"))); } @Test @@ -190,6 +209,8 @@ void runIdChangesAcrossTurns() throws Exception { assertNotNull(run1); assertNotNull(run2); assertNotEquals(run1, run2); + assertEquals(1, ((Number) records.get(0).get("turn_id")).intValue()); + assertEquals(2, ((Number) records.get(2).get("turn_id")).intValue()); } @Test @@ -223,22 +244,119 @@ void failFastControlsErrorPropagation() throws Exception { } @Test - void exportsOpenTelemetryIdsWhenAvailable() throws Exception { - io.opentelemetry.api.trace.Span.setCurrent( - new io.opentelemetry.api.trace.Span( - new io.opentelemetry.api.trace.SpanContext(true, "trace-abc", "span-xyz"))); + void rejectsNullEvents() throws Exception { + Path output = tempDir.resolve("null.jsonl"); + try (JsonlTraceExporter exporter = + JsonlTraceExporter.builder(output).append(false).flushEveryLine(true).build()) { + assertThrows(NullPointerException.class, () -> exporter.onEvent(null)); + } + } - Path output = tempDir.resolve("otel.jsonl"); + @Test + void serializesConcurrentExportsPerAgent() throws Exception { + Path output = tempDir.resolve("concurrent.jsonl"); TestAgent agent = new TestAgent("agent-1", "TestAgent"); + Msg assistantMsg = textMsg(MsgRole.ASSISTANT, "world"); + GenerateOptions options = GenerateOptions.builder().build(); + int eventCount = 200; + + ExecutorService executor = Executors.newFixedThreadPool(32); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(eventCount); + ConcurrentLinkedQueue failures = new ConcurrentLinkedQueue<>(); try (JsonlTraceExporter exporter = JsonlTraceExporter.builder(output).append(false).flushEveryLine(true).build()) { exporter.onEvent(new PreCallEvent(agent, List.of(textMsg(MsgRole.USER, "hi")))).block(); + + for (int i = 0; i < eventCount; i++) { + executor.submit( + () -> { + try { + assertTrue(start.await(30, TimeUnit.SECONDS)); + exporter.onEvent( + new PostReasoningEvent( + agent, "mock-model", options, assistantMsg)) + .block(); + } catch (Throwable t) { + failures.add(t); + } finally { + done.countDown(); + } + }); + } + + start.countDown(); + assertTrue(done.await(30, TimeUnit.SECONDS)); + assertTrue(failures.isEmpty(), () -> "Unexpected failures: " + failures); + } finally { + executor.shutdownNow(); + } + + List> records = readAll(output); + assertEquals(eventCount + 1, records.size()); + + Set stepIds = new HashSet<>(); + Set runIds = new HashSet<>(); + for (Map record : records) { + runIds.add((String) record.get("run_id")); + stepIds.add(((Number) record.get("step_id")).longValue()); + } + + assertEquals(1, runIds.size()); + assertEquals(eventCount + 1, stepIds.size()); + for (long expected = 0; expected <= eventCount; expected++) { + long expectedStepId = expected; + assertTrue(stepIds.contains(expectedStepId), () -> "Missing step_id " + expectedStepId); + } + } + + @Test + void closeWaitsForSubscribedWrites() throws Exception { + Path output = tempDir.resolve("close-drain.jsonl"); + int eventCount = 100; + List> subscriptions = new ArrayList<>(); + + JsonlTraceExporter exporter = + JsonlTraceExporter.builder(output).append(false).flushEveryLine(false).build(); + for (int i = 0; i < eventCount; i++) { + TestAgent agent = new TestAgent("agent-" + i, "Agent-" + i); + subscriptions.add( + exporter.onEvent( + new PreCallEvent(agent, List.of(textMsg(MsgRole.USER, "hi-" + i))))); + } + + subscriptions.forEach(Mono::subscribe); + exporter.close(); + + List> records = readAll(output); + assertEquals(eventCount, records.size()); + } + + @Test + void exportsOpenTelemetryIdsWhenAvailable() throws Exception { + Path output = tempDir.resolve("otel.jsonl"); + TestAgent agent = new TestAgent("agent-1", "TestAgent"); + + SpanContext spanContext = + SpanContext.create( + "0123456789abcdef0123456789abcdef", + "89abcdef01234567", + TraceFlags.getSampled(), + TraceState.getDefault()); + + try (Scope ignored = Span.wrap(spanContext).makeCurrent(); + JsonlTraceExporter exporter = + JsonlTraceExporter.builder(output) + .append(false) + .flushEveryLine(true) + .build()) { + exporter.onEvent(new PreCallEvent(agent, List.of(textMsg(MsgRole.USER, "hi")))).block(); } Map record = readAll(output).get(0); - assertEquals("trace-abc", record.get("trace_id")); - assertEquals("span-xyz", record.get("span_id")); + assertEquals("0123456789abcdef0123456789abcdef", record.get("trace_id")); + assertEquals("89abcdef01234567", record.get("span_id")); } private static Msg textMsg(MsgRole role, String text) { @@ -258,15 +376,23 @@ private static List> readAll(Path output) throws IOException private static boolean containsKeyValue( List> records, String key, String value) { - for (Map r : records) { - Object v = r.get(key); - if (value.equals(v)) { + for (Map record : records) { + Object actual = record.get(key); + if (value.equals(actual)) { return true; } } return false; } + private static Map findByEventType( + List> records, String eventType) { + return records.stream() + .filter(record -> eventType.equals(record.get("event_type"))) + .findFirst() + .orElseThrow(() -> new AssertionError("Missing event type " + eventType)); + } + private static final class TestAgent implements Agent { private final String agentId; private final String name; diff --git a/agentscope-core/src/test/java/io/opentelemetry/api/trace/Span.java b/agentscope-core/src/test/java/io/opentelemetry/api/trace/Span.java deleted file mode 100644 index 5ccc455d0..000000000 --- a/agentscope-core/src/test/java/io/opentelemetry/api/trace/Span.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2024-2026 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.opentelemetry.api.trace; - -/** - * Minimal OpenTelemetry stubs for tests. - * - *

AgentScope core does not depend on OpenTelemetry directly. JsonlTraceExporter uses reflection - * to attach trace_id/span_id when OpenTelemetry is present at runtime. These stubs let us cover - * that branch in unit tests without adding a core dependency. - */ -public final class Span { - - private static volatile Span current = new Span(new SpanContext(false, "", "")); - - private final SpanContext context; - - public Span(SpanContext context) { - this.context = context; - } - - public static Span current() { - return current; - } - - public static void setCurrent(Span span) { - current = span; - } - - public SpanContext getSpanContext() { - return context; - } -} diff --git a/agentscope-core/src/test/java/io/opentelemetry/api/trace/SpanContext.java b/agentscope-core/src/test/java/io/opentelemetry/api/trace/SpanContext.java deleted file mode 100644 index de1ee35ba..000000000 --- a/agentscope-core/src/test/java/io/opentelemetry/api/trace/SpanContext.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2024-2026 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.opentelemetry.api.trace; - -/** Minimal OpenTelemetry stub for tests. */ -public final class SpanContext { - private final boolean valid; - private final String traceId; - private final String spanId; - - public SpanContext(boolean valid, String traceId, String spanId) { - this.valid = valid; - this.traceId = traceId; - this.spanId = spanId; - } - - public boolean isValid() { - return valid; - } - - public String getTraceId() { - return traceId; - } - - public String getSpanId() { - return spanId; - } -} diff --git a/docs/en/task/hook.md b/docs/en/task/hook.md index ac533c972..7066b737a 100644 --- a/docs/en/task/hook.md +++ b/docs/en/task/hook.md @@ -134,6 +134,10 @@ Hooks are immutable after agent construction. For local debugging and offline troubleshooting, AgentScope Java provides a built-in JSONL exporter: +> Warning: the JSONL trace exporter writes full prompts, messages, tool inputs, and error stack +> traces to local files. These records may contain sensitive user data, credentials, or other +> secrets, so only enable it in trusted environments and handle the output file as sensitive data. + ```java import io.agentscope.core.ReActAgent; import io.agentscope.core.hook.recorder.JsonlTraceExporter; diff --git a/docs/zh/task/hook.md b/docs/zh/task/hook.md index f57be00c2..e394d5d57 100644 --- a/docs/zh/task/hook.md +++ b/docs/zh/task/hook.md @@ -207,9 +207,9 @@ cd agentscope-examples/quickstart mvn exec:java -Dexec.mainClass="io.agentscope.examples.quickstart.HookExample" ``` -## Built-in JSONL Trace Exporter +## 内置 JSONL 跟踪导出器 -AgentScope Java provides a built-in JSONL exporter for local debugging: +AgentScope Java 内置了一个 JSONL 导出器,可用于本地调试和离线排障: ```java import io.agentscope.core.hook.recorder.JsonlTraceExporter; @@ -220,6 +220,6 @@ try (JsonlTraceExporter exporter = .includeReasoningChunks(true) // optional .includeActingChunks(true) // optional .build()) { - // Add exporter into hooks list when building agent + // 在构建智能体时,将 exporter 加入 hooks 列表 } ```