adding dataproc labels to bigquery jobs #1412
Open
tnazarew wants to merge 6 commits into GoogleCloudDataproc:master from tnazarew:add_dataproc_labes
Commits
3d259ba adding dataproc labels to bigquery jobs (tnazarew)
41065e9 PR suggestions (tnazarew)
1da408b PR suggestions (tnazarew)
f85d82d removed httpclient dependency (tnazarew)
5aefb72 add cache and move http client to non static context (tnazarew)
2e1bd9f refactor in the test class (tnazarew)
...ry-connector-common/src/main/java/com/google/cloud/spark/bigquery/util/GCPLabelUtils.java (new file, 234 additions & 0 deletions)
```java
package com.google.cloud.spark.bigquery.util;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.util.Timeout;

/** Util to extract values from the GCP environment. */
public class GCPLabelUtils {

  private static final String BASE_URI = "http://metadata.google.internal/computeMetadata/v1";
  public static final String PROJECT_ID_ENDPOINT = "/project/project-id";
  public static final String BATCH_ID_ENDPOINT = "/instance/attributes/dataproc-batch-id";
  public static final String BATCH_UUID_ENDPOINT = "/instance/attributes/dataproc-batch-uuid";
  public static final String SESSION_ID_ENDPOINT = "/instance/attributes/dataproc-session-id";
  public static final String SESSION_UUID_ENDPOINT = "/instance/attributes/dataproc-session-uuid";
  public static final String CLUSTER_UUID_ENDPOINT = "/instance/attributes/dataproc-cluster-uuid";
  public static final String DATAPROC_REGION_ENDPOINT = "/instance/attributes/dataproc-region";
  private static final String DATAPROC_CLASSPATH = "/usr/local/share/google/dataproc/lib";
  private static final CloseableHttpClient HTTP_CLIENT;

  public static final String SPARK_YARN_TAGS = "spark.yarn.tags";
  public static final String SPARK_DRIVER_HOST = "spark.driver.host";
  public static final String SPARK_APP_ID = "spark.app.id";
  public static final String SPARK_APP_NAME = "spark.app.name";
  public static final String GOOGLE_METADATA_API = "google.metadata.api.base-url";
  public static final String SPARK_MASTER = "spark.master";
  private static final String JOB_ID_PREFIX = "dataproc_job_";
  private static final String JOB_UUID_PREFIX = "dataproc_uuid_";
  private static final String METADATA_FLAVOUR = "Metadata-Flavor";
  private static final String GOOGLE = "Google";
  private static final String SPARK_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH";

  enum ResourceType {
    CLUSTER,
    BATCH,
    INTERACTIVE,
    UNKNOWN
  }

  // Shared HTTP client with short timeouts so metadata lookups fail fast
  // when the metadata server is unreachable.
  static {
    ConnectionConfig connectionConfig =
        ConnectionConfig.custom()
            .setConnectTimeout(Timeout.ofMilliseconds(100))
            .setSocketTimeout(Timeout.ofMilliseconds(100))
            .build();

    PoolingHttpClientConnectionManager connMan =
        PoolingHttpClientConnectionManagerBuilder.create()
            .setDefaultConnectionConfig(connectionConfig)
            .build();
    RequestConfig config =
        RequestConfig.custom().setConnectionRequestTimeout(Timeout.ofMilliseconds(100)).build();
    HTTP_CLIENT =
        HttpClients.custom().setDefaultRequestConfig(config).setConnectionManager(connMan).build();
  }

  // Dataproc images put their libraries on SPARK_DIST_CLASSPATH; use that as a runtime marker.
  static boolean isDataprocRuntime() {
    String sparkDistClasspath = System.getenv(SPARK_DIST_CLASSPATH);
    return (sparkDistClasspath != null && sparkDistClasspath.contains(DATAPROC_CLASSPATH));
  }

  public static Map<String, String> getSparkLabels(ImmutableMap<String, String> conf) {
    Map<String, String> sparkLabels = new HashMap<>();
    getSparkAppId(conf).ifPresent(p -> sparkLabels.put("appId", p));
    getSparkAppName(conf).ifPresent(p -> sparkLabels.put("appName", p));
    if (isDataprocRuntime()) {
      sparkLabels.putAll(getGCPLabels(conf));
    }
    return sparkLabels;
  }

  static Map<String, String> getGCPLabels(ImmutableMap<String, String> conf) {
    Map<String, String> gcpLabels = new HashMap<>();
    ResourceType resource = identifyResource(conf);
    switch (resource) {
      case CLUSTER:
        getClusterName(conf).ifPresent(p -> gcpLabels.put("cluster.name", p));
        getClusterUUID(conf).ifPresent(p -> gcpLabels.put("cluster.uuid", p));
        getDataprocJobID(conf).ifPresent(p -> gcpLabels.put("job.id", p));
        getDataprocJobUUID(conf).ifPresent(p -> gcpLabels.put("job.uuid", p));
        gcpLabels.put("job.type", "dataproc_job");
        break;
      case BATCH:
        getDataprocBatchID(conf).ifPresent(p -> gcpLabels.put("spark.batch.id", p));
        getDataprocBatchUUID(conf).ifPresent(p -> gcpLabels.put("spark.batch.uuid", p));
        gcpLabels.put("job.type", "batch");
        break;
      case INTERACTIVE:
        getDataprocSessionID(conf).ifPresent(p -> gcpLabels.put("spark.session.id", p));
        getDataprocSessionUUID(conf).ifPresent(p -> gcpLabels.put("spark.session.uuid", p));
        gcpLabels.put("job.type", "session");
        break;
      case UNKNOWN:
        // do nothing
        break;
    }
    getGCPProjectId(conf).ifPresent(p -> gcpLabels.put("projectId", p));
    getDataprocRegion(conf).ifPresent(p -> gcpLabels.put("region", p));
    return gcpLabels;
  }

  static ResourceType identifyResource(ImmutableMap<String, String> conf) {
    if ("yarn".equals(conf.getOrDefault(SPARK_MASTER, ""))) return ResourceType.CLUSTER;
    if (getDataprocBatchID(conf).isPresent()) return ResourceType.BATCH;
    if (getDataprocSessionID(conf).isPresent()) return ResourceType.INTERACTIVE;

    return ResourceType.UNKNOWN;
  }

  private static Optional<String> getDriverHost(ImmutableMap<String, String> conf) {
    return Optional.ofNullable(conf.get(SPARK_DRIVER_HOST));
  }

  /* sample hostname:
   * sample-cluster-m.us-central1-a.c.hadoop-cloud-dev.google.com.internal */
  @VisibleForTesting
  static Optional<String> getClusterName(ImmutableMap<String, String> conf) {
    return getDriverHost(conf)
        .map(host -> host.split("\\.")[0])
        .map(s -> s.substring(0, s.lastIndexOf("-")));
  }

  @VisibleForTesting
  static Optional<String> getDataprocRegion(ImmutableMap<String, String> conf) {
    return fetchGCPMetadata(DATAPROC_REGION_ENDPOINT, conf);
  }

  @VisibleForTesting
  static Optional<String> getDataprocJobID(ImmutableMap<String, String> conf) {
    return getPropertyFromYarnTag(conf, JOB_ID_PREFIX);
  }

  @VisibleForTesting
  static Optional<String> getDataprocJobUUID(ImmutableMap<String, String> conf) {
    return getPropertyFromYarnTag(conf, JOB_UUID_PREFIX);
  }

  @VisibleForTesting
  static Optional<String> getDataprocBatchID(ImmutableMap<String, String> conf) {
    return fetchGCPMetadata(BATCH_ID_ENDPOINT, conf);
  }

  @VisibleForTesting
  static Optional<String> getDataprocBatchUUID(ImmutableMap<String, String> conf) {
    return fetchGCPMetadata(BATCH_UUID_ENDPOINT, conf);
  }

  @VisibleForTesting
  static Optional<String> getDataprocSessionID(ImmutableMap<String, String> conf) {
    return fetchGCPMetadata(SESSION_ID_ENDPOINT, conf);
  }

  @VisibleForTesting
  static Optional<String> getDataprocSessionUUID(ImmutableMap<String, String> conf) {
    return fetchGCPMetadata(SESSION_UUID_ENDPOINT, conf);
  }

  @VisibleForTesting
  static Optional<String> getGCPProjectId(ImmutableMap<String, String> conf) {
    return fetchGCPMetadata(PROJECT_ID_ENDPOINT, conf)
        .map(b -> b.substring(b.lastIndexOf('/') + 1));
  }

  @VisibleForTesting
  static Optional<String> getSparkAppId(ImmutableMap<String, String> conf) {
    return Optional.ofNullable(conf.get(SPARK_APP_ID));
  }

  @VisibleForTesting
  static Optional<String> getSparkAppName(ImmutableMap<String, String> conf) {
    return Optional.ofNullable(conf.get(SPARK_APP_NAME));
  }

  @VisibleForTesting
  static Optional<String> getClusterUUID(ImmutableMap<String, String> conf) {
    return fetchGCPMetadata(CLUSTER_UUID_ENDPOINT, conf);
  }

  @VisibleForTesting
  static Optional<String> getPropertyFromYarnTag(
      ImmutableMap<String, String> conf, String tagPrefix) {
    String yarnTag = conf.get(SPARK_YARN_TAGS);
    if (yarnTag == null) {
      return Optional.empty();
    }
    return Arrays.stream(yarnTag.split(","))
        .filter(tag -> tag.contains(tagPrefix))
        .findFirst()
        .map(tag -> tag.substring(tagPrefix.length()));
  }

  // Queries the metadata server; the base URI can be overridden via conf for tests.
  @VisibleForTesting
  static Optional<String> fetchGCPMetadata(String httpEndpoint, ImmutableMap<String, String> conf) {
    String baseUri = conf.getOrDefault(GOOGLE_METADATA_API, BASE_URI);
    String httpURI = baseUri + httpEndpoint;
    HttpGet httpGet = new HttpGet(httpURI);
    httpGet.addHeader(METADATA_FLAVOUR, GOOGLE);
    try {
      return HTTP_CLIENT.execute(
          httpGet,
          response -> {
            handleError(response);
            return Optional.of(EntityUtils.toString(response.getEntity()));
          });
    } catch (IOException e) {
      return Optional.empty();
    }
  }

  private static void handleError(ClassicHttpResponse response) throws IOException, ParseException {
    final int statusCode = response.getCode();
    if (statusCode < 400 || statusCode >= 600) return;
    String message =
        String.format(
            "code: %d, response: %s",
            statusCode, EntityUtils.toString(response.getEntity(), UTF_8));
    throw new IOException(message);
  }
}
```
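The public entry point is `getSparkLabels`. Below is a minimal sketch of exercising it locally, assuming only Guava on the classpath; the `SparkLabelsSketch` class and the conf values are illustrative, not part of this PR. Outside a Dataproc runtime (no Dataproc marker on `SPARK_DIST_CLASSPATH`), `isDataprocRuntime()` is false, so no metadata-server calls are made and only the generic Spark labels come back.

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class SparkLabelsSketch {
  public static void main(String[] args) {
    // Illustrative conf values; in the connector these come from the live Spark configuration.
    ImmutableMap<String, String> conf =
        ImmutableMap.of(
            "spark.app.id", "app-20240101000000-0001",
            "spark.app.name", "nightly-bigquery-load");
    Map<String, String> labels = GCPLabelUtils.getSparkLabels(conf);
    labels.forEach((k, v) -> System.out.println(k + " = " + v));
    // prints (in some order):
    // appId = app-20240101000000-0001
    // appName = nightly-bigquery-load
  }
}
```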
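On a cluster, the job id and uuid are recovered from `spark.yarn.tags` rather than the metadata server. A sketch of that parsing path follows, with made-up tag values in the `dataproc_job_<id>` / `dataproc_uuid_<uuid>` shape the prefixes expect; since `identifyResource` and the job accessors are `@VisibleForTesting` package-private, this would have to live in `com.google.cloud.spark.bigquery.util` (e.g. alongside the test class).

```java
import com.google.common.collect.ImmutableMap;

// Assumes same-package access to the package-private accessors.
public class YarnTagSketch {
  public static void main(String[] args) {
    // Made-up tags; spark.master=yarn routes identifyResource to CLUSTER.
    ImmutableMap<String, String> conf =
        ImmutableMap.of(
            "spark.master", "yarn",
            "spark.yarn.tags", "dataproc_job_abc123,dataproc_uuid_0000-1111");
    System.out.println(GCPLabelUtils.identifyResource(conf)); // CLUSTER
    System.out.println(GCPLabelUtils.getDataprocJobID(conf).orElse("?")); // abc123
    System.out.println(GCPLabelUtils.getDataprocJobUUID(conf).orElse("?")); // 0000-1111
  }
}
```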
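Because `fetchGCPMetadata` honors the `google.metadata.api.base-url` override, the metadata path can also be exercised without GCP by pointing it at a local stub. A sketch using the JDK's built-in `com.sun.net.httpserver` follows; the stub server and the `batch-1234` response are hypothetical, and same-package access is again assumed since `fetchGCPMetadata` is package-private.

```java
import com.google.common.collect.ImmutableMap;
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class MetadataStubSketch {
  public static void main(String[] args) throws Exception {
    // Throwaway HTTP server on an ephemeral port, impersonating the metadata service.
    HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
    server.createContext(
        GCPLabelUtils.BATCH_ID_ENDPOINT,
        exchange -> {
          byte[] body = "batch-1234".getBytes(StandardCharsets.UTF_8);
          exchange.sendResponseHeaders(200, body.length);
          try (OutputStream os = exchange.getResponseBody()) {
            os.write(body);
          }
        });
    server.start();
    try {
      // Redirect the util at the stub via the conf override the PR introduces.
      String baseUri = "http://localhost:" + server.getAddress().getPort();
      ImmutableMap<String, String> conf =
          ImmutableMap.of(GCPLabelUtils.GOOGLE_METADATA_API, baseUri);
      Optional<String> batchId =
          GCPLabelUtils.fetchGCPMetadata(GCPLabelUtils.BATCH_ID_ENDPOINT, conf);
      System.out.println("batch id: " + batchId.orElse("<absent>")); // batch-1234
    } finally {
      server.stop(0);
    }
  }
}
```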