From 83f795f3225cc54d775a13199727b32fb92cc866 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 12 Nov 2025 15:21:18 +0000 Subject: [PATCH 1/6] feat: update tasks/list implementation to match A2A 0.4.0 spec - Sort tasks by status.timestamp DESC (most recent first), then ID ASC - Add lastUpdatedAfter parameter filtering - Update InMemoryTaskStore and JpaDatabaseTaskStore sorting logic - Add timestamp denormalized column to JpaTask for efficient querying - Update all transport handlers (JSON-RPC, gRPC, REST) --- .../database/jpa/JpaDatabaseTaskStore.java | 16 +++++++-- .../taskstore/database/jpa/JpaTask.java | 18 +++++++++- .../jpa/JpaDatabaseTaskStoreTest.java | 19 ++++++---- .../server/rest/quarkus/A2AServerRoutes.java | 3 +- .../DefaultRequestHandler.java | 18 ++++++++-- .../a2a/server/tasks/InMemoryTaskStore.java | 36 ++++++++++--------- .../transport/rest/handler/RestHandler.java | 13 ++++++- 7 files changed, 92 insertions(+), 31 deletions(-) diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java index 24dd3648d..5697d1d41 100644 --- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java +++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java @@ -227,13 +227,19 @@ public ListTasksResult list(ListTasksParams params) { countQueryBuilder.append(" AND t.state = :state"); } + // Apply lastUpdatedAfter filter using denormalized timestamp column + if (params.lastUpdatedAfter() != null) { + queryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter"); + countQueryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter"); + } + // Apply pagination cursor (tasks after pageToken) if (params.pageToken() != null && !params.pageToken().isEmpty()) { queryBuilder.append(" AND t.id > :pageToken"); } - // Sort by task ID for consistent pagination - queryBuilder.append(" ORDER BY t.id"); + // Sort by status timestamp descending (most recent first), then by ID for stable ordering + queryBuilder.append(" ORDER BY t.statusTimestamp DESC, t.id ASC"); // Create and configure the main query TypedQuery query = em.createQuery(queryBuilder.toString(), JpaTask.class); @@ -245,6 +251,9 @@ public ListTasksResult list(ListTasksParams params) { if (params.status() != null) { query.setParameter("state", params.status().asString()); } + if (params.lastUpdatedAfter() != null) { + query.setParameter("lastUpdatedAfter", params.lastUpdatedAfter()); + } if (params.pageToken() != null && !params.pageToken().isEmpty()) { query.setParameter("pageToken", params.pageToken()); } @@ -270,6 +279,9 @@ public ListTasksResult list(ListTasksParams params) { if (params.status() != null) { countQuery.setParameter("state", params.status().asString()); } + if (params.lastUpdatedAfter() != null) { + countQuery.setParameter("lastUpdatedAfter", params.lastUpdatedAfter()); + } int totalSize = countQuery.getSingleResult().intValue(); // Deserialize tasks from JSON diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java index ebdb8f2a4..86df40676 100644 --- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java +++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java @@ -25,6 +25,9 @@ public class JpaTask { @Column(name = "state") private String state; + @Column(name = "status_timestamp") + private Instant statusTimestamp; + @Column(name = "task_data", columnDefinition = "TEXT", nullable = false) private String taskJson; @@ -67,6 +70,14 @@ public void setState(String state) { this.state = state; } + public Instant getStatusTimestamp() { + return statusTimestamp; + } + + public void setStatusTimestamp(Instant statusTimestamp) { + this.statusTimestamp = statusTimestamp; + } + public String getTaskJson() { return taskJson; } @@ -123,7 +134,7 @@ static JpaTask createFromTask(Task task) throws JsonProcessingException { } /** - * Updates denormalized fields (contextId, state) from the task object. + * Updates denormalized fields (contextId, state, statusTimestamp) from the task object. * These fields are duplicated from the JSON to enable efficient querying. * * @param task the task to extract fields from @@ -133,8 +144,13 @@ private void updateDenormalizedFields(Task task) { if (task.getStatus() != null) { io.a2a.spec.TaskState taskState = task.getStatus().state(); this.state = (taskState != null) ? taskState.asString() : null; + // Extract status timestamp for efficient querying and sorting + this.statusTimestamp = (task.getStatus().timestamp() != null) + ? task.getStatus().timestamp().toInstant() + : null; } else { this.state = null; + this.statusTimestamp = null; } } diff --git a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java index 792bf8c0f..539a4d214 100644 --- a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java +++ b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java @@ -418,12 +418,14 @@ public void testListTasksCombinedFilters() { @Test @Transactional public void testListTasksPagination() { - // Create 5 tasks + // Create 5 tasks with same timestamp to ensure ID-based pagination works + // (With timestamp DESC sorting, same timestamps allow ID ASC tie-breaking) + java.time.OffsetDateTime sameTimestamp = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC); for (int i = 1; i <= 5; i++) { Task task = new Task.Builder() .id("task-page-" + i) .contextId("context-pagination") - .status(new TaskStatus(TaskState.SUBMITTED)) + .status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp)) .build(); taskStore.save(task); } @@ -576,23 +578,26 @@ public void testListTasksDefaultPageSize() { @Test @Transactional public void testListTasksOrderingById() { - // Create tasks with IDs that will sort in specific order + // Create tasks with same timestamp to test ID-based tie-breaking + // (spec requires sorting by timestamp DESC, then ID ASC) + java.time.OffsetDateTime sameTimestamp = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC); + Task task1 = new Task.Builder() .id("task-order-a") .contextId("context-order") - .status(new TaskStatus(TaskState.SUBMITTED)) + .status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp)) .build(); Task task2 = new Task.Builder() .id("task-order-b") .contextId("context-order") - .status(new TaskStatus(TaskState.SUBMITTED)) + .status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp)) .build(); Task task3 = new Task.Builder() .id("task-order-c") .contextId("context-order") - .status(new TaskStatus(TaskState.SUBMITTED)) + .status(new TaskStatus(TaskState.SUBMITTED, null, sameTimestamp)) .build(); // Save in reverse order @@ -600,7 +605,7 @@ public void testListTasksOrderingById() { taskStore.save(task1); taskStore.save(task2); - // List should return in ID order + // List should return sorted by timestamp DESC (all same), then by ID ASC ListTasksParams params = new ListTasksParams.Builder() .contextId("context-order") .build(); diff --git a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java index 75c34f247..fe153e67c 100644 --- a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java +++ b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java @@ -128,6 +128,7 @@ public void listTasks(RoutingContext rc) { String pageSizeStr = rc.request().params().get("pageSize"); String pageToken = rc.request().params().get("pageToken"); String historyLengthStr = rc.request().params().get("historyLength"); + String lastUpdatedAfter = rc.request().params().get("lastUpdatedAfter"); String includeArtifactsStr = rc.request().params().get("includeArtifacts"); // Parse optional parameters @@ -147,7 +148,7 @@ public void listTasks(RoutingContext rc) { } response = jsonRestHandler.listTasks(contextId, statusStr, pageSize, pageToken, - historyLength, includeArtifacts, context); + historyLength, lastUpdatedAfter, includeArtifacts, context); } catch (NumberFormatException e) { response = jsonRestHandler.createErrorResponse(new InvalidParamsError("Invalid number format in parameters")); } catch (IllegalArgumentException e) { diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 56efc70ce..03a90409d 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -43,6 +43,7 @@ import io.a2a.spec.EventKind; import io.a2a.spec.GetTaskPushNotificationConfigParams; import io.a2a.spec.InternalError; +import io.a2a.spec.InvalidParamsError; import io.a2a.spec.JSONRPCError; import io.a2a.spec.ListTaskPushNotificationConfigParams; import io.a2a.spec.ListTasksParams; @@ -165,8 +166,21 @@ private static Task limitTaskHistory(Task task, int historyLength) { @Override public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext context) throws JSONRPCError { - LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}", - params.contextId(), params.status(), params.pageSize(), params.pageToken()); + LOGGER.debug("onListTasks with contextId={}, status={}, pageSize={}, pageToken={}, lastUpdatedAfter={}", + params.contextId(), params.status(), params.pageSize(), params.pageToken(), params.lastUpdatedAfter()); + + // Validate lastUpdatedAfter timestamp if provided + if (params.lastUpdatedAfter() != null) { + // Check if timestamp is in the future (optional validation per spec) + java.time.Instant now = java.time.Instant.now(); + if (params.lastUpdatedAfter().isAfter(now)) { + java.util.Map errorData = new java.util.HashMap<>(); + errorData.put("parameter", "lastUpdatedAfter"); + errorData.put("reason", "Timestamp cannot be in the future"); + throw new InvalidParamsError(null, "Invalid params", errorData); + } + } + ListTasksResult result = taskStore.list(params); LOGGER.debug("Found {} tasks (total: {})", result.pageSize(), result.totalSize()); return result; diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index 0fd604089..4c37ce66d 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -49,12 +49,18 @@ public ListTasksResult list(ListTasksParams params) { task.getStatus() != null && params.status().equals(task.getStatus().state()) ); } - // Note: lastUpdatedAfter filtering not implemented in InMemoryTaskStore - // as Task doesn't have a lastUpdated timestamp field + if (params.lastUpdatedAfter() != null) { + taskStream = taskStream.filter(task -> + task.getStatus() != null && + task.getStatus().timestamp() != null && + task.getStatus().timestamp().toInstant().isAfter(params.lastUpdatedAfter()) + ); + } - // Sort by task ID for consistent pagination + // Sort by status timestamp descending (most recent first), then by ID ascending for stable ordering List allFilteredTasks = taskStream - .sorted(Comparator.comparing(Task::getId)) + .sorted(Comparator.comparing((Task t) -> t.getStatus().timestamp(), Comparator.nullsLast(Comparator.reverseOrder())) + .thenComparing(Task::getId)) .toList(); int totalSize = allFilteredTasks.size(); @@ -63,21 +69,17 @@ public ListTasksResult list(ListTasksParams params) { int pageSize = params.getEffectivePageSize(); int startIndex = 0; - // Handle page token (simple cursor: last task ID from previous page) + // Handle page token (cursor: task ID from previous page) + // Since we're sorted by timestamp DESC then ID ASC, we can't use binary search + // Instead, find the task with the matching ID using linear search if (params.pageToken() != null && !params.pageToken().isEmpty()) { - // Use binary search since list is sorted by task ID (O(log N) vs O(N)) - int index = Collections.binarySearch(allFilteredTasks, null, - (t1, t2) -> { - // Handle null key comparisons (binarySearch passes null as one argument) - if (t1 == null && t2 == null) return 0; - if (t1 == null) return params.pageToken().compareTo(t2.getId()); - if (t2 == null) return t1.getId().compareTo(params.pageToken()); - return t1.getId().compareTo(t2.getId()); - }); - if (index >= 0) { - startIndex = index + 1; + for (int i = 0; i < allFilteredTasks.size(); i++) { + if (allFilteredTasks.get(i).getId().equals(params.pageToken())) { + startIndex = i + 1; + break; + } } - // If not found (index < 0), startIndex remains 0 (start from beginning) + // If not found, startIndex remains 0 (start from beginning) } // Get the page of tasks diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java index 398a85710..f74a9c5e9 100644 --- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java @@ -182,7 +182,8 @@ public HTTPRestResponse getTask(String taskId, int historyLength, ServerCallCont public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String status, @Nullable Integer pageSize, @Nullable String pageToken, - @Nullable Integer historyLength, @Nullable Boolean includeArtifacts, + @Nullable Integer historyLength, @Nullable String lastUpdatedAfter, + @Nullable Boolean includeArtifacts, ServerCallContext context) { try { // Build params @@ -202,6 +203,16 @@ public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String s if (historyLength != null) { paramsBuilder.historyLength(historyLength); } + if (lastUpdatedAfter != null) { + try { + paramsBuilder.lastUpdatedAfter(java.time.Instant.parse(lastUpdatedAfter)); + } catch (java.time.format.DateTimeParseException e) { + java.util.Map errorData = new java.util.HashMap<>(); + errorData.put("parameter", "lastUpdatedAfter"); + errorData.put("reason", "Must be valid ISO-8601 timestamp"); + throw new InvalidParamsError(null, "Invalid params", errorData); + } + } if (includeArtifacts != null) { paramsBuilder.includeArtifacts(includeArtifacts); } From 856115a13ceae8af861d8325170a574e2e3256b1 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 12 Nov 2025 15:38:45 +0000 Subject: [PATCH 2/6] Review fixes --- .../jpa/JpaDatabaseTaskStoreTest.java | 5 +-- .../DefaultRequestHandler.java | 7 ++-- .../a2a/server/tasks/InMemoryTaskStore.java | 33 +++++++------------ .../transport/rest/handler/RestHandler.java | 10 ++++-- 4 files changed, 26 insertions(+), 29 deletions(-) diff --git a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java index 539a4d214..95af32aeb 100644 --- a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java +++ b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -420,7 +421,7 @@ public void testListTasksCombinedFilters() { public void testListTasksPagination() { // Create 5 tasks with same timestamp to ensure ID-based pagination works // (With timestamp DESC sorting, same timestamps allow ID ASC tie-breaking) - java.time.OffsetDateTime sameTimestamp = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC); + OffsetDateTime sameTimestamp = OffsetDateTime.now(java.time.ZoneOffset.UTC); for (int i = 1; i <= 5; i++) { Task task = new Task.Builder() .id("task-page-" + i) @@ -580,7 +581,7 @@ public void testListTasksDefaultPageSize() { public void testListTasksOrderingById() { // Create tasks with same timestamp to test ID-based tie-breaking // (spec requires sorting by timestamp DESC, then ID ASC) - java.time.OffsetDateTime sameTimestamp = java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC); + OffsetDateTime sameTimestamp = OffsetDateTime.now(java.time.ZoneOffset.UTC); Task task1 = new Task.Builder() .id("task-order-a") diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 03a90409d..0671ce883 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -5,8 +5,11 @@ import static io.a2a.server.util.async.AsyncUtils.processor; import static java.util.concurrent.TimeUnit.*; +import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -172,9 +175,9 @@ public ListTasksResult onListTasks(ListTasksParams params, ServerCallContext con // Validate lastUpdatedAfter timestamp if provided if (params.lastUpdatedAfter() != null) { // Check if timestamp is in the future (optional validation per spec) - java.time.Instant now = java.time.Instant.now(); + Instant now = Instant.now(); if (params.lastUpdatedAfter().isAfter(now)) { - java.util.Map errorData = new java.util.HashMap<>(); + Map errorData = new HashMap<>(); errorData.put("parameter", "lastUpdatedAfter"); errorData.put("reason", "Timestamp cannot be in the future"); throw new InvalidParamsError(null, "Invalid params", errorData); diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index 4c37ce66d..83eab8ae2 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -38,28 +38,17 @@ public void delete(String taskId) { @Override public ListTasksResult list(ListTasksParams params) { - Stream taskStream = tasks.values().stream(); - - // Apply filters - if (params.contextId() != null) { - taskStream = taskStream.filter(task -> params.contextId().equals(task.getContextId())); - } - if (params.status() != null) { - taskStream = taskStream.filter(task -> - task.getStatus() != null && params.status().equals(task.getStatus().state()) - ); - } - if (params.lastUpdatedAfter() != null) { - taskStream = taskStream.filter(task -> - task.getStatus() != null && - task.getStatus().timestamp() != null && - task.getStatus().timestamp().toInstant().isAfter(params.lastUpdatedAfter()) - ); - } - - // Sort by status timestamp descending (most recent first), then by ID ascending for stable ordering - List allFilteredTasks = taskStream - .sorted(Comparator.comparing((Task t) -> t.getStatus().timestamp(), Comparator.nullsLast(Comparator.reverseOrder())) + // Filter and sort tasks in a single stream pipeline + List allFilteredTasks = tasks.values().stream() + .filter(task -> params.contextId() == null || params.contextId().equals(task.getContextId())) + .filter(task -> params.status() == null || + (task.getStatus() != null && params.status().equals(task.getStatus().state()))) + .filter(task -> params.lastUpdatedAfter() == null || + (task.getStatus() != null && + task.getStatus().timestamp() != null && + task.getStatus().timestamp().toInstant().isAfter(params.lastUpdatedAfter()))) + .sorted(Comparator.comparing((Task t) -> t.getStatus().timestamp(), + Comparator.nullsLast(Comparator.reverseOrder())) .thenComparing(Task::getId)) .toList(); diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java index f74a9c5e9..105831262 100644 --- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java @@ -11,7 +11,11 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import java.time.Instant; +import java.time.format.DateTimeParseException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.Flow; import io.a2a.server.PublicAgentCard; @@ -205,9 +209,9 @@ public HTTPRestResponse listTasks(@Nullable String contextId, @Nullable String s } if (lastUpdatedAfter != null) { try { - paramsBuilder.lastUpdatedAfter(java.time.Instant.parse(lastUpdatedAfter)); - } catch (java.time.format.DateTimeParseException e) { - java.util.Map errorData = new java.util.HashMap<>(); + paramsBuilder.lastUpdatedAfter(Instant.parse(lastUpdatedAfter)); + } catch (DateTimeParseException e) { + Map errorData = new HashMap<>(); errorData.put("parameter", "lastUpdatedAfter"); errorData.put("reason", "Must be valid ISO-8601 timestamp"); throw new InvalidParamsError(null, "Invalid params", errorData); From 757ed8a0d4bbb2bb848356652d6dc6ba7c3d7f7b Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 12 Nov 2025 15:57:18 +0000 Subject: [PATCH 3/6] Gemini review --- .../database/jpa/JpaDatabaseTaskStore.java | 42 ++++++- .../taskstore/database/jpa/JpaTask.java | 3 +- .../jpa/JpaDatabaseTaskStoreTest.java | 116 ++++++++++++++++++ .../server/rest/quarkus/A2AServerRoutes.java | 2 +- .../a2a/server/tasks/InMemoryTaskStore.java | 52 ++++++-- 5 files changed, 198 insertions(+), 17 deletions(-) diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java index 5697d1d41..506db3320 100644 --- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java +++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java @@ -233,9 +233,17 @@ public ListTasksResult list(ListTasksParams params) { countQueryBuilder.append(" AND t.statusTimestamp > :lastUpdatedAfter"); } - // Apply pagination cursor (tasks after pageToken) + // Apply pagination cursor using keyset pagination for composite sort (timestamp DESC, id ASC) + // PageToken format: "timestamp_millis:taskId" (e.g., "1699999999000:task-123") if (params.pageToken() != null && !params.pageToken().isEmpty()) { - queryBuilder.append(" AND t.id > :pageToken"); + String[] tokenParts = params.pageToken().split(":", 2); + if (tokenParts.length == 2) { + // Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId) + queryBuilder.append(" AND (t.statusTimestamp < :tokenTimestamp OR (t.statusTimestamp = :tokenTimestamp AND t.id > :tokenId))"); + } else { + // Fallback for legacy pageToken format (ID only) - for backward compatibility during transition + queryBuilder.append(" AND t.id > :pageToken"); + } } // Sort by status timestamp descending (most recent first), then by ID for stable ordering @@ -255,7 +263,23 @@ public ListTasksResult list(ListTasksParams params) { query.setParameter("lastUpdatedAfter", params.lastUpdatedAfter()); } if (params.pageToken() != null && !params.pageToken().isEmpty()) { - query.setParameter("pageToken", params.pageToken()); + String[] tokenParts = params.pageToken().split(":", 2); + if (tokenParts.length == 2) { + // Parse keyset pagination parameters + try { + long timestampMillis = Long.parseLong(tokenParts[0]); + Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis); + String tokenId = tokenParts[1]; + query.setParameter("tokenTimestamp", tokenTimestamp); + query.setParameter("tokenId", tokenId); + } catch (NumberFormatException e) { + // Invalid timestamp format, fall back to ID-only + query.setParameter("pageToken", params.pageToken()); + } + } else { + // Legacy format (ID only) + query.setParameter("pageToken", params.pageToken()); + } } // Apply page size limit (+1 to check for next page) @@ -295,10 +319,18 @@ public ListTasksResult list(ListTasksParams params) { } } - // Determine next page token (ID of last task if there are more results) + // Determine next page token (timestamp:ID of last task if there are more results) + // Format: "timestamp_millis:taskId" for keyset pagination String nextPageToken = null; if (hasMore && !tasks.isEmpty()) { - nextPageToken = tasks.get(tasks.size() - 1).getId(); + Task lastTask = tasks.get(tasks.size() - 1); + if (lastTask.getStatus() != null && lastTask.getStatus().timestamp() != null) { + long timestampMillis = lastTask.getStatus().timestamp().toInstant().toEpochMilli(); + nextPageToken = timestampMillis + ":" + lastTask.getId(); + } else { + // Fallback to ID-only if timestamp is missing (shouldn't happen with proper data) + nextPageToken = lastTask.getId(); + } } // Apply post-processing transformations (history limiting, artifact removal) diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java index 86df40676..9f38dee41 100644 --- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java +++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaTask.java @@ -145,8 +145,9 @@ private void updateDenormalizedFields(Task task) { io.a2a.spec.TaskState taskState = task.getStatus().state(); this.state = (taskState != null) ? taskState.asString() : null; // Extract status timestamp for efficient querying and sorting + // Truncate to milliseconds for keyset pagination consistency (pageToken uses millis) this.statusTimestamp = (task.getStatus().timestamp() != null) - ? task.getStatus().timestamp().toInstant() + ? task.getStatus().timestamp().toInstant().truncatedTo(java.time.temporal.ChronoUnit.MILLIS) : null; } else { this.state = null; diff --git a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java index 95af32aeb..f7ec838ae 100644 --- a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java +++ b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java @@ -468,6 +468,122 @@ public void testListTasksPagination() { assertNull(result3.nextPageToken(), "Last page should have no next page token"); } + @Test + @Transactional + public void testListTasksPaginationWithDifferentTimestamps() { + // Create tasks with different timestamps to verify keyset pagination + // with composite sort (timestamp DESC, id ASC) + OffsetDateTime now = OffsetDateTime.now(java.time.ZoneOffset.UTC); + + // Task 1: 10 minutes ago, ID="task-diff-a" + Task task1 = new Task.Builder() + .id("task-diff-a") + .contextId("context-diff-timestamps") + .status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(10))) + .build(); + taskStore.save(task1); + + // Task 2: 5 minutes ago, ID="task-diff-b" + Task task2 = new Task.Builder() + .id("task-diff-b") + .contextId("context-diff-timestamps") + .status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(5))) + .build(); + taskStore.save(task2); + + // Task 3: 5 minutes ago, ID="task-diff-c" (same timestamp as task2, tests ID tie-breaker) + Task task3 = new Task.Builder() + .id("task-diff-c") + .contextId("context-diff-timestamps") + .status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(5))) + .build(); + taskStore.save(task3); + + // Task 4: Now, ID="task-diff-d" + Task task4 = new Task.Builder() + .id("task-diff-d") + .contextId("context-diff-timestamps") + .status(new TaskStatus(TaskState.WORKING, null, now)) + .build(); + taskStore.save(task4); + + // Task 5: 1 minute ago, ID="task-diff-e" + Task task5 = new Task.Builder() + .id("task-diff-e") + .contextId("context-diff-timestamps") + .status(new TaskStatus(TaskState.WORKING, null, now.minusMinutes(1))) + .build(); + taskStore.save(task5); + + // Expected order (timestamp DESC, id ASC): + // 1. task-diff-d (now) + // 2. task-diff-e (1 min ago) + // 3. task-diff-b (5 min ago, ID 'b') + // 4. task-diff-c (5 min ago, ID 'c') + // 5. task-diff-a (10 min ago) + + // Page 1: Get first 2 tasks + ListTasksParams params1 = new ListTasksParams.Builder() + .contextId("context-diff-timestamps") + .pageSize(2) + .build(); + ListTasksResult result1 = taskStore.list(params1); + + assertEquals(5, result1.totalSize()); + assertEquals(2, result1.pageSize()); + assertNotNull(result1.nextPageToken(), "Should have next page token"); + + // Verify first page order + assertEquals("task-diff-d", result1.tasks().get(0).getId(), "First task should be most recent"); + assertEquals("task-diff-e", result1.tasks().get(1).getId(), "Second task should be 1 min ago"); + + // Verify pageToken format: "timestamp_millis:taskId" + assertTrue(result1.nextPageToken().contains(":"), "PageToken should have format timestamp:id"); + String[] tokenParts = result1.nextPageToken().split(":", 2); + assertEquals(2, tokenParts.length, "PageToken should have exactly 2 parts"); + assertEquals("task-diff-e", tokenParts[1], "PageToken should contain last task ID"); + + // Page 2: Get next 2 tasks + ListTasksParams params2 = new ListTasksParams.Builder() + .contextId("context-diff-timestamps") + .pageSize(2) + .pageToken(result1.nextPageToken()) + .build(); + ListTasksResult result2 = taskStore.list(params2); + + assertEquals(5, result2.totalSize()); + assertEquals(2, result2.pageSize()); + assertNotNull(result2.nextPageToken(), "Should have next page token"); + + // Verify second page order (tasks with same timestamp, sorted by ID) + assertEquals("task-diff-b", result2.tasks().get(0).getId(), "Third task should be 5 min ago, ID 'b'"); + assertEquals("task-diff-c", result2.tasks().get(1).getId(), "Fourth task should be 5 min ago, ID 'c'"); + + // Page 3: Get last task + ListTasksParams params3 = new ListTasksParams.Builder() + .contextId("context-diff-timestamps") + .pageSize(2) + .pageToken(result2.nextPageToken()) + .build(); + ListTasksResult result3 = taskStore.list(params3); + + assertEquals(5, result3.totalSize()); + assertEquals(1, result3.pageSize()); + assertNull(result3.nextPageToken(), "Last page should have no next page token"); + + // Verify last task + assertEquals("task-diff-a", result3.tasks().get(0).getId(), "Last task should be oldest"); + + // Verify no duplicates across all pages + List allTaskIds = new ArrayList<>(); + allTaskIds.addAll(result1.tasks().stream().map(Task::getId).toList()); + allTaskIds.addAll(result2.tasks().stream().map(Task::getId).toList()); + allTaskIds.addAll(result3.tasks().stream().map(Task::getId).toList()); + + assertEquals(5, allTaskIds.size(), "Should have exactly 5 tasks across all pages"); + assertEquals(5, allTaskIds.stream().distinct().count(), "Should have no duplicate tasks"); + } + @Test @Transactional public void testListTasksHistoryLimiting() { diff --git a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java index fe153e67c..88f2eac61 100644 --- a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java +++ b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java @@ -144,7 +144,7 @@ public void listTasks(RoutingContext rc) { Boolean includeArtifacts = null; if (includeArtifactsStr != null && !includeArtifactsStr.isEmpty()) { - includeArtifacts = Boolean.parseBoolean(includeArtifactsStr); + includeArtifacts = Boolean.valueOf(includeArtifactsStr); } response = jsonRestHandler.listTasks(contextId, statusStr, pageSize, pageToken, diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index 83eab8ae2..d6bee18a4 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -58,27 +58,59 @@ public ListTasksResult list(ListTasksParams params) { int pageSize = params.getEffectivePageSize(); int startIndex = 0; - // Handle page token (cursor: task ID from previous page) - // Since we're sorted by timestamp DESC then ID ASC, we can't use binary search - // Instead, find the task with the matching ID using linear search + // Handle page token using keyset pagination (format: "timestamp_millis:taskId") + // Find the first task that comes after the pageToken position in sorted order if (params.pageToken() != null && !params.pageToken().isEmpty()) { - for (int i = 0; i < allFilteredTasks.size(); i++) { - if (allFilteredTasks.get(i).getId().equals(params.pageToken())) { - startIndex = i + 1; - break; + String[] tokenParts = params.pageToken().split(":", 2); + if (tokenParts.length == 2) { + try { + long tokenTimestampMillis = Long.parseLong(tokenParts[0]); + java.time.Instant tokenTimestamp = java.time.Instant.ofEpochMilli(tokenTimestampMillis); + String tokenId = tokenParts[1]; + + // Find first task where: timestamp < tokenTimestamp OR (timestamp == tokenTimestamp AND id > tokenId) + for (int i = 0; i < allFilteredTasks.size(); i++) { + Task task = allFilteredTasks.get(i); + if (task.getStatus() != null && task.getStatus().timestamp() != null) { + java.time.Instant taskTimestamp = task.getStatus().timestamp().toInstant(); + int timestampCompare = taskTimestamp.compareTo(tokenTimestamp); + + if (timestampCompare < 0 || (timestampCompare == 0 && task.getId().compareTo(tokenId) > 0)) { + startIndex = i; + break; + } + } + } + } catch (NumberFormatException e) { + // Invalid pageToken format, start from beginning + startIndex = 0; + } + } else { + // Legacy format (ID only) - fallback for backward compatibility + for (int i = 0; i < allFilteredTasks.size(); i++) { + if (allFilteredTasks.get(i).getId().equals(params.pageToken())) { + startIndex = i + 1; + break; + } } } - // If not found, startIndex remains 0 (start from beginning) } // Get the page of tasks int endIndex = Math.min(startIndex + pageSize, allFilteredTasks.size()); List pageTasks = allFilteredTasks.subList(startIndex, endIndex); - // Determine next page token + // Determine next page token (format: "timestamp_millis:taskId") String nextPageToken = null; if (endIndex < allFilteredTasks.size()) { - nextPageToken = allFilteredTasks.get(endIndex - 1).getId(); + Task lastTask = allFilteredTasks.get(endIndex - 1); + if (lastTask.getStatus() != null && lastTask.getStatus().timestamp() != null) { + long timestampMillis = lastTask.getStatus().timestamp().toInstant().toEpochMilli(); + nextPageToken = timestampMillis + ":" + lastTask.getId(); + } else { + // Fallback to ID-only if timestamp is missing + nextPageToken = lastTask.getId(); + } } // Transform tasks: limit history and optionally remove artifacts From cc846e7b5c90941e79e34b172f9ffcf1b879b8e8 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Wed, 12 Nov 2025 17:16:02 +0000 Subject: [PATCH 4/6] More review fixes --- .../database/jpa/JpaDatabaseTaskStore.java | 29 ++++---- .../jpa/JpaDatabaseTaskStoreTest.java | 43 ++++++++++++ .../a2a/server/tasks/InMemoryTaskStore.java | 66 +++++++++++-------- 3 files changed, 94 insertions(+), 44 deletions(-) diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java index 506db3320..f7a652d60 100644 --- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java +++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java @@ -239,10 +239,12 @@ public ListTasksResult list(ListTasksParams params) { String[] tokenParts = params.pageToken().split(":", 2); if (tokenParts.length == 2) { // Keyset pagination: get tasks where timestamp < tokenTimestamp OR (timestamp = tokenTimestamp AND id > tokenId) + // All tasks have timestamps (TaskStatus canonical constructor ensures this) queryBuilder.append(" AND (t.statusTimestamp < :tokenTimestamp OR (t.statusTimestamp = :tokenTimestamp AND t.id > :tokenId))"); } else { - // Fallback for legacy pageToken format (ID only) - for backward compatibility during transition - queryBuilder.append(" AND t.id > :pageToken"); + // Legacy ID-only pageToken format is not supported with timestamp-based sorting + // Throw error to prevent incorrect pagination results + throw new io.a2a.spec.InvalidParamsError(null, "Invalid pageToken format: expected 'timestamp:id'", null); } } @@ -268,18 +270,19 @@ public ListTasksResult list(ListTasksParams params) { // Parse keyset pagination parameters try { long timestampMillis = Long.parseLong(tokenParts[0]); - Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis); String tokenId = tokenParts[1]; + + // All tasks have timestamps (TaskStatus canonical constructor ensures this) + Instant tokenTimestamp = Instant.ofEpochMilli(timestampMillis); query.setParameter("tokenTimestamp", tokenTimestamp); query.setParameter("tokenId", tokenId); } catch (NumberFormatException e) { - // Invalid timestamp format, fall back to ID-only - query.setParameter("pageToken", params.pageToken()); + // Malformed timestamp in pageToken + throw new io.a2a.spec.InvalidParamsError(null, + "Invalid pageToken format: timestamp must be numeric milliseconds", null); } - } else { - // Legacy format (ID only) - query.setParameter("pageToken", params.pageToken()); } + // Note: Legacy ID-only format already rejected in query building phase } // Apply page size limit (+1 to check for next page) @@ -324,13 +327,9 @@ public ListTasksResult list(ListTasksParams params) { String nextPageToken = null; if (hasMore && !tasks.isEmpty()) { Task lastTask = tasks.get(tasks.size() - 1); - if (lastTask.getStatus() != null && lastTask.getStatus().timestamp() != null) { - long timestampMillis = lastTask.getStatus().timestamp().toInstant().toEpochMilli(); - nextPageToken = timestampMillis + ":" + lastTask.getId(); - } else { - // Fallback to ID-only if timestamp is missing (shouldn't happen with proper data) - nextPageToken = lastTask.getId(); - } + // All tasks have timestamps (TaskStatus canonical constructor ensures this) + long timestampMillis = lastTask.getStatus().timestamp().toInstant().toEpochMilli(); + nextPageToken = timestampMillis + ":" + lastTask.getId(); } // Apply post-processing transformations (history limiting, artifact removal) diff --git a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java index f7ec838ae..a7cb8d79d 100644 --- a/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java +++ b/extras/task-store-database-jpa/src/test/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStoreTest.java @@ -692,6 +692,49 @@ public void testListTasksDefaultPageSize() { assertNotNull(result.nextPageToken(), "Should have next page"); } + @Test + @Transactional + public void testListTasksInvalidPageTokenFormat() { + // Create a task + Task task = new Task.Builder() + .id("task-invalid-token") + .contextId("context-invalid-token") + .status(new TaskStatus(TaskState.WORKING)) + .build(); + taskStore.save(task); + + // Test 1: Legacy ID-only pageToken should throw InvalidParamsError + ListTasksParams params1 = new ListTasksParams.Builder() + .contextId("context-invalid-token") + .pageToken("task-invalid-token") // ID-only format (legacy) + .build(); + + try { + taskStore.list(params1); + throw new AssertionError("Expected InvalidParamsError for legacy ID-only pageToken"); + } catch (io.a2a.spec.InvalidParamsError e) { + // Expected - legacy format not supported + assertTrue(e.getMessage().contains("Invalid pageToken format"), + "Error message should mention invalid format"); + } + + // Test 2: Malformed timestamp in pageToken should throw InvalidParamsError + ListTasksParams params2 = new ListTasksParams.Builder() + .contextId("context-invalid-token") + .pageToken("not-a-number:task-id") // Invalid timestamp + .build(); + + try { + taskStore.list(params2); + throw new AssertionError("Expected InvalidParamsError for malformed timestamp"); + } catch (io.a2a.spec.InvalidParamsError e) { + // Expected - malformed timestamp + assertTrue(e.getMessage().contains("timestamp must be numeric"), + "Error message should mention numeric timestamp requirement"); + } + } + + @Test @Transactional public void testListTasksOrderingById() { diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index d6bee18a4..ba50c6cd6 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -47,7 +47,11 @@ public ListTasksResult list(ListTasksParams params) { (task.getStatus() != null && task.getStatus().timestamp() != null && task.getStatus().timestamp().toInstant().isAfter(params.lastUpdatedAfter()))) - .sorted(Comparator.comparing((Task t) -> t.getStatus().timestamp(), + .sorted(Comparator.comparing( + (Task t) -> (t.getStatus() != null && t.getStatus().timestamp() != null) + // Truncate to milliseconds for consistency with pageToken precision + ? t.getStatus().timestamp().toInstant().truncatedTo(java.time.temporal.ChronoUnit.MILLIS) + : null, Comparator.nullsLast(Comparator.reverseOrder())) .thenComparing(Task::getId)) .toList(); @@ -59,7 +63,7 @@ public ListTasksResult list(ListTasksParams params) { int startIndex = 0; // Handle page token using keyset pagination (format: "timestamp_millis:taskId") - // Find the first task that comes after the pageToken position in sorted order + // Use binary search to efficiently find the first task after the pageToken position (O(log N)) if (params.pageToken() != null && !params.pageToken().isEmpty()) { String[] tokenParts = params.pageToken().split(":", 2); if (tokenParts.length == 2) { @@ -68,31 +72,39 @@ public ListTasksResult list(ListTasksParams params) { java.time.Instant tokenTimestamp = java.time.Instant.ofEpochMilli(tokenTimestampMillis); String tokenId = tokenParts[1]; - // Find first task where: timestamp < tokenTimestamp OR (timestamp == tokenTimestamp AND id > tokenId) - for (int i = 0; i < allFilteredTasks.size(); i++) { - Task task = allFilteredTasks.get(i); - if (task.getStatus() != null && task.getStatus().timestamp() != null) { - java.time.Instant taskTimestamp = task.getStatus().timestamp().toInstant(); - int timestampCompare = taskTimestamp.compareTo(tokenTimestamp); - - if (timestampCompare < 0 || (timestampCompare == 0 && task.getId().compareTo(tokenId) > 0)) { - startIndex = i; - break; - } + // Binary search for first task where: timestamp < tokenTimestamp OR (timestamp == tokenTimestamp AND id > tokenId) + // Since list is sorted (timestamp DESC, id ASC), we search for the insertion point + int left = 0; + int right = allFilteredTasks.size(); + + while (left < right) { + int mid = left + (right - left) / 2; + Task task = allFilteredTasks.get(mid); + + // All tasks have timestamps (TaskStatus canonical constructor ensures this) + // Truncate to milliseconds for consistency with pageToken precision + java.time.Instant taskTimestamp = task.getStatus().timestamp().toInstant() + .truncatedTo(java.time.temporal.ChronoUnit.MILLIS); + int timestampCompare = taskTimestamp.compareTo(tokenTimestamp); + + if (timestampCompare < 0 || (timestampCompare == 0 && task.getId().compareTo(tokenId) > 0)) { + // This task is after the token, search left half + right = mid; + } else { + // This task is before or equal to token, search right half + left = mid + 1; } } + startIndex = left; } catch (NumberFormatException e) { - // Invalid pageToken format, start from beginning - startIndex = 0; + // Malformed timestamp in pageToken + throw new io.a2a.spec.InvalidParamsError(null, + "Invalid pageToken format: timestamp must be numeric milliseconds", null); } } else { - // Legacy format (ID only) - fallback for backward compatibility - for (int i = 0; i < allFilteredTasks.size(); i++) { - if (allFilteredTasks.get(i).getId().equals(params.pageToken())) { - startIndex = i + 1; - break; - } - } + // Legacy ID-only pageToken format is not supported with timestamp-based sorting + // Throw error to prevent incorrect pagination results + throw new io.a2a.spec.InvalidParamsError(null, "Invalid pageToken format: expected 'timestamp:id'", null); } } @@ -104,13 +116,9 @@ public ListTasksResult list(ListTasksParams params) { String nextPageToken = null; if (endIndex < allFilteredTasks.size()) { Task lastTask = allFilteredTasks.get(endIndex - 1); - if (lastTask.getStatus() != null && lastTask.getStatus().timestamp() != null) { - long timestampMillis = lastTask.getStatus().timestamp().toInstant().toEpochMilli(); - nextPageToken = timestampMillis + ":" + lastTask.getId(); - } else { - // Fallback to ID-only if timestamp is missing - nextPageToken = lastTask.getId(); - } + // All tasks have timestamps (TaskStatus canonical constructor ensures this) + long timestampMillis = lastTask.getStatus().timestamp().toInstant().toEpochMilli(); + nextPageToken = timestampMillis + ":" + lastTask.getId(); } // Transform tasks: limit history and optionally remove artifacts From 6fd1ff9140726319f756af78c58a26b150d67a7e Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Thu, 13 Nov 2025 09:54:06 +0000 Subject: [PATCH 5/6] WIP for TCK --- .../database/jpa/JpaDatabaseTaskStore.java | 8 +- .../a2a/server/tasks/InMemoryTaskStore.java | 8 +- spec-grpc/PROTO_DIVERGENCE.md | 24 +++ spec-grpc/src/main/java/io/a2a/grpc/A2A.java | 140 +++++++++--------- .../java/io/a2a/grpc/ListTasksResponse.java | 106 +++++++++++-- .../a2a/grpc/ListTasksResponseOrBuilder.java | 13 +- .../java/io/a2a/grpc/utils/ProtoUtils.java | 1 + spec-grpc/src/main/proto/a2a.proto | 5 +- .../java/io/a2a/spec/ListTasksResult.java | 2 +- 9 files changed, 220 insertions(+), 87 deletions(-) create mode 100644 spec-grpc/PROTO_DIVERGENCE.md diff --git a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java index f7a652d60..202684f0a 100644 --- a/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java +++ b/extras/task-store-database-jpa/src/main/java/io/a2a/extras/taskstore/database/jpa/JpaDatabaseTaskStore.java @@ -345,9 +345,13 @@ public ListTasksResult list(ListTasksParams params) { } private Task transformTask(Task task, int historyLength, boolean includeArtifacts) { - // Limit history if needed (keep most recent N messages) + // Limit history based on historyLength parameter List history = task.getHistory(); - if (historyLength > 0 && history != null && history.size() > historyLength) { + if (historyLength == 0) { + // historyLength=0 means no history should be included + history = List.of(); + } else if (historyLength > 0 && history != null && history.size() > historyLength) { + // Keep most recent N messages history = history.subList(history.size() - historyLength, history.size()); } diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index ba50c6cd6..66e65f3b4 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -133,9 +133,13 @@ public ListTasksResult list(ListTasksParams params) { } private Task transformTask(Task task, int historyLength, boolean includeArtifacts) { - // Limit history if needed (keep most recent N messages) + // Limit history based on historyLength parameter List history = task.getHistory(); - if (historyLength > 0 && history != null && history.size() > historyLength) { + if (historyLength == 0) { + // historyLength=0 means no history should be included + history = List.of(); + } else if (historyLength > 0 && history != null && history.size() > historyLength) { + // Keep most recent N messages history = history.subList(history.size() - historyLength, history.size()); } diff --git a/spec-grpc/PROTO_DIVERGENCE.md b/spec-grpc/PROTO_DIVERGENCE.md new file mode 100644 index 000000000..44a0a4e37 --- /dev/null +++ b/spec-grpc/PROTO_DIVERGENCE.md @@ -0,0 +1,24 @@ +# Protobuf Schema Divergence from Upstream + +This document tracks intentional divergences from the upstream A2A protobuf schema at https://github.com/a2aproject/A2A/blob/main/specification/grpc/a2a.proto + +## Current Divergences + +### ListTasksResponse.page_size (Field #3) + +**Status**: ✅ Aligned with PR #1160 (v1.0 RC) +**Upstream PR**: https://github.com/a2aproject/A2A/pull/1160 +**Reason**: TCK tests require `pageSize` field in responses per A2A v0.4.0 spec. PR #1160 adds this field to the schema. +**Action Required**: Remove this divergence once PR #1160 is merged and we sync to v1.0 RC. +**Impact**: Field number change - `total_size` moved from #3 to #4 to accommodate `page_size` at #3. + +**Modified**: 2025-11-12 +**Tracking Issue**: https://github.com/a2aproject/A2A/pull/1160 + +## Sync Instructions + +When PR #1160 is merged: +1. Verify field numbers match our implementation +2. Remove this divergence note +3. Sync to upstream v1.0 RC tag +4. Regenerate protobuf classes: `cd spec-grpc && mvn clean install -Pproto-compile` diff --git a/spec-grpc/src/main/java/io/a2a/grpc/A2A.java b/spec-grpc/src/main/java/io/a2a/grpc/A2A.java index f6ec5eb57..28b11b95a 100644 --- a/spec-grpc/src/main/java/io/a2a/grpc/A2A.java +++ b/spec-grpc/src/main/java/io/a2a/grpc/A2A.java @@ -442,77 +442,77 @@ public static void registerAllExtensions( "\001(\005\022\022\n\npage_token\030\004 \001(\t\022\026\n\016history_lengt" + "h\030\005 \001(\005\0225\n\021last_updated_time\030\006 \001(\0132\032.goo" + "gle.protobuf.Timestamp\022\031\n\021include_artifa" + - "cts\030\007 \001(\010\"]\n\021ListTasksResponse\022\033\n\005tasks\030" + + "cts\030\007 \001(\010\"p\n\021ListTasksResponse\022\033\n\005tasks\030" + "\001 \003(\0132\014.a2a.v1.Task\022\027\n\017next_page_token\030\002" + - " \001(\t\022\022\n\ntotal_size\030\003 \001(\005\"!\n\021CancelTaskRe" + - "quest\022\014\n\004name\030\001 \001(\t\"4\n$GetTaskPushNotifi" + - "cationConfigRequest\022\014\n\004name\030\001 \001(\t\"7\n\'Del" + - "eteTaskPushNotificationConfigRequest\022\014\n\004" + - "name\030\001 \001(\t\"\217\001\n\'CreateTaskPushNotificatio" + - "nConfigRequest\022\023\n\006parent\030\001 \001(\tB\003\340A\002\022\026\n\tc" + - "onfig_id\030\002 \001(\tB\003\340A\002\0227\n\006config\030\003 \001(\0132\".a2" + - "a.v1.TaskPushNotificationConfigB\003\340A\002\"\'\n\027" + - "TaskSubscriptionRequest\022\014\n\004name\030\001 \001(\t\"^\n" + - "%ListTaskPushNotificationConfigRequest\022\016" + - "\n\006parent\030\001 \001(\t\022\021\n\tpage_size\030\002 \001(\005\022\022\n\npag" + - "e_token\030\003 \001(\t\"\025\n\023GetAgentCardRequest\"g\n\023" + - "SendMessageResponse\022\034\n\004task\030\001 \001(\0132\014.a2a." + + " \001(\t\022\021\n\tpage_size\030\003 \001(\005\022\022\n\ntotal_size\030\004 " + + "\001(\005\"!\n\021CancelTaskRequest\022\014\n\004name\030\001 \001(\t\"4" + + "\n$GetTaskPushNotificationConfigRequest\022\014" + + "\n\004name\030\001 \001(\t\"7\n\'DeleteTaskPushNotificati" + + "onConfigRequest\022\014\n\004name\030\001 \001(\t\"\217\001\n\'Create" + + "TaskPushNotificationConfigRequest\022\023\n\006par" + + "ent\030\001 \001(\tB\003\340A\002\022\026\n\tconfig_id\030\002 \001(\tB\003\340A\002\0227" + + "\n\006config\030\003 \001(\0132\".a2a.v1.TaskPushNotifica" + + "tionConfigB\003\340A\002\"\'\n\027TaskSubscriptionReque" + + "st\022\014\n\004name\030\001 \001(\t\"^\n%ListTaskPushNotifica" + + "tionConfigRequest\022\016\n\006parent\030\001 \001(\t\022\021\n\tpag" + + "e_size\030\002 \001(\005\022\022\n\npage_token\030\003 \001(\t\"\025\n\023GetA" + + "gentCardRequest\"g\n\023SendMessageResponse\022\034" + + "\n\004task\030\001 \001(\0132\014.a2a.v1.TaskH\000\022\'\n\003msg\030\002 \001(" + + "\0132\017.a2a.v1.MessageH\000R\007messageB\t\n\007payload" + + "\"\326\001\n\016StreamResponse\022\034\n\004task\030\001 \001(\0132\014.a2a." + "v1.TaskH\000\022\'\n\003msg\030\002 \001(\0132\017.a2a.v1.MessageH" + - "\000R\007messageB\t\n\007payload\"\326\001\n\016StreamResponse" + - "\022\034\n\004task\030\001 \001(\0132\014.a2a.v1.TaskH\000\022\'\n\003msg\030\002 " + - "\001(\0132\017.a2a.v1.MessageH\000R\007message\0226\n\rstatu" + - "s_update\030\003 \001(\0132\035.a2a.v1.TaskStatusUpdate" + - "EventH\000\022:\n\017artifact_update\030\004 \001(\0132\037.a2a.v" + - "1.TaskArtifactUpdateEventH\000B\t\n\007payload\"v" + - "\n&ListTaskPushNotificationConfigResponse" + - "\0223\n\007configs\030\001 \003(\0132\".a2a.v1.TaskPushNotif" + - "icationConfig\022\027\n\017next_page_token\030\002 \001(\t*\372" + - "\001\n\tTaskState\022\032\n\026TASK_STATE_UNSPECIFIED\020\000" + - "\022\030\n\024TASK_STATE_SUBMITTED\020\001\022\026\n\022TASK_STATE" + - "_WORKING\020\002\022\030\n\024TASK_STATE_COMPLETED\020\003\022\025\n\021" + - "TASK_STATE_FAILED\020\004\022\030\n\024TASK_STATE_CANCEL" + - "LED\020\005\022\035\n\031TASK_STATE_INPUT_REQUIRED\020\006\022\027\n\023" + - "TASK_STATE_REJECTED\020\007\022\034\n\030TASK_STATE_AUTH" + - "_REQUIRED\020\010*;\n\004Role\022\024\n\020ROLE_UNSPECIFIED\020" + - "\000\022\r\n\tROLE_USER\020\001\022\016\n\nROLE_AGENT\020\0022\220\013\n\nA2A" + - "Service\022c\n\013SendMessage\022\032.a2a.v1.SendMess" + - "ageRequest\032\033.a2a.v1.SendMessageResponse\"" + - "\033\202\323\344\223\002\025\"\020/v1/message:send:\001*\022k\n\024SendStre" + - "amingMessage\022\032.a2a.v1.SendMessageRequest" + - "\032\026.a2a.v1.StreamResponse\"\035\202\323\344\223\002\027\"\022/v1/me" + - "ssage:stream:\001*0\001\022R\n\007GetTask\022\026.a2a.v1.Ge" + - "tTaskRequest\032\014.a2a.v1.Task\"!\332A\004name\202\323\344\223\002" + - "\024\022\022/v1/{name=tasks/*}\022S\n\tListTasks\022\030.a2a" + - ".v1.ListTasksRequest\032\031.a2a.v1.ListTasksR" + - "esponse\"\021\202\323\344\223\002\013\022\t/v1/tasks\022[\n\nCancelTask" + - "\022\031.a2a.v1.CancelTaskRequest\032\014.a2a.v1.Tas" + - "k\"$\202\323\344\223\002\036\"\031/v1/{name=tasks/*}:cancel:\001*\022" + - "s\n\020TaskSubscription\022\037.a2a.v1.TaskSubscri" + - "ptionRequest\032\026.a2a.v1.StreamResponse\"$\202\323" + - "\344\223\002\036\022\034/v1/{name=tasks/*}:subscribe0\001\022\305\001\n" + - " CreateTaskPushNotificationConfig\022/.a2a." + - "v1.CreateTaskPushNotificationConfigReque" + - "st\032\".a2a.v1.TaskPushNotificationConfig\"L" + - "\332A\rparent,config\202\323\344\223\0026\",/v1/{parent=task" + - "s/*/pushNotificationConfigs}:\006config\022\256\001\n" + - "\035GetTaskPushNotificationConfig\022,.a2a.v1." + - "GetTaskPushNotificationConfigRequest\032\".a" + - "2a.v1.TaskPushNotificationConfig\";\332A\004nam" + - "e\202\323\344\223\002.\022,/v1/{name=tasks/*/pushNotificat" + - "ionConfigs/*}\022\276\001\n\036ListTaskPushNotificati" + - "onConfig\022-.a2a.v1.ListTaskPushNotificati" + - "onConfigRequest\032..a2a.v1.ListTaskPushNot" + - "ificationConfigResponse\"=\332A\006parent\202\323\344\223\002." + - "\022,/v1/{parent=tasks/*}/pushNotificationC" + - "onfigs\022P\n\014GetAgentCard\022\033.a2a.v1.GetAgent" + - "CardRequest\032\021.a2a.v1.AgentCard\"\020\202\323\344\223\002\n\022\010" + - "/v1/card\022\250\001\n DeleteTaskPushNotificationC" + - "onfig\022/.a2a.v1.DeleteTaskPushNotificatio" + - "nConfigRequest\032\026.google.protobuf.Empty\";" + - "\332A\004name\202\323\344\223\002.*,/v1/{name=tasks/*/pushNot" + - "ificationConfigs/*}B7\n\013io.a2a.grpcB\003A2AP" + - "\001Z\030google.golang.org/a2a/v1\252\002\006A2a.V1b\006pr" + - "oto3" + "\000R\007message\0226\n\rstatus_update\030\003 \001(\0132\035.a2a." + + "v1.TaskStatusUpdateEventH\000\022:\n\017artifact_u" + + "pdate\030\004 \001(\0132\037.a2a.v1.TaskArtifactUpdateE" + + "ventH\000B\t\n\007payload\"v\n&ListTaskPushNotific" + + "ationConfigResponse\0223\n\007configs\030\001 \003(\0132\".a" + + "2a.v1.TaskPushNotificationConfig\022\027\n\017next" + + "_page_token\030\002 \001(\t*\372\001\n\tTaskState\022\032\n\026TASK_" + + "STATE_UNSPECIFIED\020\000\022\030\n\024TASK_STATE_SUBMIT" + + "TED\020\001\022\026\n\022TASK_STATE_WORKING\020\002\022\030\n\024TASK_ST" + + "ATE_COMPLETED\020\003\022\025\n\021TASK_STATE_FAILED\020\004\022\030" + + "\n\024TASK_STATE_CANCELLED\020\005\022\035\n\031TASK_STATE_I" + + "NPUT_REQUIRED\020\006\022\027\n\023TASK_STATE_REJECTED\020\007" + + "\022\034\n\030TASK_STATE_AUTH_REQUIRED\020\010*;\n\004Role\022\024" + + "\n\020ROLE_UNSPECIFIED\020\000\022\r\n\tROLE_USER\020\001\022\016\n\nR" + + "OLE_AGENT\020\0022\220\013\n\nA2AService\022c\n\013SendMessag" + + "e\022\032.a2a.v1.SendMessageRequest\032\033.a2a.v1.S" + + "endMessageResponse\"\033\202\323\344\223\002\025\"\020/v1/message:" + + "send:\001*\022k\n\024SendStreamingMessage\022\032.a2a.v1" + + ".SendMessageRequest\032\026.a2a.v1.StreamRespo" + + "nse\"\035\202\323\344\223\002\027\"\022/v1/message:stream:\001*0\001\022R\n\007" + + "GetTask\022\026.a2a.v1.GetTaskRequest\032\014.a2a.v1" + + ".Task\"!\332A\004name\202\323\344\223\002\024\022\022/v1/{name=tasks/*}" + + "\022S\n\tListTasks\022\030.a2a.v1.ListTasksRequest\032" + + "\031.a2a.v1.ListTasksResponse\"\021\202\323\344\223\002\013\022\t/v1/" + + "tasks\022[\n\nCancelTask\022\031.a2a.v1.CancelTaskR" + + "equest\032\014.a2a.v1.Task\"$\202\323\344\223\002\036\"\031/v1/{name=" + + "tasks/*}:cancel:\001*\022s\n\020TaskSubscription\022\037" + + ".a2a.v1.TaskSubscriptionRequest\032\026.a2a.v1" + + ".StreamResponse\"$\202\323\344\223\002\036\022\034/v1/{name=tasks" + + "/*}:subscribe0\001\022\305\001\n CreateTaskPushNotifi" + + "cationConfig\022/.a2a.v1.CreateTaskPushNoti" + + "ficationConfigRequest\032\".a2a.v1.TaskPushN" + + "otificationConfig\"L\332A\rparent,config\202\323\344\223\002" + + "6\",/v1/{parent=tasks/*/pushNotificationC" + + "onfigs}:\006config\022\256\001\n\035GetTaskPushNotificat" + + "ionConfig\022,.a2a.v1.GetTaskPushNotificati" + + "onConfigRequest\032\".a2a.v1.TaskPushNotific" + + "ationConfig\";\332A\004name\202\323\344\223\002.\022,/v1/{name=ta" + + "sks/*/pushNotificationConfigs/*}\022\276\001\n\036Lis" + + "tTaskPushNotificationConfig\022-.a2a.v1.Lis" + + "tTaskPushNotificationConfigRequest\032..a2a" + + ".v1.ListTaskPushNotificationConfigRespon" + + "se\"=\332A\006parent\202\323\344\223\002.\022,/v1/{parent=tasks/*" + + "}/pushNotificationConfigs\022P\n\014GetAgentCar" + + "d\022\033.a2a.v1.GetAgentCardRequest\032\021.a2a.v1." + + "AgentCard\"\020\202\323\344\223\002\n\022\010/v1/card\022\250\001\n DeleteTa" + + "skPushNotificationConfig\022/.a2a.v1.Delete" + + "TaskPushNotificationConfigRequest\032\026.goog" + + "le.protobuf.Empty\";\332A\004name\202\323\344\223\002.*,/v1/{n" + + "ame=tasks/*/pushNotificationConfigs/*}B7" + + "\n\013io.a2a.grpcB\003A2AP\001Z\030google.golang.org/" + + "a2a/v1\252\002\006A2a.V1b\006proto3" }; descriptor = com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -781,7 +781,7 @@ public static void registerAllExtensions( internal_static_a2a_v1_ListTasksResponse_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_a2a_v1_ListTasksResponse_descriptor, - new java.lang.String[] { "Tasks", "NextPageToken", "TotalSize", }); + new java.lang.String[] { "Tasks", "NextPageToken", "PageSize", "TotalSize", }); internal_static_a2a_v1_CancelTaskRequest_descriptor = getDescriptor().getMessageTypes().get(37); internal_static_a2a_v1_CancelTaskRequest_fieldAccessorTable = new diff --git a/spec-grpc/src/main/java/io/a2a/grpc/ListTasksResponse.java b/spec-grpc/src/main/java/io/a2a/grpc/ListTasksResponse.java index 20ed57e27..966eac487 100644 --- a/spec-grpc/src/main/java/io/a2a/grpc/ListTasksResponse.java +++ b/spec-grpc/src/main/java/io/a2a/grpc/ListTasksResponse.java @@ -159,14 +159,30 @@ public java.lang.String getNextPageToken() { } } - public static final int TOTAL_SIZE_FIELD_NUMBER = 3; + public static final int PAGE_SIZE_FIELD_NUMBER = 3; + private int pageSize_ = 0; + /** + *
+   * Number of tasks returned in this response.
+   * NOTE: This field added from PR #1160 (v1.0 RC) - our proto temporarily diverges from main
+   * 
+ * + * int32 page_size = 3; + * @return The pageSize. + */ + @java.lang.Override + public int getPageSize() { + return pageSize_; + } + + public static final int TOTAL_SIZE_FIELD_NUMBER = 4; private int totalSize_ = 0; /** *
    * Total number of tasks available (before pagination).
    * 
* - * int32 total_size = 3; + * int32 total_size = 4; * @return The totalSize. */ @java.lang.Override @@ -194,8 +210,11 @@ public void writeTo(com.google.protobuf.CodedOutputStream output) if (!com.google.protobuf.GeneratedMessage.isStringEmpty(nextPageToken_)) { com.google.protobuf.GeneratedMessage.writeString(output, 2, nextPageToken_); } + if (pageSize_ != 0) { + output.writeInt32(3, pageSize_); + } if (totalSize_ != 0) { - output.writeInt32(3, totalSize_); + output.writeInt32(4, totalSize_); } getUnknownFields().writeTo(output); } @@ -213,9 +232,13 @@ public int getSerializedSize() { if (!com.google.protobuf.GeneratedMessage.isStringEmpty(nextPageToken_)) { size += com.google.protobuf.GeneratedMessage.computeStringSize(2, nextPageToken_); } + if (pageSize_ != 0) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(3, pageSize_); + } if (totalSize_ != 0) { size += com.google.protobuf.CodedOutputStream - .computeInt32Size(3, totalSize_); + .computeInt32Size(4, totalSize_); } size += getUnknownFields().getSerializedSize(); memoizedSize = size; @@ -236,6 +259,8 @@ public boolean equals(final java.lang.Object obj) { .equals(other.getTasksList())) return false; if (!getNextPageToken() .equals(other.getNextPageToken())) return false; + if (getPageSize() + != other.getPageSize()) return false; if (getTotalSize() != other.getTotalSize()) return false; if (!getUnknownFields().equals(other.getUnknownFields())) return false; @@ -255,6 +280,8 @@ public int hashCode() { } hash = (37 * hash) + NEXT_PAGE_TOKEN_FIELD_NUMBER; hash = (53 * hash) + getNextPageToken().hashCode(); + hash = (37 * hash) + PAGE_SIZE_FIELD_NUMBER; + hash = (53 * hash) + getPageSize(); hash = (37 * hash) + TOTAL_SIZE_FIELD_NUMBER; hash = (53 * hash) + getTotalSize(); hash = (29 * hash) + getUnknownFields().hashCode(); @@ -400,6 +427,7 @@ public Builder clear() { } bitField0_ = (bitField0_ & ~0x00000001); nextPageToken_ = ""; + pageSize_ = 0; totalSize_ = 0; return this; } @@ -451,6 +479,9 @@ private void buildPartial0(io.a2a.grpc.ListTasksResponse result) { result.nextPageToken_ = nextPageToken_; } if (((from_bitField0_ & 0x00000004) != 0)) { + result.pageSize_ = pageSize_; + } + if (((from_bitField0_ & 0x00000008) != 0)) { result.totalSize_ = totalSize_; } } @@ -498,6 +529,9 @@ public Builder mergeFrom(io.a2a.grpc.ListTasksResponse other) { bitField0_ |= 0x00000002; onChanged(); } + if (other.getPageSize() != 0) { + setPageSize(other.getPageSize()); + } if (other.getTotalSize() != 0) { setTotalSize(other.getTotalSize()); } @@ -546,10 +580,15 @@ public Builder mergeFrom( break; } // case 18 case 24: { - totalSize_ = input.readInt32(); + pageSize_ = input.readInt32(); bitField0_ |= 0x00000004; break; } // case 24 + case 32: { + totalSize_ = input.readInt32(); + bitField0_ |= 0x00000008; + break; + } // case 32 default: { if (!super.parseUnknownField(input, extensionRegistry, tag)) { done = true; // was an endgroup tag @@ -976,13 +1015,60 @@ public Builder setNextPageTokenBytes( return this; } + private int pageSize_ ; + /** + *
+     * Number of tasks returned in this response.
+     * NOTE: This field added from PR #1160 (v1.0 RC) - our proto temporarily diverges from main
+     * 
+ * + * int32 page_size = 3; + * @return The pageSize. + */ + @java.lang.Override + public int getPageSize() { + return pageSize_; + } + /** + *
+     * Number of tasks returned in this response.
+     * NOTE: This field added from PR #1160 (v1.0 RC) - our proto temporarily diverges from main
+     * 
+ * + * int32 page_size = 3; + * @param value The pageSize to set. + * @return This builder for chaining. + */ + public Builder setPageSize(int value) { + + pageSize_ = value; + bitField0_ |= 0x00000004; + onChanged(); + return this; + } + /** + *
+     * Number of tasks returned in this response.
+     * NOTE: This field added from PR #1160 (v1.0 RC) - our proto temporarily diverges from main
+     * 
+ * + * int32 page_size = 3; + * @return This builder for chaining. + */ + public Builder clearPageSize() { + bitField0_ = (bitField0_ & ~0x00000004); + pageSize_ = 0; + onChanged(); + return this; + } + private int totalSize_ ; /** *
      * Total number of tasks available (before pagination).
      * 
* - * int32 total_size = 3; + * int32 total_size = 4; * @return The totalSize. */ @java.lang.Override @@ -994,14 +1080,14 @@ public int getTotalSize() { * Total number of tasks available (before pagination). * * - * int32 total_size = 3; + * int32 total_size = 4; * @param value The totalSize to set. * @return This builder for chaining. */ public Builder setTotalSize(int value) { totalSize_ = value; - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000008; onChanged(); return this; } @@ -1010,11 +1096,11 @@ public Builder setTotalSize(int value) { * Total number of tasks available (before pagination). * * - * int32 total_size = 3; + * int32 total_size = 4; * @return This builder for chaining. */ public Builder clearTotalSize() { - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000008); totalSize_ = 0; onChanged(); return this; diff --git a/spec-grpc/src/main/java/io/a2a/grpc/ListTasksResponseOrBuilder.java b/spec-grpc/src/main/java/io/a2a/grpc/ListTasksResponseOrBuilder.java index 08b8eb22a..31802fa93 100644 --- a/spec-grpc/src/main/java/io/a2a/grpc/ListTasksResponseOrBuilder.java +++ b/spec-grpc/src/main/java/io/a2a/grpc/ListTasksResponseOrBuilder.java @@ -76,12 +76,23 @@ io.a2a.grpc.TaskOrBuilder getTasksOrBuilder( com.google.protobuf.ByteString getNextPageTokenBytes(); + /** + *
+   * Number of tasks returned in this response.
+   * NOTE: This field added from PR #1160 (v1.0 RC) - our proto temporarily diverges from main
+   * 
+ * + * int32 page_size = 3; + * @return The pageSize. + */ + int getPageSize(); + /** *
    * Total number of tasks available (before pagination).
    * 
* - * int32 total_size = 3; + * int32 total_size = 4; * @return The totalSize. */ int getTotalSize(); diff --git a/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java b/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java index 4a69af75e..8b23c5372 100644 --- a/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java +++ b/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java @@ -163,6 +163,7 @@ public static io.a2a.grpc.ListTasksResponse listTasksResult(io.a2a.spec.ListTask builder.setNextPageToken(result.nextPageToken()); } builder.setTotalSize(result.totalSize()); + builder.setPageSize(result.pageSize()); return builder.build(); } diff --git a/spec-grpc/src/main/proto/a2a.proto b/spec-grpc/src/main/proto/a2a.proto index 55f0bce88..0a2745e8a 100644 --- a/spec-grpc/src/main/proto/a2a.proto +++ b/spec-grpc/src/main/proto/a2a.proto @@ -751,8 +751,11 @@ message ListTasksResponse { // Token for retrieving the next page of results. // Empty string if no more results. string next_page_token = 2; + // Number of tasks returned in this response. + // NOTE: This field added from PR #1160 (v1.0 RC) - our proto temporarily diverges from main + int32 page_size = 3; // Total number of tasks available (before pagination). - int32 total_size = 3; + int32 total_size = 4; } // --8<-- [end:ListTasksResponse] diff --git a/spec/src/main/java/io/a2a/spec/ListTasksResult.java b/spec/src/main/java/io/a2a/spec/ListTasksResult.java index 3a02cd2f4..9197e1eac 100644 --- a/spec/src/main/java/io/a2a/spec/ListTasksResult.java +++ b/spec/src/main/java/io/a2a/spec/ListTasksResult.java @@ -21,7 +21,7 @@ public record ListTasksResult( List tasks, int totalSize, int pageSize, - @Nullable String nextPageToken + @JsonInclude(JsonInclude.Include.ALWAYS) @Nullable String nextPageToken ) { public ListTasksResult { Assert.checkNotNullParam("tasks", tasks); From ce6e64bda660a825479d9a4a78973c9811966eed Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Thu, 13 Nov 2025 14:52:11 +0000 Subject: [PATCH 6/6] More changes to for the TCK --- .../io/a2a/server/tasks/InMemoryTaskStore.java | 14 ++++++++++++++ .../java/io/a2a/grpc/utils/ProtoUtils.java | 18 +++++++++++++++++- tck/src/main/resources/application.properties | 1 + .../transport/rest/handler/RestHandler.java | 2 +- 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java index 66e65f3b4..292ba7717 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java +++ b/server-common/src/main/java/io/a2a/server/tasks/InMemoryTaskStore.java @@ -10,6 +10,9 @@ import jakarta.enterprise.context.ApplicationScoped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.a2a.spec.Artifact; import io.a2a.spec.ListTasksParams; import io.a2a.spec.ListTasksResult; @@ -19,10 +22,12 @@ @ApplicationScoped public class InMemoryTaskStore implements TaskStore, TaskStateProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryTaskStore.class); private final ConcurrentMap tasks = new ConcurrentHashMap<>(); @Override public void save(Task task) { + LOGGER.debug("=== InMemoryTaskStore.save() === taskId=" + task.getId() + ", contextId=" + task.getContextId()); tasks.put(task.getId(), task); } @@ -38,6 +43,13 @@ public void delete(String taskId) { @Override public ListTasksResult list(ListTasksParams params) { + // DEBUG: Log store contents for totalSize investigation + LOGGER.debug("=== InMemoryTaskStore.list() DEBUG ==="); + LOGGER.debug("Total tasks in store: {}", tasks.size()); + LOGGER.debug("Filter contextId: {}", params.contextId()); + LOGGER.debug("Filter status: {}", params.status()); + tasks.values().forEach(t -> LOGGER.debug(" Task: id={}, contextId={}", t.getId(), t.getContextId())); + // Filter and sort tasks in a single stream pipeline List allFilteredTasks = tasks.values().stream() .filter(task -> params.contextId() == null || params.contextId().equals(task.getContextId())) @@ -57,6 +69,8 @@ public ListTasksResult list(ListTasksParams params) { .toList(); int totalSize = allFilteredTasks.size(); + LOGGER.debug("Filtered tasks count: {}", totalSize); + LOGGER.debug("=== END DEBUG ==="); // Apply pagination int pageSize = params.getEffectivePageSize(); diff --git a/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java b/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java index 8b23c5372..0e1420810 100644 --- a/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java +++ b/spec-grpc/src/main/java/io/a2a/grpc/utils/ProtoUtils.java @@ -10,6 +10,9 @@ import java.util.Map; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.protobuf.ByteString; import com.google.protobuf.Struct; import com.google.protobuf.Value; @@ -70,6 +73,7 @@ public class ProtoUtils { public static class ToProto { + private static final Logger LOGGER = LoggerFactory.getLogger(ToProto.class); public static io.a2a.grpc.AgentCard agentCard(AgentCard agentCard) { io.a2a.grpc.AgentCard.Builder builder = io.a2a.grpc.AgentCard.newBuilder(); @@ -155,6 +159,11 @@ public static io.a2a.grpc.Task task(Task task) { } public static io.a2a.grpc.ListTasksResponse listTasksResult(io.a2a.spec.ListTasksResult result) { + LOGGER.debug("=== ProtoUtils.ToProto.listTasksResult() DEBUG ==="); + LOGGER.debug("Input result.totalSize(): {}", result.totalSize()); + LOGGER.debug("Input result.pageSize(): {}", result.pageSize()); + LOGGER.debug("Input result.tasks().size(): {}", result.tasks().size()); + io.a2a.grpc.ListTasksResponse.Builder builder = io.a2a.grpc.ListTasksResponse.newBuilder(); if (result.tasks() != null) { builder.addAllTasks(result.tasks().stream().map(ToProto::task).collect(Collectors.toList())); @@ -164,7 +173,14 @@ public static io.a2a.grpc.ListTasksResponse listTasksResult(io.a2a.spec.ListTask } builder.setTotalSize(result.totalSize()); builder.setPageSize(result.pageSize()); - return builder.build(); + + io.a2a.grpc.ListTasksResponse response = builder.build(); + LOGGER.debug("Output response.getTotalSize(): {}", response.getTotalSize()); + LOGGER.debug("Output response.getPageSize(): {}", response.getPageSize()); + LOGGER.debug("Output response.getTasksCount(): {}", response.getTasksCount()); + LOGGER.debug("=== END ProtoUtils DEBUG ==="); + + return response; } public static io.a2a.grpc.Message message(Message message) { diff --git a/tck/src/main/resources/application.properties b/tck/src/main/resources/application.properties index c68793be4..fbae3f29a 100644 --- a/tck/src/main/resources/application.properties +++ b/tck/src/main/resources/application.properties @@ -12,6 +12,7 @@ a2a.executor.keep-alive-seconds=60 quarkus.log.category."io.a2a.server.requesthandlers".level=DEBUG quarkus.log.category."io.a2a.server.events".level=DEBUG quarkus.log.category."io.a2a.server.tasks".level=DEBUG +quarkus.log.category."io.a2a.grpc.utils".level=DEBUG # Log to file for analysis quarkus.log.file.enable=true diff --git a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java index 105831262..25ed9b296 100644 --- a/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java +++ b/transport/rest/src/main/java/io/a2a/transport/rest/handler/RestHandler.java @@ -300,7 +300,7 @@ private void validate(String json) { private HTTPRestResponse createSuccessResponse(int statusCode, com.google.protobuf.Message.Builder builder) { try { - String jsonBody = JsonFormat.printer().print(builder); + String jsonBody = JsonFormat.printer().includingDefaultValueFields().print(builder); return new HTTPRestResponse(statusCode, "application/json", jsonBody); } catch (InvalidProtocolBufferException e) { return createErrorResponse(new InternalError("Failed to serialize response: " + e.getMessage()));