Added test for read/writing Parquet files as streams to IBucket #144
base: master
Conversation
📝 Walkthrough
This change adds Parquet support in tests and stream wrappers for Parquet I/O, extends S3Bucket with configurable buffering and robust multipart upload handling, and adds a getSize method to IBucket.
Sequence Diagram(s)
sequenceDiagram
participant Test as Test Suite
participant Parquet as ParquetUtils
participant Bucket as IBucket (MemoryBucket / S3Bucket)
participant S3Client as S3Client / S3AsyncClient
participant S3Svc as S3 Service
Test->>Parquet: generateParquetOutput() -> InputStream
Test->>Bucket: putObjectStream(path, InputStream)
alt MemoryBucket
Bucket-->>Bucket: store bytes in memory
else S3Bucket
Bucket->>S3Client: putObject or initiate multipart upload
S3Client->>S3Svc: upload parts / complete
end
Test->>Bucket: getObjectStream(path) or getSize(path)
Bucket-->>Test: Buffered InputStream or size
Test->>Parquet: read rows from InputStream
Parquet-->>Test: verified row count
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 2
🔭 Outside diff range comments (1)
java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)
80-89: 🛠️ Refactor suggestion
⚠️ Potential issue: Do not close the shared S3AsyncClient inside every putObjectStream call
finally { s3AsyncClient.close(); } shuts the client down after a single upload, rendering the S3Bucket instance unusable for any subsequent asynchronous operations (including multipart uploads executed later in the same test run or in production code).
Move the shutdown logic to an explicit close() method on S3Bucket, or let the application lifecycle manage it.
@@
-        } finally {
-            s3AsyncClient.close();
-        }
+        }
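A minimal sketch of that direction, assuming the bucket owns both SDK clients (the class name and fields here are illustrative, not this PR's actual code):

```java
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;

// Illustrative sketch only: the bucket keeps its clients alive across calls and
// releases them exactly once via close(), driven by the application lifecycle.
class S3BucketSketch implements AutoCloseable {
    private final S3Client s3Client = S3Client.create();
    private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();

    // putObjectStream, multipart uploads, etc. would reuse the shared clients here.

    @Override
    public void close() {
        s3AsyncClient.close();
        s3Client.close();
    }
}
```

Callers would then manage the bucket with try-with-resources or an explicit shutdown call, instead of closing the async client after every upload.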
🧹 Nitpick comments (2)
java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)
35-48: Validate and rename the instance-level buffer size
- A negative or zero bufSize argument silently produces an unusable BufferedInputStream; add validation.
- BUF_SIZE is not a constant but an instance field; Java conventions recommend lowerCamelCase for variables.
-    protected final int BUF_SIZE; // 8 KB by default
+    protected final int bufSize;  // validated, per-instance
@@
-    this.BUF_SIZE = bufSize;
+    if (bufSize <= 0) {
+        throw new IllegalArgumentException("bufSize must be positive");
+    }
+    this.bufSize = bufSize;
Remember to update usages (BUF_SIZE → bufSize) below.
java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (1)
134-169: Use try-with-resources for ObjectStream to avoid leaks
objectStream is opened but never closed if an assertion fails. Wrap it in a try-with-resources block:
-        ObjectStream objectStream = storage.getObjectStream(path);
-        assertTrue(objectStream.getStream().markSupported());
+        try (ObjectStream objectStream = storage.getObjectStream(path)) {
+            assertTrue(objectStream.getStream().markSupported());
         ...
-        }
+        }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- java/pom.xml (1 hunks)
- java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (3 hunks)
- java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (4 hunks)
- java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1 hunks)
- java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java (1 hunks)
- java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2 hunks)
- java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1 hunks)
🔇 Additional comments (7)
java/pom.xml (1)
55-67: Dependencies for Parquet testing look good!
The addition of Parquet and Hadoop dependencies with test scope is appropriate for the new streaming tests. Using recent versions (Parquet 1.15.1 and Hadoop 3.4.1) ensures compatibility with modern features while keeping them isolated to the test environment.
java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1)
44-47: Test for Parquet streaming looks good!
The new test method follows the established pattern of the class, delegating to the shared tester implementation. This maintains consistency while adding the new Parquet streaming functionality.
java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1)
29-32: Test implementation matches established pattern!
The added test follows the same pattern as in S3BucketTest, ensuring consistent testing of Parquet functionality across different bucket implementations.
java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2)
6-6: Import for @Disabled annotation is appropriate
The import is correctly added to support the disabled test.
44-48: Well-documented disabled test for SDK V1
Good job properly documenting why this test is disabled with a clear explanation about AWS SDK V1's S3ObjectInputStream limitations for Parquet reading. This approach preserves test consistency across implementations while acknowledging the technical constraint.
java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)
163-171: Re-mark the buffered stream with a generous read-limit
Downstream code (SeekableInputStreamWrapper) relies on InputStream.reset() to seek backwards. BufferedInputStream marks are invalidated once more than readLimit bytes have been read; passing 0 (as done later) causes reset() to fail after the first read.
Although the faulty code resides in SeekableInputStreamWrapper, you can protect yourself here by pre-marking the stream with a large limit:
    BufferedInputStream bis = new BufferedInputStream(inputStream, bufSize);
    bis.mark(Integer.MAX_VALUE);
    return new ObjectStream(bis, name.toString());
(See dedicated comment in ParquetUtils.java for the actual fix.)
java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (1)
305-342: Mark/reset support in the generated Parquet pipe may fail
SeekableInputStreamWrapper resets to the previously set mark (see separate comment) but the pipe created here is not wrapped in a BufferedInputStream and no mark is set. When tests later call objectStream.getStream().markSupported() the result is true (because the S3 layer wraps it), yet seeking backwards on the returned stream will fail outside the test environment.
Consider wrapping inputStreamToReturn:
-PipedInputStream inputStreamToReturn = new PipedInputStream(pipeBuffer);
+PipedInputStream inputStreamToReturn = new PipedInputStream(pipeBuffer);
+inputStreamToReturn.mark(Integer.MAX_VALUE);
    public long getLength() throws IOException {
        return stream.getStream().available();
    }

    @Override
🛠️ Refactor suggestion
available() is not a reliable file length
InputStream.available() returns the number of immediately readable bytes, not the total length. For S3 streams this is often 0, which will break Parquet’s footer discovery.
Provide length via ObjectStream.getSize() (if available) or buffer the stream to measure once:
    public long getLength() {
        return stream.getSize(); // expose object size from bucket meta-data
    }
If no size is available, document the limitation clearly.
✅ Addressed in commit 3fcf702
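For reference, here is a minimal sketch of an org.apache.parquet.io.InputFile that reports a length supplied up front (e.g. obtained from IBucket.getSize) rather than relying on available(); this is illustrative only and not necessarily what commit 3fcf702 does:

```java
import java.io.IOException;

import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

// Illustrative sketch: the object size is captured at construction time,
// so Parquet's footer discovery sees the real file length.
class KnownLengthInputFile implements InputFile {
    private final SeekableInputStream stream;
    private final long length;

    KnownLengthInputFile(SeekableInputStream stream, long length) {
        this.stream = stream;
        this.length = length;
    }

    @Override
    public long getLength() {
        return length; // total object size from bucket metadata, not available()
    }

    @Override
    public SeekableInputStream newStream() throws IOException {
        return stream;
    }
}
```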
    public static class SeekableInputStreamWrapper extends SeekableInputStream {
        private final InputStream in;
        private long pos = 0;
        private final int markLimit = 0;

        /**
         * @param in the raw InputStream (e.g. S3ObjectInputStream)
         */
        public SeekableInputStreamWrapper(InputStream in) {
            this.in = in;
            in.mark(markLimit);
🛠️ Refactor suggestion
markLimit set to 0 breaks backward seek – increase and safeguard
SeekableInputStreamWrapper sets in.mark(markLimit) with markLimit = 0; after a single byte is read, reset() throws IOException ("Mark invalid"), which will surface when Parquet seeks backwards.
Fix:
- private final int markLimit = 0;
+ private static final int MARK_LIMIT = Integer.MAX_VALUE; // allow full rewind
@@
- public SeekableInputStreamWrapper(InputStream in) {
- this.in = in;
- in.mark(markLimit);
+ public SeekableInputStreamWrapper(InputStream in) {
+ // Ensure mark/reset capability
+ this.in = in.markSupported() ? in : new BufferedInputStream(in);
+ this.in.mark(MARK_LIMIT);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Original:
    public static class SeekableInputStreamWrapper extends SeekableInputStream {
        private final InputStream in;
        private long pos = 0;
        private final int markLimit = 0;

        /**
         * @param in the raw InputStream (e.g. S3ObjectInputStream)
         */
        public SeekableInputStreamWrapper(InputStream in) {
            this.in = in;
            in.mark(markLimit);

Suggested:
    public static class SeekableInputStreamWrapper extends SeekableInputStream {
        private final InputStream in;
        private long pos = 0;
        private static final int MARK_LIMIT = Integer.MAX_VALUE; // allow full rewind

        /**
         * @param in the raw InputStream (e.g. S3ObjectInputStream)
         */
        public SeekableInputStreamWrapper(InputStream in) {
            // Ensure mark/reset capability
            this.in = in.markSupported() ? in : new BufferedInputStream(in);
            this.in.mark(MARK_LIMIT);
Nice catch! Fixed
@asuiu Awesome! 🚀 That's going to make the Parquet implementation way more robust. Sometimes the simplest fixes prevent the biggest headaches later. Seeking backward properly is critical for Parquet reading operations.
(_/)
(•ᴗ•)
/ >🔍
Actionable comments posted: 0
🧹 Nitpick comments (2)
java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java (1)
63-67: Implementation looks good but consider potential inefficiency.
The implementation correctly returns the object's size by leveraging the existing getObject method. This approach is simple but loads the entire object into memory to determine its size, which could be inefficient for very large objects. Since this is a memory-based implementation primarily for testing (as noted in the class documentation), this is acceptable, but it's worth documenting this limitation.
 @Override
 public long getSize(PurePosixPath name) throws IOException {
+    // Note: This loads the entire object into memory to determine size
+    // which is acceptable for test scenarios but inefficient for large objects
     byte[] content = getObject(name);
     return content.length;
 }
java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (1)
86-98: Well-implemented S3 size retrieval with proper exception handling.
This implementation is efficient as it retrieves only the metadata rather than downloading the entire object content. The exception handling is thorough, properly differentiating between a missing object (404) and other potential errors.
Consider adding braces around the single-line if statement at line 92-93 for consistency with the codebase style and to prevent potential future bugs:
 catch (AmazonS3Exception e) {
     if (e.getStatusCode() == 404)
+    {
         throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+    }
     throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- java/src/main/java/com/esamtrade/bucketbase/IBucket.java (1 hunks)
- java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java (1 hunks)
- java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (4 hunks)
- java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (2 hunks)
- java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (4 hunks)
- java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1 hunks)
- java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java (1 hunks)
- java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2 hunks)
- java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java
- java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java
- java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java
- java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java
- java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java
- java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java
🔇 Additional comments (2)
java/src/main/java/com/esamtrade/bucketbase/IBucket.java (1)
30-30: Interface extension looks good.
The new method getSize is a solid addition to the IBucket interface and provides a standardized way to retrieve object sizes. The method signature appropriately follows the same pattern as other interface methods with IOException handling.
java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (1)
3-3: LGTM: Required import for SDK exception handling.
The added import for SdkClientException is necessary for the appropriate exception handling in the new getSize method.
and added testPutAndGetMultiUploadObjectStream to test S3 multipart upload with large files
crudeanu left a comment
Only some minor changes suggested here.
    CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
    return CompletedPart.builder()
            .partNumber(partNumber)
            .eTag(uploadPartResponse.join().eTag())
I think that if you join() here on the future response, you choose to perform the uploads sequentially. Is this the intention?
Nice catch, I've missed this (gpt code).
Probably it's not a good idea to perform the uploads sequentially; it would be better to parallelize them.
Would you be so kind to suggest a better implementation?
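One possible direction, sketched here with AWS SDK v2 types (the method and its in-memory part payloads are illustrative and simplified compared to this PR's streaming code): start every part upload first, then join the futures once, so the parts transfer in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

class ParallelMultipartSketch {

    // Submits all parts without blocking, then waits for all of them together.
    static List<CompletedPart> uploadParts(S3AsyncClient s3AsyncClient, String bucket, String key,
                                           String uploadId, List<byte[]> partPayloads) {
        List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
        for (int i = 0; i < partPayloads.size(); i++) {
            int partNumber = i + 1; // S3 part numbers start at 1
            UploadPartRequest request = UploadPartRequest.builder()
                    .bucket(bucket)
                    .key(key)
                    .uploadId(uploadId)
                    .partNumber(partNumber)
                    .build();
            futures.add(s3AsyncClient
                    .uploadPart(request, AsyncRequestBody.fromBytes(partPayloads.get(i)))
                    .thenApply(response -> CompletedPart.builder()
                            .partNumber(partNumber)
                            .eTag(response.eTag())
                            .build()));
        }
        // Every upload is already in flight; join only here, preserving part order.
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```

The completed parts would then feed the complete-multipart-upload call as before; for genuinely streamed input, each part's payload would need to be buffered before submission.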
    });
    writerThread.start();
    try {
        Thread.sleep(100);
Is this necessary?
As far as I remember, unfortunately yes - without this sleep the tests were failing flakily because the thread was not able to start fast enough... probably there's a much better solution with a CountDownLatch or another sync primitive. But I might be wrong, and maybe it's not necessary anymore after some refactoring I made... I guess you can try to remove it, and if everything works a few times, it's probably not needed anymore.
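A minimal sketch of the CountDownLatch idea (illustrative names, not the test's actual code): the reader side waits until the writer thread has actually started instead of sleeping for a fixed 100 ms.

```java
import java.util.concurrent.CountDownLatch;

class WriterStartSyncSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch writerStarted = new CountDownLatch(1);

        Thread writerThread = new Thread(() -> {
            writerStarted.countDown(); // signal as soon as the thread is running
            // ... open the ParquetWriter and push rows into the piped output stream ...
        });
        writerThread.start();

        writerStarted.await(); // replaces Thread.sleep(100)
        // ... proceed with the upload that consumes the piped input stream ...
    }
}
```

Note that a latch only guarantees the thread has started, not that any bytes have been written; if the flakiness stems from the pipe being empty, counting down after the first write (or relying on PipedInputStream's blocking read) may be the better signal.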
    // wrap pipe in our PositionOutputStream
    ParquetUtils.PositionOutputStreamWrapper posOutStream = new ParquetUtils.PositionOutputStreamWrapper(pipedOutputStream);
    ParquetUtils.OutputFileWrapper OutputFileWrapper = new ParquetUtils.OutputFileWrapper(posOutStream);
Please consider renaming this to camelCase: outputFileWrapper
Feel free to make any renamings or refactoring you consider needed or useful
    <!-- Parquet dependencies -->
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop-bundle</artifactId>
Isn't parquet-hadoop enough? Do we need the whole bundle?
I have no idea... maybe :)
Feel free to fix any inefficiencies. This PR was a hackish PoC written by GPT
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java`:
- Around line 221-234: Update S3Bucket.getSize to treat AwsServiceException with
HTTP 404 the same as NoSuchKey: catch AwsServiceException separately (in
addition to NoSuchKeyException) and check e.statusCode() == 404 (or
e.statusCode().toString()/int equivalent depending on type) and throw
FileNotFoundException in that case; otherwise keep the existing catch for
AwsServiceException | SdkClientException to wrap other failures in an
IOException. Ensure you reference getSize, S3Bucket, HeadObjectRequest/Response
and preserve the original error as cause when throwing IOException.
In `@java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java`:
- Around line 365-379: The writer thread can throw and never close the
PipedOutputStream, causing uploadLargeStream to block; update the writerThread
block where ParquetWriter<Group> is created (the lambda that uses
ExampleParquetWriter.builder(OutputFileWrapper) and writes with
SimpleGroupFactory) to ensure the underlying PipedOutputStream is closed in a
finally block (or try-with-resources around that stream) so EOF is always
signaled on exception — i.e., add a finally that closes the PipedOutputStream
referenced by OutputFileWrapper (or the variable wrapping the pipe) to guarantee
the pipe is closed even if ParquetWriter.write throws.
- Around line 170-193: The method readRowsCountFromParquet leaks ObjectStream;
call storage.getSize(path) before opening the stream and wrap the ObjectStream
from storage.getObjectStream(path) in a try-with-resources so it is always
closed; create the ParquetUtils.StreamInputFile (and open ParquetFileReader)
inside that try block (or nested tries) so both the stream and the reader are
closed automatically; update references to ObjectStream,
storage.getObjectStream, storage.getSize, ParquetUtils.StreamInputFile, and
ParquetFileReader in readRowsCountFromParquet accordingly.
    @Override
    public long getSize(PurePosixPath name) throws IOException {
        try {
            HeadObjectRequest request = HeadObjectRequest.builder()
                    .bucket(bucketName)
                    .key(name.toString())
                    .build();
            HeadObjectResponse response = s3Client.headObject(request);
            return response.contentLength();
        } catch (NoSuchKeyException e) {
            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
        } catch (AwsServiceException | SdkClientException e) {
            throw new IOException("Failed to get object size: " + name, e);
        }
🧩 Analysis chain
🌐 Web query:
AWS SDK for Java v2 S3Client.headObject NoSuchKeyException missing object
💡 Result:
Short answer
- S3Client.headObject throws NoSuchKeyException (an S3 service exception) when the key does not exist. If you lack s3:ListBucket permission S3 may instead return 403 (access denied). Use try/catch for NoSuchKeyException or inspect the S3Exception HTTP status/error code. [1][2]
Example (Java SDK v2)
- Preferred: call headObject and handle missing-key explicitly:
try {
    HeadObjectResponse r = s3.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
    // object exists
} catch (NoSuchKeyException e) {
    // object missing
} catch (S3Exception e) {
    // other service errors (e.g. 403 if no ListBucket permission) — check e.statusCode() / e.awsErrorDetails()
} catch (SdkClientException e) {
    // client-side errors (network/credentials)
}
Notes / alternatives
- If you get 403 for a missing key, ensure the caller has s3:ListBucket on the bucket (or use a different approach). [2]
- Alternatives: listObjectsV2 with prefix (slower), GetObjectAttributes (if available in your SDK/API region) or attempt a GetObject and handle 404. [5][8]
Sources
- AWS Java SDK v2 S3Client.headObject docs (throws NoSuchKeyException; behavior depends on s3:ListBucket permission). [1][2]
- AWS developer guide / examples (head/list guidance). [8]
- Community/guide pages on HEAD returning 404 when object missing and 403 if no list permission. [5]
References
[1] S3Client API doc (headObject throws NoSuchKeyException).
[2] AWS S3 headObject behavior / permission details.
[5] Checking if an S3 object exists (example using head-object).
[8] AWS SDK for Java developer guide (S3 examples).
If you want, I can paste a ready-to-run Java method that wraps this and returns a boolean.
🌐 Web query:
MinIO S3 headObject 404 exception handling missing key
💡 Result:
Short answer
- HeadObject/StatObject will return a 404/NotFound (or an SDK error variant that maps to NotFound/NoSuchKey) when the object key does not exist; some SDKs expose this as an HTTP 404 status, others as an error type/code (e.g. NotFound / NoSuchKey). Handle it by checking the HTTP status or the SDK error code and treating that case as “object missing.” (docs.aws.amazon.com)
Practical guidance
- Catch the SDK error and inspect:
- the HTTP status (== 404 / http.StatusNotFound), or
- the service error code (e.g. "NoSuchKey" / NotFound / HeadObjectError::NotFound) — SDKs differ in representation. If you rely on string matching, prefer checking structured fields (error.Code / error.Type) when available. (docs.aws.amazon.com)
- MinIO specifics:
- MinIO commonly returns NoSuchKey / NotFound for missing objects; some client libraries historically required checking the error message text (fragile), and there are open issues about exposing stable error constants. Prefer checking HTTP status or SDK-provided enum/fields. (github.com)
- Permissions caveat: AWS S3 returns 404 only if you have s3:ListBucket; without it you may get 403 (so treat 403 vs 404 differently if you expect “missing” vs “forbidden”). MinIO deployments behind proxies (Cloudflare, etc.) can also change returned status codes. (docs.aws.amazon.com)
- Eventually-consistent/latency: if objects are created just before you call HeadObject, consider retries with exponential backoff rather than failing immediately. (stackoverflow.com)
Example patterns (pseudocode)
- Generic:
- try headObject()
- if err == nil → object exists
- else if err.httpStatus == 404 or err.code == "NoSuchKey" or err.isNotFound() → treat as missing
- else → treat as transient or fatal error (retry or surface)
If you tell me which language/SDK you use (Go, Java, Python, .NET, JS, Rust), I’ll give a concrete code example.
Citations:
- 1: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html?utm_source=openai
- 2: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html?utm_source=openai
- 3: How to check if object exists minio/minio-go#1082
- 4: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html?utm_source=openai
- 5: https://stackoverflow.com/questions/61473529/aws-lambdaan-error-occurred-404-when-calling-the-headobject-operation-not-fo?utm_source=openai
🌐 Web query:
AWS SDK Java v2 AwsServiceException statusCode method
💡 Result:
Yes — in AWS SDK for Java v2, AwsServiceException exposes the HTTP status code via SdkServiceException APIs:
- Instance accessor: int statusCode() — returns the HTTP status code on the exception (defined on SdkServiceException and available on AwsServiceException). [1]
- Builder setter: AwsServiceException.Builder statusCode(int statusCode) — used when building an AwsServiceException. [2]
References:
[1] AwsServiceException / SdkServiceException Javadoc (statusCode accessor).
[2] AwsServiceException.Builder Javadoc (statusCode(int) builder method).
Handle the edge case where 404 comes via AwsServiceException—good defensive programming, but don't overstate the impact.
headObject primarily throws NoSuchKeyException for missing objects (verified across AWS SDK v2 and MinIO). The edge case where you get AwsServiceException with statusCode == 404 instead is real but rare—defensive coding to handle it makes sense, but this isn't a blocker. The proposed fix is correct: split the exception handlers and check statusCode() == 404 on AwsServiceException. Current code handles the common path; this handles the outlier. Worth doing, but not critical for most deployments.
🔧 Proposed fix
- } catch (NoSuchKeyException e) {
- throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
- } catch (AwsServiceException | SdkClientException e) {
- throw new IOException("Failed to get object size: " + name, e);
- }
+ } catch (NoSuchKeyException e) {
+ throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+ } catch (AwsServiceException e) {
+ if (e.statusCode() == 404) {
+ throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+ }
+ throw new IOException("Failed to get object size: " + name, e);
+ } catch (SdkClientException e) {
+ throw new IOException("Failed to get object size: " + name, e);
+    }
🤖 Prompt for AI Agents
In `@java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java` around lines 221 -
234, Update S3Bucket.getSize to treat AwsServiceException with HTTP 404 the same
as NoSuchKey: catch AwsServiceException separately (in addition to
NoSuchKeyException) and check e.statusCode() == 404 (or
e.statusCode().toString()/int equivalent depending on type) and throw
FileNotFoundException in that case; otherwise keep the existing catch for
AwsServiceException | SdkClientException to wrap other failures in an
IOException. Ensure you reference getSize, S3Bucket, HeadObjectRequest/Response
and preserve the original error as cause when throwing IOException.
    private int readRowsCountFromParquet(PurePosixPath path, MessageType schema) throws IOException {
        ObjectStream objectStream = storage.getObjectStream(path);
        assertTrue(objectStream.getStream().markSupported());
        long size = storage.getSize(path);

        InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size);
        int count = 0;
        try (ParquetFileReader reader = ParquetFileReader.open(inFile)) {
            // get actual file schema
            var metadata = reader.getFooter().getFileMetaData();
            MessageType fileSchema = metadata.getSchema();
            PageReadStore pages;
            while ((pages = reader.readNextRowGroup()) != null) {
                MessageColumnIO columnIO = new ColumnIOFactory()
                        .getColumnIO(schema, fileSchema);
                RecordReader<Group> rr = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                for (int i = 0, rows = (int) pages.getRowCount(); i < rows; i++) {
                    Group g = rr.read();
                    assertEquals(count, g.getInteger("int_field", 0));
                    count++;
                }
            }
        }
        return count;
Close the ObjectStream—open connections shouldn’t leak into orbit.
readRowsCountFromParquet never closes the ObjectStream. For S3-backed streams that can leave HTTP connections open and make tests flaky under load. Wrap it in try-with-resources (and ideally call getSize before opening the stream so you don’t leak on getSize failures).
🚀 Proposed fix
- ObjectStream objectStream = storage.getObjectStream(path);
- assertTrue(objectStream.getStream().markSupported());
- long size = storage.getSize(path);
-
- InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size);
- int count = 0;
- try (ParquetFileReader reader = ParquetFileReader.open(inFile)) {
+ long size = storage.getSize(path);
+ int count = 0;
+ try (ObjectStream objectStream = storage.getObjectStream(path)) {
+ assertTrue(objectStream.getStream().markSupported());
+ InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size);
+ try (ParquetFileReader reader = ParquetFileReader.open(inFile)) {
// get actual file schema
var metadata = reader.getFooter().getFileMetaData();
MessageType fileSchema = metadata.getSchema();
PageReadStore pages;
while ((pages = reader.readNextRowGroup()) != null) {
MessageColumnIO columnIO = new ColumnIOFactory()
.getColumnIO(schema, fileSchema);
RecordReader<Group> rr = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
for (int i = 0, rows = (int) pages.getRowCount(); i < rows; i++) {
Group g = rr.read();
assertEquals(count, g.getInteger("int_field", 0));
count++;
}
}
- }
- return count;
+ }
+ }
+    return count;
🤖 Prompt for AI Agents
In `@java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java` around lines
170 - 193, The method readRowsCountFromParquet leaks ObjectStream; call
storage.getSize(path) before opening the stream and wrap the ObjectStream from
storage.getObjectStream(path) in a try-with-resources so it is always closed;
create the ParquetUtils.StreamInputFile (and open ParquetFileReader) inside that
try block (or nested tries) so both the stream and the reader are closed
automatically; update references to ObjectStream, storage.getObjectStream,
storage.getSize, ParquetUtils.StreamInputFile, and ParquetFileReader in
readRowsCountFromParquet accordingly.
    Thread writerThread = new Thread(() -> {
        try (ParquetWriter<Group> writer = ExampleParquetWriter
                .builder(OutputFileWrapper)
                .withConf(conf)
                .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
                .withType(schema)
                .build()) {

            SimpleGroupFactory factory = new SimpleGroupFactory(schema);
            for (int i = 0; i < rows; i++) {
                writer.write(factory.newGroup().append("int_field", i));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
Writer thread failure can deadlock the reader.
If the Parquet writer throws, the pipe may never close and uploadLargeStream can block forever. Close the PipedOutputStream in a finally to guarantee EOF on failure.
🛠️ Proposed fix
Thread writerThread = new Thread(() -> {
try (ParquetWriter<Group> writer = ExampleParquetWriter
.builder(OutputFileWrapper)
.withConf(conf)
.withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
.withType(schema)
.build()) {
SimpleGroupFactory factory = new SimpleGroupFactory(schema);
for (int i = 0; i < rows; i++) {
writer.write(factory.newGroup().append("int_field", i));
}
} catch (IOException e) {
throw new RuntimeException(e);
+ } finally {
+ try {
+ pipedOutputStream.close();
+ } catch (IOException ignored) {
+ }
}
});
🤖 Prompt for AI Agents
In `@java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java` around lines
365 - 379, The writer thread can throw and never close the PipedOutputStream,
causing uploadLargeStream to block; update the writerThread block where
ParquetWriter<Group> is created (the lambda that uses
ExampleParquetWriter.builder(OutputFileWrapper) and writes with
SimpleGroupFactory) to ensure the underlying PipedOutputStream is closed in a
finally block (or try-with-resources around that stream) so EOF is always
signaled on exception — i.e., add a finally that closes the PipedOutputStream
referenced by OutputFileWrapper (or the variable wrapping the pipe) to guarantee
the pipe is closed even if ParquetWriter.write throws.
This doesn't change the library itself (except for a minor addition of buffering support in S3Bucket); it just adds a functional test that writes and reads a Parquet file to buckets in a streaming manner.