diff --git a/.fossa.yml b/.fossa.yml index 7422e7ce8718..e811ffda4648 100644 --- a/.fossa.yml +++ b/.fossa.yml @@ -157,6 +157,9 @@ targets: - type: gradle path: ./ target: ':instrumentation:hystrix-1.4:javaagent' + - type: gradle + path: ./ + target: ':instrumentation:iceberg-1.8:library' - type: gradle path: ./ target: ':instrumentation:influxdb-2.4:javaagent' diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md index 657d719ed403..af6c20c0dfd9 100644 --- a/docs/supported-libraries.md +++ b/docs/supported-libraries.md @@ -32,6 +32,7 @@ These are the supported libraries and frameworks: | [Apache ElasticJob](https://shardingsphere.apache.org/elasticjob/) | 3.0+ | N/A | none | | [Apache HttpAsyncClient](https://hc.apache.org/index.html) | 4.1+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] | | [Apache HttpClient](https://hc.apache.org/index.html) | 2.0+ | [opentelemetry-apache-httpclient-4.3](../instrumentation/apache-httpclient/apache-httpclient-4.3/library),
[opentelemetry-apache-httpclient-5.2](../instrumentation/apache-httpclient/apache-httpclient-5.2/library) | [HTTP Client Spans], [HTTP Client Metrics] | +| [Apache Iceberg](https://iceberg.apache.org/) | N/A | [opentelemetry-iceberg-1.8](../instrumentation/iceberg-1.8/library/) | none | | [Apache ShenYu](https://shenyu.apache.org/) | 2.4+ | N/A | Provides `http.route` [2] | | [Apache Kafka Connect API](https://kafka.apache.org/documentation/#connect) | 2.6+ | N/A | [Messaging Spans] | | [Apache Kafka Producer/Consumer API](https://kafka.apache.org/documentation/#producerapi) | 0.11+ | [opentelemetry-kafka-clients-2.6](../instrumentation/kafka/kafka-clients/kafka-clients-2.6/library) | [Messaging Spans] | diff --git a/instrumentation/iceberg-1.8/README.md b/instrumentation/iceberg-1.8/README.md new file mode 100644 index 000000000000..e218632b3b83 --- /dev/null +++ b/instrumentation/iceberg-1.8/README.md @@ -0,0 +1,43 @@ +# Library Instrumentation for Apache Iceberg Version 1.8 and Higher + +Provides OpenTelemetry instrumentation for [Apache Iceberg](https://iceberg.apache.org/). + +## Quickstart + +### Add These Dependencies to Your Project + +Replace `OPENTELEMETRY_VERSION` with the [latest release](https://central.sonatype.com/artifact/io.opentelemetry.instrumentation/opentelemetry-iceberg-1.8). + +For Maven, add to your `pom.xml` dependencies: + +```xml + + + io.opentelemetry.instrumentation + opentelemetry-iceberg-1.8 + OPENTELEMETRY_VERSION + + +``` + +For Gradle, add to your dependencies: + +```groovy +implementation("io.opentelemetry.instrumentation:opentelemetry-iceberg-1.8:OPENTELEMETRY_VERSION") +``` + +### Usage + +The instrumentation library allows creating instrumented `Scan` (e.g., `TableScan`) instances for collecting and reporting OpenTelemetry-based scan metrics. For example: + +```java +OpenTelemetry openTelemetry = // ... +IcebergTelemetry icebergTelemetry = IcebergTelemetry.create(openTelemetry); +TableScan tableScan = icebergTelemetry.wrapScan(table.newScan()); + +try (CloseableIterable fileScanTasks = tableScan.planFiles()) { + // Process the scan tasks +} + +// The metrics will be reported after the scan tasks iterable is closed +``` diff --git a/instrumentation/iceberg-1.8/library/build.gradle.kts b/instrumentation/iceberg-1.8/library/build.gradle.kts new file mode 100644 index 000000000000..c1d3d3cb9c41 --- /dev/null +++ b/instrumentation/iceberg-1.8/library/build.gradle.kts @@ -0,0 +1,14 @@ +plugins { + id("otel.library-instrumentation") + id("otel.nullaway-conventions") +} + +dependencies { + library("org.apache.iceberg:iceberg-core:1.8.1") + testImplementation(project(":instrumentation:iceberg-1.8:testing")) + implementation("io.opentelemetry:opentelemetry-api-incubator") +} + +otelJava { + minJavaVersionSupported.set(JavaVersion.VERSION_11) +} diff --git a/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergMetricsReporter.java b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergMetricsReporter.java new file mode 100644 index 000000000000..7ac909346d00 --- /dev/null +++ b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergMetricsReporter.java @@ -0,0 +1,279 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; +import io.opentelemetry.api.incubator.metrics.ExtendedLongCounterBuilder; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import java.util.List; +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.metrics.ScanMetricsResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.TimerResult; + +final class IcebergMetricsReporter implements MetricsReporter { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.iceberg-1.8"; + private static final AttributeKey SCHEMA_ID = AttributeKey.longKey("iceberg.schema.id"); + private static final AttributeKey TABLE_NAME = + AttributeKey.stringKey("iceberg.table.name"); + private static final AttributeKey SNAPSHOT_ID = AttributeKey.longKey("iceberg.snapshot.id"); + private static final AttributeKey SCAN_STATE = + AttributeKey.stringKey("iceberg.scan.state"); + private static final AttributeKey DELETE_TYPE = + AttributeKey.stringKey("iceberg.delete_file.type"); + private static final List> BASE_ADVICE = List.of(SCHEMA_ID, TABLE_NAME); + private static final List> ADVICE_FOR_DATA_AND_MANIFEST_FILE_COUNTS = + List.of(SCHEMA_ID, TABLE_NAME, SCAN_STATE); + private static final List> ADVICE_FOR_DELETE_FILE_COUNTS = + List.of(SCHEMA_ID, TABLE_NAME, SCAN_STATE, DELETE_TYPE); + + private final DoubleHistogram planningDuration; + private final LongCounter dataFilesCount; + private final LongCounter dataFilesSize; + private final LongCounter deleteFilesCount; + private final LongCounter deleteFilesSize; + private final LongCounter dataManifestsCount; + private final LongCounter deleteManifestsCount; + + IcebergMetricsReporter(OpenTelemetry openTelemetry) { + Meter meter = openTelemetry.getMeter(INSTRUMENTATION_NAME); + + planningDuration = + applyAdvice(BASE_ADVICE, ScanMetricsBuilderFactory.totalPlanningDuration(meter, "s")) + .build(); + dataFilesCount = + applyAdvice( + ADVICE_FOR_DATA_AND_MANIFEST_FILE_COUNTS, + ScanMetricsBuilderFactory.dataFilesCount(meter)) + .build(); + dataFilesSize = + applyAdvice(BASE_ADVICE, ScanMetricsBuilderFactory.dataFilesSize(meter)).build(); + deleteFilesCount = + applyAdvice( + ADVICE_FOR_DELETE_FILE_COUNTS, ScanMetricsBuilderFactory.deleteFilesCount(meter)) + .build(); + deleteFilesSize = + applyAdvice(BASE_ADVICE, ScanMetricsBuilderFactory.deleteFilesSize(meter)).build(); + dataManifestsCount = + applyAdvice( + ADVICE_FOR_DATA_AND_MANIFEST_FILE_COUNTS, + ScanMetricsBuilderFactory.dataManifestsCount(meter)) + .build(); + deleteManifestsCount = + applyAdvice( + ADVICE_FOR_DATA_AND_MANIFEST_FILE_COUNTS, + ScanMetricsBuilderFactory.deleteManifestsCount(meter)) + .build(); + } + + @Override + public void report(MetricsReport report) { + if (report instanceof ScanReport) { + reportScanMetrics((ScanReport) report); + } + } + + void reportScanMetrics(ScanReport scanReport) { + Attributes scanAttributes = + Attributes.of( + SCHEMA_ID, + (long) scanReport.schemaId(), + TABLE_NAME, + scanReport.tableName(), + SNAPSHOT_ID, + scanReport.snapshotId()); + ScanMetricsResult metrics = scanReport.scanMetrics(); + TimerResult duration = metrics.totalPlanningDuration(); + + if (duration != null) { + planningDuration.record(duration.totalDuration().toMillis() / 1000.0, scanAttributes); + } + + // Data files metrics + CounterResult current = metrics.resultDataFiles(); + + if (current != null) { + addValueToLongCounter(dataFilesCount, current.value(), scanAttributes, SCAN_STATE, "scanned"); + } + + current = metrics.skippedDataFiles(); + + if (current != null) { + addValueToLongCounter(dataFilesCount, current.value(), scanAttributes, SCAN_STATE, "skipped"); + } + + current = metrics.totalFileSizeInBytes(); + + if (current != null) { + dataFilesSize.add(current.value(), scanAttributes); + } + + // Delete files metrics + current = metrics.totalDeleteFileSizeInBytes(); + + if (current != null) { + deleteFilesSize.add(current.value(), scanAttributes); + } + + current = metrics.resultDeleteFiles(); + + if (current != null) { + addValueToLongCounter( + deleteFilesCount, + current.value(), + scanAttributes, + SCAN_STATE, + "scanned", + DELETE_TYPE, + "all"); + } + + current = metrics.skippedDeleteFiles(); + + if (current != null) { + addValueToLongCounter( + deleteFilesCount, + current.value(), + scanAttributes, + SCAN_STATE, + "skipped", + DELETE_TYPE, + "all"); + } + + current = metrics.indexedDeleteFiles(); + + if (current != null) { + addValueToLongCounter( + deleteFilesCount, + current.value(), + scanAttributes, + SCAN_STATE, + "scanned", + DELETE_TYPE, + "indexed"); + } + + current = metrics.equalityDeleteFiles(); + + if (current != null) { + addValueToLongCounter( + deleteFilesCount, + current.value(), + scanAttributes, + SCAN_STATE, + "scanned", + DELETE_TYPE, + "equality"); + } + + current = metrics.positionalDeleteFiles(); + + if (current != null) { + addValueToLongCounter( + deleteFilesCount, + current.value(), + scanAttributes, + SCAN_STATE, + "scanned", + DELETE_TYPE, + "position"); + } + + current = metrics.dvs(); + + if (current != null) { + addValueToLongCounter( + deleteFilesCount, + current.value(), + scanAttributes, + SCAN_STATE, + "scanned", + DELETE_TYPE, + "dvs"); + } + + // Data manifests metrics + current = metrics.scannedDataManifests(); + + if (current != null) { + addValueToLongCounter( + dataManifestsCount, current.value(), scanAttributes, SCAN_STATE, "scanned"); + } + + current = metrics.skippedDataManifests(); + + if (current != null) { + addValueToLongCounter( + dataManifestsCount, current.value(), scanAttributes, SCAN_STATE, "skipped"); + } + + // Delete manifests metrics + current = metrics.scannedDeleteManifests(); + + if (current != null) { + addValueToLongCounter( + deleteManifestsCount, current.value(), scanAttributes, SCAN_STATE, "scanned"); + } + + current = metrics.skippedDeleteManifests(); + + if (current != null) { + addValueToLongCounter( + deleteManifestsCount, current.value(), scanAttributes, SCAN_STATE, "skipped"); + } + } + + private static void addValueToLongCounter( + LongCounter metric, + long measurement, + Attributes attributes, + AttributeKey att1Key, + String att1Value) { + Attributes newAttributes = attributes.toBuilder().put(att1Key, att1Value).build(); + metric.add(measurement, newAttributes); + } + + private static void addValueToLongCounter( + LongCounter metric, + long measurement, + Attributes attributes, + AttributeKey att1Key, + String att1Value, + AttributeKey att2Key, + String att2Value) { + Attributes newAttributes = + attributes.toBuilder().put(att1Key, att1Value).put(att2Key, att2Value).build(); + metric.add(measurement, newAttributes); + } + + private static LongCounterBuilder applyAdvice( + List> attributeKeys, LongCounterBuilder builder) { + if (builder instanceof ExtendedLongCounterBuilder) { + return ((ExtendedLongCounterBuilder) builder).setAttributesAdvice(attributeKeys); + } + + return builder; + } + + private static DoubleHistogramBuilder applyAdvice( + List> attributeKeys, DoubleHistogramBuilder builder) { + if (builder instanceof ExtendedDoubleHistogramBuilder) { + return ((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(attributeKeys); + } + + return builder; + } +} diff --git a/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTelemetry.java b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTelemetry.java new file mode 100644 index 000000000000..81d0f20d0832 --- /dev/null +++ b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTelemetry.java @@ -0,0 +1,42 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import io.opentelemetry.api.OpenTelemetry; +import org.apache.iceberg.Scan; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.ScanTaskGroup; + +/** Entrypoint for instrumenting Apache Iceberg scan metrics */ +public final class IcebergTelemetry { + private final OpenTelemetry openTelemetry; + + /** Returns a new {@link IcebergTelemetry} configured with the given {@link OpenTelemetry}. */ + public static IcebergTelemetry create(OpenTelemetry openTelemetry) { + return new IcebergTelemetry(openTelemetry); + } + + IcebergTelemetry(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + /** + * Creates a new {@link Scan} instance based on an existing {@link Scan} instance. The new + * instance is associated with a custom {@link org.apache.iceberg.metrics.MetricsReporter} that + * reports scan metrics using the configured {@link OpenTelemetry} instance. + * + * @param the child class, returned by method chaining, e.g., {@link + * Scan#project(org.apache.iceberg.Schema)} + * @param the type of tasks produced by this scan + * @param the type of task groups produced by this scan + * @param scan the original scan instance that will be instrumented + * @return an instrumented {@link Scan} instance based on the provided instance + */ + public > T1 wrapScan( + Scan scan) { + return scan.metricsReporter(new IcebergMetricsReporter(openTelemetry)); + } +} diff --git a/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/ScanMetricsBuilderFactory.java b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/ScanMetricsBuilderFactory.java new file mode 100644 index 000000000000..8870f2ea018b --- /dev/null +++ b/instrumentation/iceberg-1.8/library/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/ScanMetricsBuilderFactory.java @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.Meter; + +final class ScanMetricsBuilderFactory { + private static final String ROOT = "iceberg.scan"; + private static final String TOTAL_PLANNING_DURATION = ROOT + ".planning.duration"; + + private static final String DATA_FILES_COUNT = ROOT + ".data_files.count"; + private static final String DATA_FILES_SIZE = ROOT + ".data_files.size"; + private static final String DELETE_FILES_SIZE = ROOT + ".delete_files.size"; + private static final String DELETE_FILES_COUNT = ROOT + ".delete_files.count"; + private static final String DATA_MANIFESTS_COUNT = ROOT + ".data_manifests.count"; + private static final String DELETE_MANIFESTS_COUNT = ROOT + ".delete_manifests.count"; + + private ScanMetricsBuilderFactory() { + // prevents instantiation + } + + static DoubleHistogramBuilder totalPlanningDuration(Meter meter, String unit) { + return meter + .histogramBuilder(TOTAL_PLANNING_DURATION) + .setDescription("The total duration needed to plan the scan.") + .setUnit(unit); + } + + static LongCounterBuilder dataFilesCount(Meter meter) { + return meter + .counterBuilder(DATA_FILES_COUNT) + .setDescription("The number of data files.") + .setUnit("{file}"); + } + + static LongCounterBuilder deleteFilesCount(Meter meter) { + return meter + .counterBuilder(DELETE_FILES_COUNT) + .setDescription("The number of delete files.") + .setUnit("{file}"); + } + + static LongCounterBuilder dataManifestsCount(Meter meter) { + return meter + .counterBuilder(DATA_MANIFESTS_COUNT) + .setDescription("The number of data manifests.") + .setUnit("{file}"); + } + + static LongCounterBuilder deleteManifestsCount(Meter meter) { + return meter + .counterBuilder(DELETE_MANIFESTS_COUNT) + .setDescription("The number of delete manifests.") + .setUnit("{file}"); + } + + static LongCounterBuilder dataFilesSize(Meter meter) { + return meter + .counterBuilder(DATA_FILES_SIZE) + .setDescription("The total size of all scanned data files.") + .setUnit("By"); + } + + static LongCounterBuilder deleteFilesSize(Meter meter) { + return meter + .counterBuilder(DELETE_FILES_SIZE) + .setDescription("The total size of all scanned delete files.") + .setUnit("By"); + } +} diff --git a/instrumentation/iceberg-1.8/library/src/test/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTest.java b/instrumentation/iceberg-1.8/library/src/test/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTest.java new file mode 100644 index 000000000000..5e6abde53978 --- /dev/null +++ b/instrumentation/iceberg-1.8/library/src/test/java/io/opentelemetry/instrumentation/iceberg/v1_8/IcebergTest.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.apache.iceberg.TableScan; +import org.junit.jupiter.api.extension.RegisterExtension; + +class IcebergTest extends AbstractIcebergTest { + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } + + @Override + protected TableScan configure(TableScan tableScan) { + OpenTelemetry openTelemetry = testing.getOpenTelemetry(); + IcebergTelemetry icebergTelemetry = IcebergTelemetry.create(openTelemetry); + return icebergTelemetry.wrapScan(tableScan); + } +} diff --git a/instrumentation/iceberg-1.8/metadata.yaml b/instrumentation/iceberg-1.8/metadata.yaml new file mode 100644 index 000000000000..3000e45310d8 --- /dev/null +++ b/instrumentation/iceberg-1.8/metadata.yaml @@ -0,0 +1,2 @@ +description: This standalone instrumentation enables metrics for Apache Iceberg scans. +library_link: https://iceberg.apache.org/ diff --git a/instrumentation/iceberg-1.8/testing/build.gradle.kts b/instrumentation/iceberg-1.8/testing/build.gradle.kts new file mode 100644 index 000000000000..e83b377b1492 --- /dev/null +++ b/instrumentation/iceberg-1.8/testing/build.gradle.kts @@ -0,0 +1,15 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + implementation("org.apache.iceberg:iceberg-core:1.8.1") + // The following dependency allows us to use the following Iceberg test classes TestTables and TestTable + // which are not published by default + implementation("org.apache.iceberg:iceberg-core:1.8.1") { + artifact { + classifier = "tests" + } + } + api(project(":testing-common")) +} diff --git a/instrumentation/iceberg-1.8/testing/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/AbstractIcebergTest.java b/instrumentation/iceberg-1.8/testing/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/AbstractIcebergTest.java new file mode 100644 index 000000000000..e8945ea19e0e --- /dev/null +++ b/instrumentation/iceberg-1.8/testing/src/main/java/io/opentelemetry/instrumentation/iceberg/v1_8/AbstractIcebergTest.java @@ -0,0 +1,397 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.iceberg.v1_8; + +import static io.opentelemetry.api.common.AttributeKey.longKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.TestTables; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public abstract class AbstractIcebergTest { + protected static final int FORMAT_VERSION = 2; + protected static final Schema SCHEMA = + new Schema( + NestedField.required(3, "id", IntegerType.get()), + NestedField.required(4, "data", StringType.get())); + protected static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build(); + protected static final DataFile FILE_1 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10L) + .withPartitionPath("data_bucket=0") + .withRecordCount(1L) + .build(); + protected static final DataFile FILE_2 = + DataFiles.builder(SPEC) + .withPath("/path/to/data-b.parquet") + .withFileSizeInBytes(10L) + .withPartitionPath("data_bucket=1") + .withRecordCount(1L) + .withSplitOffsets(Arrays.asList(1L)) + .build(); + + @TempDir protected File tableDir = null; + protected Table table; + + protected abstract InstrumentationExtension testing(); + + protected abstract TableScan configure(TableScan tableScan); + + @BeforeEach + void init() { + this.table = TestTables.create(this.tableDir, "test", SCHEMA, SPEC, FORMAT_VERSION); + this.table.newFastAppend().appendFile(FILE_1).appendFile(FILE_2).commit(); + } + + @Test + void testCreateTelemetry() throws IOException { + + SimpleReporter reporter = new SimpleReporter(); + TableScan scan = + table + .newScan() + .filter(Expressions.lessThan("id", 5)) + .select("id", "data") + .metricsReporter(reporter); + scan = configure(scan); + + try (CloseableIterable tasks = scan.planFiles()) { + assertThat(tasks).isNotNull(); + assertThat(tasks.iterator()).isNotNull(); + } + + assertThat(reporter.report).isNotNull(); + assertThat(reporter.report).isInstanceOf(ScanReport.class); + ScanReport expected = (ScanReport) reporter.report; + + assertScanDurationMetric(expected); + assertDataFilesCountMetrics(expected); + assertDeleteFilesCountMetrics(expected); + assertDataManifestCountMetrics(expected); + assertDeleteManifestCountMetrics(expected); + assertSizeMetric( + "iceberg.scan.data_files.size", + expected, + expected.scanMetrics().totalFileSizeInBytes().value()); + assertSizeMetric( + "iceberg.scan.delete_files.size", + expected, + expected.scanMetrics().totalDeleteFileSizeInBytes().value()); + } + + private void assertScanDurationMetric(ScanReport expectedReport) { + testing() + .waitAndAssertMetrics( + "io.opentelemetry.iceberg-1.8", + metricAssert -> + metricAssert + .hasName("iceberg.scan.planning.duration") + .hasUnit("s") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasCount(1) + .hasAttributesSatisfyingExactly( + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName()))))); + } + + private void assertDataFilesCountMetrics(ScanReport expectedReport) { + testing() + .waitAndAssertMetrics( + "io.opentelemetry.iceberg-1.8", + metricAssert -> + metricAssert + .hasName("iceberg.scan.data_files.count") + .hasUnit("{file}") + .hasLongSumSatisfying( + sum -> + sum.hasPointsSatisfying( + longSumAssert -> + longSumAssert + .hasValue( + expectedReport.scanMetrics().resultDataFiles().value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "scanned"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName())), + longSumAssert -> + longSumAssert + .hasValue( + expectedReport.scanMetrics().skippedDataFiles().value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "skipped"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName()))))); + } + + private void assertDataManifestCountMetrics(ScanReport expectedReport) { + testing() + .waitAndAssertMetrics( + "io.opentelemetry.iceberg-1.8", + metricAssert -> + metricAssert + .hasName("iceberg.scan.data_manifests.count") + .hasUnit("{file}") + .hasLongSumSatisfying( + sum -> + sum.hasPointsSatisfying( + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .scannedDataManifests() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "scanned"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName())), + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .skippedDataManifests() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "skipped"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName()))))); + } + + private void assertDeleteManifestCountMetrics(ScanReport expectedReport) { + testing() + .waitAndAssertMetrics( + "io.opentelemetry.iceberg-1.8", + metricAssert -> + metricAssert + .hasName("iceberg.scan.delete_manifests.count") + .hasUnit("{file}") + .hasLongSumSatisfying( + sum -> + sum.hasPointsSatisfying( + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .scannedDeleteManifests() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "scanned"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName())), + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .skippedDeleteManifests() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "skipped"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName()))))); + } + + private void assertDeleteFilesCountMetrics(ScanReport expectedReport) { + testing() + .waitAndAssertMetrics( + "io.opentelemetry.iceberg-1.8", + metricAssert -> + metricAssert + .hasName("iceberg.scan.delete_files.count") + .hasUnit("{file}") + .hasLongSumSatisfying( + sum -> + sum.hasPointsSatisfying( + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .resultDeleteFiles() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "scanned"), + equalTo(stringKey("iceberg.delete_file.type"), "all"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName())), + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .skippedDeleteFiles() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "skipped"), + equalTo(stringKey("iceberg.delete_file.type"), "all"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName())), + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .indexedDeleteFiles() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "scanned"), + equalTo( + stringKey("iceberg.delete_file.type"), "indexed"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName())), + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .equalityDeleteFiles() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "scanned"), + equalTo( + stringKey("iceberg.delete_file.type"), "equality"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName())), + longSumAssert -> + longSumAssert + .hasValue( + expectedReport + .scanMetrics() + .positionalDeleteFiles() + .value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "scanned"), + equalTo( + stringKey("iceberg.delete_file.type"), "position"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName())), + longSumAssert -> + longSumAssert + .hasValue(expectedReport.scanMetrics().dvs().value()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("iceberg.scan.state"), "scanned"), + equalTo(stringKey("iceberg.delete_file.type"), "dvs"), + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName()))))); + } + + private void assertSizeMetric( + String otelMetricName, ScanReport expectedReport, long expectedValue) { + testing() + .waitAndAssertMetrics( + "io.opentelemetry.iceberg-1.8", + metricAssert -> + metricAssert + .hasName(otelMetricName) + .hasUnit("By") + .hasLongSumSatisfying( + sum -> + sum.hasPointsSatisfying( + longSumAssert -> + longSumAssert + .hasValue(expectedValue) + .hasAttributesSatisfyingExactly( + equalTo( + longKey("iceberg.schema.id"), + expectedReport.schemaId()), + equalTo( + stringKey("iceberg.table.name"), + expectedReport.tableName()))))); + } + + static final class SimpleReporter implements MetricsReporter { + MetricsReport report; + + @Override + public void report(MetricsReport report) { + this.report = report; + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index c175394c2dd4..f85cf0d460ff 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -308,6 +308,8 @@ include(":instrumentation:hikaricp-3.0:library") include(":instrumentation:hikaricp-3.0:testing") include(":instrumentation:http-url-connection:javaagent") include(":instrumentation:hystrix-1.4:javaagent") +include(":instrumentation:iceberg-1.8:library") +include(":instrumentation:iceberg-1.8:testing") include(":instrumentation:influxdb-2.4:javaagent") include(":instrumentation:internal:internal-application-logger:bootstrap") include(":instrumentation:internal:internal-application-logger:javaagent")