Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.iceberg.azure.AzureProperties.ADLS_SAS_TOKEN_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
Expand All @@ -31,6 +32,7 @@

import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.Response;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClientBuilder;
Expand All @@ -42,6 +44,7 @@
import java.util.Iterator;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileInfo;
import org.apache.iceberg.io.InputFile;
Expand Down Expand Up @@ -78,6 +81,23 @@ public void testFileOperations() throws IOException {
assertThat(fileClient.exists()).isFalse();
}

@Test
public void readMissingLocation() {
  // Point an input file at a path that was never written.
  String missingPath = "path/to/file";
  String missingLocation = AZURITE_CONTAINER.location(missingPath);
  ADLSFileIO io = createFileIO();

  // Sanity check: the backing file really is absent before we open it.
  assertThat(AZURITE_CONTAINER.fileClient(missingPath).exists()).isFalse();

  InputFile inputFile = io.newInputFile(missingLocation);

  // Opening a stream must surface Iceberg's NotFoundException, keeping the
  // original Azure storage failure attached as the cause.
  assertThatThrownBy(inputFile::newStream)
      .isInstanceOf(NotFoundException.class)
      .hasMessage(
          "Location does not exist: abfs://container@account.dfs.core.windows.net/path/to/file")
      .hasCauseInstanceOf(BlobStorageException.class);
}

@Test
public void testBulkDeleteFiles() {
String path1 = "path/to/file1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class TestADLSInputStream extends AzuriteTestBase {
private final Random random = new Random(1);
private final AzureProperties azureProperties = new AzureProperties();

// Fully-qualified location URI for the shared test file path inside the
// Azurite test container (e.g. an abfs:// URI — exact scheme comes from
// AZURITE_CONTAINER.location; TODO confirm against AzuriteTestBase).
private String location() {
return AZURITE_CONTAINER.location(FILE_PATH);
}

// Data Lake client bound to the same test file path used by location().
private DataLakeFileClient fileClient() {
return AZURITE_CONTAINER.fileClient(FILE_PATH);
}
Expand All @@ -55,7 +59,8 @@ public void testRead() throws Exception {
setupData(data);

try (SeekableInputStream in =
new ADLSInputStream(fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
new ADLSInputStream(
location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
int readSize = 1024;

readAndCheck(in, in.getPos(), readSize, data, false);
Expand Down Expand Up @@ -90,7 +95,8 @@ public void testReadSingle() throws Exception {
setupData(data);

try (SeekableInputStream in =
new ADLSInputStream(fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
new ADLSInputStream(
location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
assertThat(in.read()).isEqualTo(i0);
assertThat(in.read()).isEqualTo(i1);
}
Expand Down Expand Up @@ -131,7 +137,8 @@ public void testRangeRead() throws Exception {
setupData(expected);

try (RangeReadable in =
new ADLSInputStream(fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
new ADLSInputStream(
location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
// first 1k
position = 0;
offset = 0;
Expand Down Expand Up @@ -164,7 +171,8 @@ private void readAndCheckRanges(
public void testClose() throws Exception {
setupData(randomData(2));
SeekableInputStream closed =
new ADLSInputStream(fileClient(), null, azureProperties, MetricsContext.nullMetrics());
new ADLSInputStream(
location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics());
closed.close();
assertThatThrownBy(() -> closed.seek(0))
.isInstanceOf(IllegalStateException.class)
Expand All @@ -178,7 +186,8 @@ public void testSeek() throws Exception {
setupData(data);

try (SeekableInputStream in =
new ADLSInputStream(fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
new ADLSInputStream(
location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics())) {
in.seek(data.length / 2);
byte[] actual = new byte[data.length / 2];

Expand All @@ -193,7 +202,8 @@ public void testSeek() throws Exception {
public void testSeekNegative() throws Exception {
setupData(randomData(2));
SeekableInputStream in =
new ADLSInputStream(fileClient(), null, azureProperties, MetricsContext.nullMetrics());
new ADLSInputStream(
location(), fileClient(), null, azureProperties, MetricsContext.nullMetrics());
assertThatThrownBy(() -> in.seek(-3))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Cannot seek: position -3 is negative");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ public long getLength() {

@Override
public SeekableInputStream newStream() {
return new ADLSInputStream(fileClient(), fileSize, azureProperties(), metrics());
return new ADLSInputStream(location(), fileClient(), fileSize, azureProperties(), metrics());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
*/
package org.apache.iceberg.azure.adlsv2;

import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.FileRange;
import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.RangeReadable;
Expand All @@ -46,6 +50,7 @@ class ADLSInputStream extends SeekableInputStream implements RangeReadable {
private static final int SKIP_SIZE = 1024 * 1024;

private final StackTraceElement[] createStack;
private final String location;
private final DataLakeFileClient fileClient;
private Long fileSize;
private final AzureProperties azureProperties;
Expand All @@ -59,10 +64,12 @@ class ADLSInputStream extends SeekableInputStream implements RangeReadable {
private final Counter readOperations;

ADLSInputStream(
String location,
DataLakeFileClient fileClient,
Long fileSize,
AzureProperties azureProperties,
MetricsContext metrics) {
this.location = location;
this.fileClient = fileClient;
this.fileSize = fileSize;
this.azureProperties = azureProperties;
Expand Down Expand Up @@ -184,6 +191,7 @@ private DataLakeFileOpenInputStreamResult openRange(FileRange range) {
try {
return fileClient.openInputStream(getInputOptions(range));
} catch (RuntimeException e) {
throwNotFoundIfNotPresent(e, location);
LOG.error(
"Failed to open input stream for file {}, range {}", fileClient.getFilePath(), range, e);
throw e;
Expand All @@ -209,4 +217,20 @@ protected void finalize() throws Throwable {
LOG.warn("Unclosed input stream created by:\n\t{}", trace);
}
}

/**
 * Rethrows an Azure "not found" failure as Iceberg's {@link NotFoundException},
 * preserving the original throwable as the cause. Any other throwable is left
 * untouched (the method simply returns).
 */
private static void throwNotFoundIfNotPresent(Throwable throwable, String location) {
  if (!isFileNotFoundException(throwable)) {
    return;
  }
  throw new NotFoundException(throwable, "Location does not exist: %s", location);
}

/**
 * Returns whether the given exception represents a missing blob/path in Azure
 * storage: a {@link BlobStorageException} with error code {@code BlobNotFound},
 * or a {@link DataLakeStorageException} with error code {@code PathNotFound}.
 */
private static boolean isFileNotFoundException(Throwable exception) {
  if (exception instanceof BlobStorageException blobError) {
    // Constant-first equals avoids any null error-code surprises.
    return BlobErrorCode.BLOB_NOT_FOUND.equals(blobError.getErrorCode());
  }

  return exception instanceof DataLakeStorageException dataLakeError
      && "PathNotFound".equals(dataLakeError.getErrorCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ void before() {
InternalDataLakeFileOpenInputStreamResult openInputStreamResult =
new InternalDataLakeFileOpenInputStreamResult(inputStream, mock());
when(fileClient.openInputStream(any())).thenReturn(openInputStreamResult);
adlsInputStream = new ADLSInputStream(fileClient, 0L, mock(), mock());
adlsInputStream =
new ADLSInputStream(
"abfs://container@account.dfs.core.windows.net/path/to/file",
fileClient,
0L,
mock(),
mock());
}

@Test
Expand Down
Loading