From 7d8d4d00221b067f4f7fa7b5799b23ccf944d70d Mon Sep 17 00:00:00 2001 From: Xuan Gu <162244362+xuang7@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:04:43 -0700 Subject: [PATCH 1/2] Paginate LakeFS object and uncommitted-diff listings --- .../storage/util/LakeFSStorageClient.scala | 42 +++++++++++++++---- .../resource/DatasetResourceSpec.scala | 26 ++++++++++++ 2 files changed, 61 insertions(+), 7 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala index 09fa6f3eb30..492fffde468 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala @@ -300,8 +300,36 @@ object LakeFSStorageClient { .sortBy(_.getCreationDate)(Ordering[java.lang.Long].reverse) // Sort in descending order } + /** + * Fetches all pages from a paginated LakeFS API call. + * + * @param fetch A function that takes a pagination cursor and returns (results, pagination). + * @return All results across all pages. 
+ */ + private def fetchAllPages[T]( + fetch: String => (java.util.List[T], Pagination) + ): List[T] = { + val allResults = scala.collection.mutable.ListBuffer[T]() + var hasMore = true + var after = "" + + while (hasMore) { + val (results, pagination) = fetch(after) + allResults ++= results.asScala + hasMore = pagination.getHasMore + if (hasMore) after = pagination.getNextOffset + } + + allResults.toList + } + def retrieveObjectsOfVersion(repoName: String, commitHash: String): List[ObjectStats] = { - objectsApi.listObjects(repoName, commitHash).execute().getResults.asScala.toList + fetchAllPages[ObjectStats] { after => + val request = objectsApi.listObjects(repoName, commitHash).amount(1000) + if (after.nonEmpty) request.after(after) + val response = request.execute() + (response.getResults, response.getPagination) + } } def retrieveRepositorySize(repoName: String, commitHash: String = ""): Long = { @@ -334,12 +362,12 @@ object LakeFSStorageClient { * @return List of uncommitted object stats. 
*/ def retrieveUncommittedObjects(repoName: String): List[Diff] = { - branchesApi - .diffBranch(repoName, branchName) - .execute() - .getResults - .asScala - .toList + fetchAllPages[Diff] { after => + val request = branchesApi.diffBranch(repoName, branchName).amount(1000) + if (after.nonEmpty) request.after(after) + val response = request.execute() + (response.getResults, response.getPagination) + } } def createCommit(repoName: String, branch: String, commitMessage: String): Commit = { diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index a71861491a6..7c94dc8fe90 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -2445,4 +2445,30 @@ class DatasetResourceSpec fetchSession(filePath) shouldBe null fetchPartRows(uploadId) shouldBe empty } + + // =========================================================================== + // Pagination test – verify that listing APIs return more than the default (100 items) + // =========================================================================== + + "LakeFS pagination" should "return all files when count exceeds one page for both uncommitted and committed objects" taggedAs Slow in { + val repoName = + s"pagination-${System.nanoTime()}-${Random.alphanumeric.take(6).mkString.toLowerCase}" + LakeFSStorageClient.initRepo(repoName) + + val totalFiles = 110 + (1 to totalFiles).foreach { i => + LakeFSStorageClient.writeFileToRepo( + repoName, + s"file-$i.txt", + new ByteArrayInputStream(s"content-$i".getBytes(StandardCharsets.UTF_8)) + ) + } + + // before commit: 110 files should appear as uncommitted diffs + LakeFSStorageClient.retrieveUncommittedObjects(repoName).size shouldEqual totalFiles + + // after commit: 110 files should appear as 
committed objects + val commit = LakeFSStorageClient.withCreateVersion(repoName, "commit all files") {} + LakeFSStorageClient.retrieveObjectsOfVersion(repoName, commit.getId).size shouldEqual totalFiles + } } From 07f4bb13cf5a2d9abfaab8294047abd815360962 Mon Sep 17 00:00:00 2001 From: Xuan Gu <162244362+xuang7@users.noreply.github.com> Date: Wed, 8 Apr 2026 12:44:11 -0700 Subject: [PATCH 2/2] Extract LakeFS pagination page size into a PageSize constant --- .../amber/core/storage/util/LakeFSStorageClient.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala index 492fffde468..14737b98001 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala @@ -35,6 +35,9 @@ import scala.jdk.CollectionConverters._ */ object LakeFSStorageClient { + // Maximum number of results per LakeFS API request (pagination page size) + private val PageSize = 1000 + private lazy val apiClient: ApiClient = { val client = new ApiClient() client.setApiKey(StorageConfig.lakefsPassword) @@ -311,7 +314,7 @@ object LakeFSStorageClient { ): List[T] = { val allResults = scala.collection.mutable.ListBuffer[T]() var hasMore = true - var after = "" + var after = "" // Pagination cursor returned by LakeFS while (hasMore) { val (results, pagination) = fetch(after) @@ -325,7 +328,7 @@ object LakeFSStorageClient { def retrieveObjectsOfVersion(repoName: String, commitHash: String): List[ObjectStats] = { fetchAllPages[ObjectStats] { after => - val request = objectsApi.listObjects(repoName, commitHash).amount(1000) + val request = objectsApi.listObjects(repoName, commitHash).amount(PageSize) if (after.nonEmpty) request.after(after) val response = request.execute() 
(response.getResults, response.getPagination) @@ -363,7 +366,7 @@ object LakeFSStorageClient { */ def retrieveUncommittedObjects(repoName: String): List[Diff] = { fetchAllPages[Diff] { after => - val request = branchesApi.diffBranch(repoName, branchName).amount(1000) + val request = branchesApi.diffBranch(repoName, branchName).amount(PageSize) if (after.nonEmpty) request.after(after) val response = request.execute() (response.getResults, response.getPagination)