Conversation

@asuiu asuiu (Member) commented Apr 26, 2025

This doesn't change the library itself (except for a minor addition of buffering support in S3Bucket); it just adds a functional test that writes and reads a Parquet file to buckets in a streaming manner.

@asuiu asuiu requested a review from crudeanu April 26, 2025 16:46
@coderabbitai coderabbitai bot commented Apr 26, 2025

📝 Walkthrough

This change adds Parquet support in tests and stream wrappers for Parquet I/O, extends S3Bucket with configurable buffering and robust multipart upload handling, and adds a getSize(PurePosixPath) API to IBucket and its implementations (MemoryBucket, S3Bucket, S3BucketSDKv1). Maven test dependencies for Parquet/Hadoop were added. Tests were expanded to exercise Parquet roundtrips, large multipart uploads, and size retrieval. New Parquet stream/file wrappers enable seek/position semantics over streams for testing.

Sequence Diagram(s)

sequenceDiagram
    participant Test as Test Suite
    participant Parquet as ParquetUtils
    participant Bucket as IBucket (MemoryBucket / S3Bucket)
    participant S3Client as S3Client / S3AsyncClient
    participant S3Svc as S3 Service

    Test->>Parquet: generateParquetOutput() -> InputStream
    Test->>Bucket: putObjectStream(path, InputStream)
    alt MemoryBucket
        Bucket-->>Bucket: store bytes in memory
    else S3Bucket
        Bucket->>S3Client: putObject or initiate multipart upload
        S3Client->>S3Svc: upload parts / complete
    end
    Test->>Bucket: getObjectStream(path) or getSize(path)
    Bucket-->>Test: Buffered InputStream or size
    Test->>Parquet: read rows from InputStream
    Parquet-->>Test: verified row count
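
For orientation, the new getSize API mentioned above amounts to a single interface method (sketch only; the signature is taken from the reviewed implementations further down, other IBucket methods omitted):

    public interface IBucket {
        // returns the object size in bytes; per the reviewed implementations,
        // a missing object surfaces as a FileNotFoundException
        long getSize(PurePosixPath name) throws IOException;
    }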

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
  • Description check (⚠️ Warning): The description is significantly incomplete. It lacks most template sections, including the ticket reference, formal scope checkbox selection, comprehensive checklist items, and risk/impact assessment; only a brief informal note is provided. Resolution: complete the PR description using the template by adding the ticket/Trello link, formal scope selection, risk assessment (latency/API/migration), and checklist acknowledgment, and expanding the one-liner explanation into proper sections.
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 2.94%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (1 passed)
  • Title check (✅ Passed): The title accurately summarizes the main change: adding a test for reading/writing Parquet files as streams to IBucket. It's specific and highlights the primary contribution.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🔭 Outside diff range comments (1)
java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)

80-89: 🛠️ Refactor suggestion

⚠️ Potential issue

Do not close the shared S3AsyncClient inside every putObjectStream call

finally { s3AsyncClient.close(); } shuts the client down after a single upload, rendering the S3Bucket instance unusable for any subsequent asynchronous operations (including multi-part uploads executed later in the same test run or in production code).
Move the shutdown logic to an explicit close() method on S3Bucket, or let the application lifecycle manage it.

@@
-        } finally {
-            s3AsyncClient.close();
-        }
+        }
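
A minimal sketch of the suggested alternative, assuming S3Bucket keeps its async client in a field named s3AsyncClient (as in the snippets above) and that the caller manages the bucket's lifecycle:

    // Sketch only: close the client once, when the bucket itself is disposed of,
    // rather than inside every putObjectStream call.
    public class S3Bucket implements IBucket, AutoCloseable {
        protected final S3AsyncClient s3AsyncClient;
        // ... constructors and the existing IBucket methods unchanged ...

        @Override
        public void close() {
            s3AsyncClient.close();
        }
    }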
🧹 Nitpick comments (2)
java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)

35-48: Validate and rename the instance-level buffer size

  1. A negative or zero bufSize argument silently produces an unusable BufferedInputStream; add validation.
  2. BUF_SIZE is not a constant but an instance field; Java conventions recommend lowerCamelCase for variables.
-    protected final int BUF_SIZE; // 8 KB by default
+    protected final int bufSize;   // validated, per-instance
@@
-        this.BUF_SIZE = bufSize;
+        if (bufSize <= 0) {
+            throw new IllegalArgumentException("bufSize must be positive");
+        }
+        this.bufSize = bufSize;

Remember to update usages (BUF_SIZE → bufSize) below.

java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (1)

134-169: Use try-with-resources for ObjectStream to avoid leaks

objectStream is opened but never closed if an assertion fails. Wrap it in a try-with-resources block:

-        ObjectStream objectStream = storage.getObjectStream(path);
-        assertTrue(objectStream.getStream().markSupported());
+        try (ObjectStream objectStream = storage.getObjectStream(path)) {
+            assertTrue(objectStream.getStream().markSupported());
 ...
-        }
+        }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b91be22 and cfceeb8.

📒 Files selected for processing (7)
  • java/pom.xml (1 hunks)
  • java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (3 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (4 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java (1 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1 hunks)
🔇 Additional comments (7)
java/pom.xml (1)

55-67: Dependencies for Parquet testing look good!

The addition of Parquet and Hadoop dependencies with test scope is appropriate for the new streaming tests. Using recent versions (Parquet 1.15.1 and Hadoop 3.4.1) ensures compatibility with modern features while keeping them isolated to the test environment.

java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1)

44-47: Test for Parquet streaming looks good!

The new test method follows the established pattern of the class, delegating to the shared tester implementation. This maintains consistency while adding the new Parquet streaming functionality.

java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1)

29-32: Test implementation matches established pattern!

The added test follows the same pattern as in S3BucketTest, ensuring consistent testing of Parquet functionality across different bucket implementations.

java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2)

6-6: Import for @Disabled annotation is appropriate

The import is correctly added to support the disabled test.


44-48: Well-documented disabled test for SDK V1

Good job properly documenting why this test is disabled with a clear explanation about AWS SDK V1's S3ObjectInputStream limitations for Parquet reading. This approach preserves test consistency across implementations while acknowledging the technical constraint.

java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)

163-171: Re-mark the buffered stream with a generous read-limit

Downstream code (SeekableInputStreamWrapper) relies on InputStream.reset() to seek backwards.
BufferedInputStream marks are invalidated once more than readLimit bytes have been read; passing 0 (as done later) causes reset() to fail after the first read.

Although the faulty code resides in SeekableInputStreamWrapper, you can protect yourself here by pre-marking the stream with a large limit:

BufferedInputStream bis = new BufferedInputStream(inputStream, bufSize);
bis.mark(Integer.MAX_VALUE);
return new ObjectStream(bis, name.toString());

(See dedicated comment in ParquetUtils.java for the actual fix.)

java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (1)

305-342: Mark/reset support in the generated Parquet pipe may fail

SeekableInputStreamWrapper resets to the previously set mark (see separate comment) but the pipe created here is not wrapped in a BufferedInputStream and no mark is set. When tests later call objectStream.getStream().markSupported() the result is true (because S3 layer wraps it), yet seeking backwards on the returned stream will fail outside the test environment.

Consider wrapping inputStreamToReturn:

-PipedInputStream inputStreamToReturn  = new PipedInputStream(pipeBuffer);
+PipedInputStream inputStreamToReturn  = new PipedInputStream(pipeBuffer);
+inputStreamToReturn.mark(Integer.MAX_VALUE);

Comment on lines 195 to 199
public long getLength() throws IOException {
    return stream.getStream().available();
}

@Override
@coderabbitai coderabbitai bot commented Apr 26, 2025

🛠️ Refactor suggestion

available() is not a reliable file length

InputStream.available() returns the number of immediately readable bytes, not the total length. For S3 streams this is often 0, which will break Parquet’s footer discovery.

Provide length via ObjectStream.getSize() (if available) or buffer the stream to measure once:

public long getLength() {
    return stream.getSize(); // expose object size from bucket meta-data
}

If no size is available, document the limitation clearly.

✅ Addressed in commit 3fcf702

Comment on lines 205 to 216
public static class SeekableInputStreamWrapper extends SeekableInputStream {
    private final InputStream in;
    private long pos = 0;
    private final int markLimit = 0;

    /**
     * @param in the raw InputStream (e.g. S3ObjectInputStream)
     */
    public SeekableInputStreamWrapper(InputStream in) {
        this.in = in;
        in.mark(markLimit);

@coderabbitai coderabbitai bot commented Apr 26, 2025

🛠️ Refactor suggestion

⚠️ Potential issue

markLimit set to 0 breaks backward seek – increase and safeguard

SeekableInputStreamWrapper sets in.mark(markLimit) with markLimit = 0; after a single byte is read, reset() throws IOException ("Mark invalid"), which will surface when Parquet seeks backwards.

Fix:

-    private final int markLimit = 0;
+    private static final int MARK_LIMIT = Integer.MAX_VALUE; // allow full rewind
@@
-        public SeekableInputStreamWrapper(InputStream in) {
-            this.in = in;
-            in.mark(markLimit);
+        public SeekableInputStreamWrapper(InputStream in) {
+            // Ensure mark/reset capability
+            this.in = in.markSupported() ? in : new BufferedInputStream(in);
+            this.in.mark(MARK_LIMIT);
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

Before:

    public static class SeekableInputStreamWrapper extends SeekableInputStream {
        private final InputStream in;
        private long pos = 0;
        private final int markLimit = 0;

        /**
         * @param in the raw InputStream (e.g. S3ObjectInputStream)
         */
        public SeekableInputStreamWrapper(InputStream in) {
            this.in = in;
            in.mark(markLimit);

After:

    public static class SeekableInputStreamWrapper extends SeekableInputStream {
        private final InputStream in;
        private long pos = 0;
        private static final int MARK_LIMIT = Integer.MAX_VALUE; // allow full rewind

        /**
         * @param in the raw InputStream (e.g. S3ObjectInputStream)
         */
        public SeekableInputStreamWrapper(InputStream in) {
            // Ensure mark/reset capability
            this.in = in.markSupported() ? in : new BufferedInputStream(in);
            this.in.mark(MARK_LIMIT);

Member Author (@asuiu) commented

Nice catch! Fixed

@coderabbitai coderabbitai bot commented

@asuiu Awesome! 🚀 That's going to make the Parquet implementation way more robust. Sometimes the simplest fixes prevent the biggest headaches later. Seeking backward properly is critical for Parquet reading operations.

(_/)
(•ᴗ•)
/ >🔍

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java (1)

63-67: Implementation looks good but consider potential inefficiency.

The implementation correctly returns the object's size by leveraging the existing getObject method. This approach is simple but loads the entire object into memory to determine its size, which could be inefficient for very large objects. Since this is a memory-based implementation primarily for testing (as noted in the class documentation), this is acceptable, but it's worth documenting this limitation.

 @Override
 public long getSize(PurePosixPath name) throws IOException {
+    // Note: This loads the entire object into memory to determine size
+    // which is acceptable for test scenarios but inefficient for large objects
     byte[] content = getObject(name);
     return content.length;
 }
java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (1)

86-98: Well-implemented S3 size retrieval with proper exception handling.

This implementation is efficient as it retrieves only the metadata rather than downloading the entire object content. The exception handling is thorough, properly differentiating between a missing object (404) and other potential errors.

Consider adding braces around the single-line if statement at line 92-93 for consistency with the codebase style and to prevent potential future bugs:

 catch (AmazonS3Exception e) {
     if (e.getStatusCode() == 404)
+        {
         throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+        }
     throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4c7751a and 3fcf702.

📒 Files selected for processing (9)
  • java/src/main/java/com/esamtrade/bucketbase/IBucket.java (1 hunks)
  • java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java (1 hunks)
  • java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (4 hunks)
  • java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (2 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (4 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java (1 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2 hunks)
  • java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java
  • java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java
  • java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java
  • java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java
  • java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java
  • java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java
🔇 Additional comments (2)
java/src/main/java/com/esamtrade/bucketbase/IBucket.java (1)

30-30: Interface extension looks good.

The new method getSize is a solid addition to the IBucket interface and provides a standardized way to retrieve object sizes. The method signature appropriately follows the same pattern as other interface methods with IOException handling.

java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (1)

3-3: LGTM: Required import for SDK exception handling.

The added import for SdkClientException is necessary for the appropriate exception handling in the new getSize method.

asuiu added 2 commits April 28, 2025 00:02
and added testPutAndGetMultiUploadObjectStream to test S3 multipart upload with large files
and added testPutAndGetMultiUploadObjectStream to test S3 multipart upload with large files
@crudeanu crudeanu (Contributor) left a comment

Only some minor changes suggested here.

CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
return CompletedPart.builder()
.partNumber(partNumber)
.eTag(uploadPartResponse.join().eTag())
@crudeanu (Contributor) commented

I think that if you join() here on the future response, you choose to perform the uploads sequentially. Is this the intention?

Member Author (@asuiu) commented

Nice catch, I missed this (GPT-generated code).
It's probably not a good idea to perform the uploads sequentially; they should be parallelized.
Would you be so kind as to suggest a better implementation?
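
One possible shape for the parallel variant, sketched under assumptions (the part payloads are buffered as byte arrays, and bucketName, s3AsyncClient, key and the multipart uploadId come from the surrounding S3Bucket code; imports elided): start every uploadPart call first, and only then join the futures.

    // Sketch only: start all part uploads before waiting on any of them.
    private List<CompletedPart> uploadPartsInParallel(List<byte[]> parts, String key, String uploadId) {
        List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
        for (int i = 0; i < parts.size(); i++) {
            int partNumber = i + 1;
            UploadPartRequest request = UploadPartRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .uploadId(uploadId)
                    .partNumber(partNumber)
                    .build();
            futures.add(s3AsyncClient.uploadPart(request, AsyncRequestBody.fromBytes(parts.get(i)))
                    .thenApply(resp -> CompletedPart.builder()
                            .partNumber(partNumber)
                            .eTag(resp.eTag())
                            .build()));
        }
        // join() only after every part upload has been started, so the parts transfer concurrently
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }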

});
writerThread.start();
try {
Thread.sleep(100);
@crudeanu (Contributor) commented

Is this necessary?

Member Author (@asuiu) commented

As far as I remember, unfortunately yes: without this sleep the tests were flaky, failing because the thread could not start fast enough. There is probably a better solution with a CountDownLatch or another sync primitive. But I might be wrong, and it may no longer be necessary after some refactoring I made; you can try removing it, and if everything works for a few runs, it's probably not needed anymore.
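
A possible replacement for the sleep, sketched under assumptions about the test's structure (writeParquetRowsToPipe is a hypothetical stand-in for the existing writer body, and the surrounding test method is assumed to declare InterruptedException):

    // Sketch only: replace the fixed Thread.sleep(100) with an explicit start signal.
    CountDownLatch writerStarted = new CountDownLatch(1);
    Thread writerThread = new Thread(() -> {
        writerStarted.countDown();   // signal as soon as the writer thread is running
        writeParquetRowsToPipe();    // hypothetical stand-in for the existing Parquet-writing loop
    });
    writerThread.start();
    // wait (bounded) for the writer thread to actually start
    assertTrue(writerStarted.await(5, TimeUnit.SECONDS), "writer thread did not start");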


// wrap pipe in our PositionOutputStream
ParquetUtils.PositionOutputStreamWrapper posOutStream = new ParquetUtils.PositionOutputStreamWrapper(pipedOutputStream);
ParquetUtils.OutputFileWrapper OutputFileWrapper = new ParquetUtils.OutputFileWrapper(posOutStream);
@crudeanu (Contributor) commented

Please consider renaming this to camelCase: outputFileWrapper

Member Author (@asuiu) commented

Feel free to make any renamings or refactoring you consider needed or useful

<!-- Parquet dependencies -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
@crudeanu (Contributor) commented

Isn't parquet-hadoop enough? Do we need the whole bundle?

Member Author (@asuiu) commented

I have no idea... maybe :)
Feel free to fix any inefficiencies. This PR was a hackish PoC written by GPT
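
If plain parquet-hadoop does turn out to be enough, the test dependency could be declared roughly like this (illustrative sketch only; the version matches the one mentioned in the walkthrough, and the swap would still need to be verified against the tests):

    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.15.1</version>
        <scope>test</scope>
    </dependency>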

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java`:
- Around line 221-234: Update S3Bucket.getSize to treat AwsServiceException with
HTTP 404 the same as NoSuchKey: catch AwsServiceException separately (in
addition to NoSuchKeyException) and check e.statusCode() == 404 (or
e.statusCode().toString()/int equivalent depending on type) and throw
FileNotFoundException in that case; otherwise keep the existing catch for
AwsServiceException | SdkClientException to wrap other failures in an
IOException. Ensure you reference getSize, S3Bucket, HeadObjectRequest/Response
and preserve the original error as cause when throwing IOException.

In `@java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java`:
- Around line 365-379: The writer thread can throw and never close the
PipedOutputStream, causing uploadLargeStream to block; update the writerThread
block where ParquetWriter<Group> is created (the lambda that uses
ExampleParquetWriter.builder(OutputFileWrapper) and writes with
SimpleGroupFactory) to ensure the underlying PipedOutputStream is closed in a
finally block (or try-with-resources around that stream) so EOF is always
signaled on exception — i.e., add a finally that closes the PipedOutputStream
referenced by OutputFileWrapper (or the variable wrapping the pipe) to guarantee
the pipe is closed even if ParquetWriter.write throws.
- Around line 170-193: The method readRowsCountFromParquet leaks ObjectStream;
call storage.getSize(path) before opening the stream and wrap the ObjectStream
from storage.getObjectStream(path) in a try-with-resources so it is always
closed; create the ParquetUtils.StreamInputFile (and open ParquetFileReader)
inside that try block (or nested tries) so both the stream and the reader are
closed automatically; update references to ObjectStream,
storage.getObjectStream, storage.getSize, ParquetUtils.StreamInputFile, and
ParquetFileReader in readRowsCountFromParquet accordingly.

Comment on lines 221 to +234
@Override
public long getSize(PurePosixPath name) throws IOException {
    try {
        HeadObjectRequest request = HeadObjectRequest.builder()
                .bucket(bucketName)
                .key(name.toString())
                .build();
        HeadObjectResponse response = s3Client.headObject(request);
        return response.contentLength();
    } catch (NoSuchKeyException e) {
        throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
    } catch (AwsServiceException | SdkClientException e) {
        throw new IOException("Failed to get object size: " + name, e);
    }
@coderabbitai coderabbitai bot commented

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

AWS SDK for Java v2 S3Client.headObject NoSuchKeyException missing object

💡 Result:

Short answer

  • S3Client.headObject throws NoSuchKeyException (an S3 service exception) when the key does not exist. If you lack s3:ListBucket permission S3 may instead return 403 (access denied). Use try/catch for NoSuchKeyException or inspect the S3Exception HTTP status/error code. [1][2]

Example (Java SDK v2)

  • Preferred: call headObject and handle missing-key explicitly:
    try {
        HeadObjectResponse r = s3.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
        // object exists
    } catch (NoSuchKeyException e) {
        // object missing
    } catch (S3Exception e) {
        // other service errors (e.g. 403 if no ListBucket permission) — check e.statusCode() / e.awsErrorDetails()
    } catch (SdkClientException e) {
        // client-side errors (network/credentials)
    }

Notes / alternatives

  • If you get 403 for a missing key, ensure the caller has s3:ListBucket on the bucket (or use a different approach). [2]
  • Alternatives: listObjectsV2 with prefix (slower), GetObjectAttributes (if available in your SDK/API region) or attempt a GetObject and handle 404. [5][8]

Sources

  • AWS Java SDK v2 S3Client.headObject docs (throws NoSuchKeyException; behavior depends on s3:ListBucket permission). [1][2]
  • AWS developer guide / examples (head/list guidance). [8]
  • Community/guide pages on HEAD returning 404 when object missing and 403 if no list permission. [5]

References
[1] S3Client API doc (headObject throws NoSuchKeyException).
[2] AWS S3 headObject behavior / permission details.
[5] Checking if an S3 object exists (example using head-object).
[8] AWS SDK for Java developer guide (S3 examples).

If you want, I can paste a ready-to-run Java method that wraps this and returns a boolean.


🌐 Web query:

MinIO S3 headObject 404 exception handling missing key

💡 Result:

Short answer

  • HeadObject/StatObject will return a 404/NotFound (or an SDK error variant that maps to NotFound/NoSuchKey) when the object key does not exist; some SDKs expose this as an HTTP 404 status, others as an error type/code (e.g. NotFound / NoSuchKey). Handle it by checking the HTTP status or the SDK error code and treating that case as “object missing.” (docs.aws.amazon.com)

Practical guidance

  • Catch the SDK error and inspect:
    • the HTTP status (== 404 / http.StatusNotFound), or
    • the service error code (e.g. "NoSuchKey" / NotFound / HeadObjectError::NotFound) — SDKs differ in representation. If you rely on string matching, prefer checking structured fields (error.Code / error.Type) when available. (docs.aws.amazon.com)
  • MinIO specifics:
    • MinIO commonly returns NoSuchKey / NotFound for missing objects; some client libraries historically required checking the error message text (fragile), and there are open issues about exposing stable error constants. Prefer checking HTTP status or SDK-provided enum/fields. (github.com)
  • Permissions caveat: AWS S3 returns 404 only if you have s3:ListBucket; without it you may get 403 (so treat 403 vs 404 differently if you expect “missing” vs “forbidden”). MinIO deployments behind proxies (Cloudflare, etc.) can also change returned status codes. (docs.aws.amazon.com)
  • Eventually-consistent/latency: if objects are created just before you call HeadObject, consider retries with exponential backoff rather than failing immediately. (stackoverflow.com)

Example patterns (pseudocode)

  • Generic:
    • try headObject()
    • if err == nil → object exists
    • else if err.httpStatus == 404 or err.code == "NoSuchKey" or err.isNotFound() → treat as missing
    • else → treat as transient or fatal error (retry or surface)

If you tell me which language/SDK you use (Go, Java, Python, .NET, JS, Rust), I’ll give a concrete code example.


🌐 Web query:

AWS SDK Java v2 AwsServiceException statusCode method

💡 Result:

Yes — in AWS SDK for Java v2, AwsServiceException exposes the HTTP status code via SdkServiceException APIs:

  • Instance accessor: int statusCode() — returns the HTTP status code on the exception (defined on SdkServiceException and available on AwsServiceException). [1]
  • Builder setter: AwsServiceException.Builder statusCode(int statusCode) — used when building an AwsServiceException. [2]

References:
[1] AwsServiceException / SdkServiceException Javadoc (statusCode accessor).
[2] AwsServiceException.Builder Javadoc (statusCode(int) builder method).


Handle the edge case where 404 comes via AwsServiceException—good defensive programming, but don't overstate the impact.

headObject primarily throws NoSuchKeyException for missing objects (verified across AWS SDK v2 and MinIO). The edge case where you get AwsServiceException with statusCode == 404 instead is real but rare—defensive coding to handle it makes sense, but this isn't a blocker. The proposed fix is correct: split the exception handlers and check statusCode() == 404 on AwsServiceException. Current code handles the common path; this handles the outlier. Worth doing, but not critical for most deployments.

🔧 Proposed fix
-        } catch (NoSuchKeyException e) {
-            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
-        } catch (AwsServiceException | SdkClientException e) {
-            throw new IOException("Failed to get object size: " + name, e);
-        }
+        } catch (NoSuchKeyException e) {
+            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+        } catch (AwsServiceException e) {
+            if (e.statusCode() == 404) {
+                throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+            }
+            throw new IOException("Failed to get object size: " + name, e);
+        } catch (SdkClientException e) {
+            throw new IOException("Failed to get object size: " + name, e);
+        }
🤖 Prompt for AI Agents
In `@java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java` around lines 221 -
234, Update S3Bucket.getSize to treat AwsServiceException with HTTP 404 the same
as NoSuchKey: catch AwsServiceException separately (in addition to
NoSuchKeyException) and check e.statusCode() == 404 (or
e.statusCode().toString()/int equivalent depending on type) and throw
FileNotFoundException in that case; otherwise keep the existing catch for
AwsServiceException | SdkClientException to wrap other failures in an
IOException. Ensure you reference getSize, S3Bucket, HeadObjectRequest/Response
and preserve the original error as cause when throwing IOException.

Comment on lines +170 to +193
private int readRowsCountFromParquet(PurePosixPath path, MessageType schema) throws IOException {
    ObjectStream objectStream = storage.getObjectStream(path);
    assertTrue(objectStream.getStream().markSupported());
    long size = storage.getSize(path);

    InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size);
    int count = 0;
    try (ParquetFileReader reader = ParquetFileReader.open(inFile)) {
        // get actual file schema
        var metadata = reader.getFooter().getFileMetaData();
        MessageType fileSchema = metadata.getSchema();
        PageReadStore pages;
        while ((pages = reader.readNextRowGroup()) != null) {
            MessageColumnIO columnIO = new ColumnIOFactory()
                    .getColumnIO(schema, fileSchema);
            RecordReader<Group> rr = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
            for (int i = 0, rows = (int) pages.getRowCount(); i < rows; i++) {
                Group g = rr.read();
                assertEquals(count, g.getInteger("int_field", 0));
                count++;
            }
        }
    }
    return count;
@coderabbitai coderabbitai bot commented

⚠️ Potential issue | 🟠 Major

Close the ObjectStream—open connections shouldn’t leak into orbit.

readRowsCountFromParquet never closes the ObjectStream. For S3-backed streams that can leave HTTP connections open and make tests flaky under load. Wrap it in try-with-resources (and ideally call getSize before opening the stream so you don’t leak on getSize failures).

🚀 Proposed fix
-        ObjectStream objectStream = storage.getObjectStream(path);
-        assertTrue(objectStream.getStream().markSupported());
-        long size = storage.getSize(path);
-
-        InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size);
-        int count = 0;
-        try (ParquetFileReader reader = ParquetFileReader.open(inFile)) {
+        long size = storage.getSize(path);
+        int count = 0;
+        try (ObjectStream objectStream = storage.getObjectStream(path)) {
+            assertTrue(objectStream.getStream().markSupported());
+            InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size);
+            try (ParquetFileReader reader = ParquetFileReader.open(inFile)) {
                 // get actual file schema
                 var metadata = reader.getFooter().getFileMetaData();
                 MessageType fileSchema = metadata.getSchema();
                 PageReadStore pages;
                 while ((pages = reader.readNextRowGroup()) != null) {
                     MessageColumnIO columnIO = new ColumnIOFactory()
                             .getColumnIO(schema, fileSchema);
                     RecordReader<Group> rr = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                     for (int i = 0, rows = (int) pages.getRowCount(); i < rows; i++) {
                         Group g = rr.read();
                         assertEquals(count, g.getInteger("int_field", 0));
                         count++;
                     }
                 }
-        }
-        return count;
+            }
+        }
+        return count;
🤖 Prompt for AI Agents
In `@java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java` around lines
170 - 193, The method readRowsCountFromParquet leaks ObjectStream; call
storage.getSize(path) before opening the stream and wrap the ObjectStream from
storage.getObjectStream(path) in a try-with-resources so it is always closed;
create the ParquetUtils.StreamInputFile (and open ParquetFileReader) inside that
try block (or nested tries) so both the stream and the reader are closed
automatically; update references to ObjectStream, storage.getObjectStream,
storage.getSize, ParquetUtils.StreamInputFile, and ParquetFileReader in
readRowsCountFromParquet accordingly.

Comment on lines +365 to +379
Thread writerThread = new Thread(() -> {
    try (ParquetWriter<Group> writer = ExampleParquetWriter
            .builder(OutputFileWrapper)
            .withConf(conf)
            .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
            .withType(schema)
            .build()) {

        SimpleGroupFactory factory = new SimpleGroupFactory(schema);
        for (int i = 0; i < rows; i++) {
            writer.write(factory.newGroup().append("int_field", i));
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
@coderabbitai coderabbitai bot commented

⚠️ Potential issue | 🟠 Major

Writer thread failure can deadlock the reader.

If the Parquet writer throws, the pipe may never close and uploadLargeStream can block forever. Close the PipedOutputStream in a finally to guarantee EOF on failure.

🛠️ Proposed fix
         Thread writerThread = new Thread(() -> {
             try (ParquetWriter<Group> writer = ExampleParquetWriter
                     .builder(OutputFileWrapper)
                     .withConf(conf)
                     .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
                     .withType(schema)
                     .build()) {

                 SimpleGroupFactory factory = new SimpleGroupFactory(schema);
                 for (int i = 0; i < rows; i++) {
                     writer.write(factory.newGroup().append("int_field", i));
                 }
             } catch (IOException e) {
                 throw new RuntimeException(e);
+            } finally {
+                try {
+                    pipedOutputStream.close();
+                } catch (IOException ignored) {
+                }
             }
         });
🤖 Prompt for AI Agents
In `@java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java` around lines
365 - 379, The writer thread can throw and never close the PipedOutputStream,
causing uploadLargeStream to block; update the writerThread block where
ParquetWriter<Group> is created (the lambda that uses
ExampleParquetWriter.builder(OutputFileWrapper) and writes with
SimpleGroupFactory) to ensure the underlying PipedOutputStream is closed in a
finally block (or try-with-resources around that stream) so EOF is always
signaled on exception — i.e., add a finally that closes the PipedOutputStream
referenced by OutputFileWrapper (or the variable wrapping the pipe) to guarantee
the pipe is closed even if ParquetWriter.write throws.
