Skip to content

Commit ad2f25d

Browse files
authored
Export logs to Cloud logging by enabling a flag (GoogleCloudDataproc#1353)
* Add new flag 'fs.gs.cloud.logging.enable' to export logs to cloud logging. * Add LoggingInterceptor to intercept log records sent to flogger and write those log records to Cloud logging. Also added unit test * Add LoggingInterceptor to intercept log records sent to flogger and write those log records to Cloud logging. Also added unit test * Configure logging configuration and add it to root logger handler. * Configure logging configuration and add it to root logger handler. * Merge remote-tracking branch 'origin/exp' into exp * Use credentials to create logging client. Structure log entry to include class and method name * Use credentials to create logging client. Structure log entry to include class and method name * Fix unit tests. * Add java doc, sort configuration and resolve other review comments. * Add java doc, sort configuration and resolve other review comments.
1 parent fdb0611 commit ad2f25d

File tree

6 files changed

+258
-0
lines changed

6 files changed

+258
-0
lines changed

gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystem.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.BLOCK_SIZE;
2020
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.DELEGATION_TOKEN_BINDING_CLASS;
21+
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_APPLICATION_NAME_SUFFIX;
22+
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_CLOUD_LOGGING_ENABLE;
2123
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_CONFIG_PREFIX;
2224
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_FILE_CHECKSUM_TYPE;
2325
import static com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemConfiguration.GCS_GLOB_ALGORITHM;
@@ -58,6 +60,7 @@
5860
import com.google.cloud.hadoop.util.ITraceFactory;
5961
import com.google.cloud.hadoop.util.PropertyUtil;
6062
import com.google.cloud.hadoop.util.TraceFactory;
63+
import com.google.cloud.hadoop.util.interceptors.LoggingInterceptor;
6164
import com.google.common.annotations.VisibleForTesting;
6265
import com.google.common.base.Ascii;
6366
import com.google.common.base.Suppliers;
@@ -93,6 +96,7 @@
9396
import java.util.concurrent.ThreadFactory;
9497
import java.util.concurrent.TimeUnit;
9598
import java.util.function.Supplier;
99+
import java.util.logging.Logger;
96100
import java.util.stream.Collectors;
97101
import javax.annotation.Nonnull;
98102
import org.apache.hadoop.conf.Configuration;
@@ -294,6 +298,10 @@ public void initialize(URI path, Configuration config) throws IOException {
294298
// be sufficient (and is required) for the delegation token binding initialization.
295299
setConf(config);
296300

301+
if (GCS_CLOUD_LOGGING_ENABLE.get(getConf(), getConf()::getBoolean)) {
302+
initializeCloudLogger(config);
303+
}
304+
297305
globAlgorithm = GCS_GLOB_ALGORITHM.get(config, config::getEnum);
298306
checksumType = GCS_FILE_CHECKSUM_TYPE.get(config, config::getEnum);
299307
defaultBlockSize = BLOCK_SIZE.get(config, config::getLong);
@@ -417,6 +425,15 @@ private void initializeGcsFs(GoogleCloudStorageFileSystem gcsFs) {
417425
gcsFsInitialized = true;
418426
}
419427

428+
private void initializeCloudLogger(Configuration config) throws IOException {
429+
GoogleCredentials credentials = getCredentials(config);
430+
String suffix = GCS_APPLICATION_NAME_SUFFIX.get(getConf(), getConf()::get);
431+
LoggingInterceptor loggingInterceptor = new LoggingInterceptor(credentials, suffix);
432+
// Add the LoggingInterceptor to the root logger
433+
Logger rootLogger = Logger.getLogger("");
434+
rootLogger.addHandler(loggingInterceptor);
435+
}
436+
420437
private GoogleCloudStorageFileSystem createGcsFs(Configuration config) throws IOException {
421438
GoogleCloudStorageFileSystemOptions gcsFsOptions =
422439
GoogleHadoopFileSystemConfiguration.getGcsFsOptionsBuilder(config).build();

gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,10 @@ public class GoogleHadoopFileSystemConfiguration {
487487
new HadoopConfigurationProperty<>(
488488
"fs.gs.operation.tracelog.enable", GoogleCloudStorageOptions.DEFAULT.isTraceLogEnabled());
489489

490+
/** Configuration key to export logs to Google cloud logging. */
491+
public static final HadoopConfigurationProperty<Boolean> GCS_CLOUD_LOGGING_ENABLE =
492+
new HadoopConfigurationProperty<>("fs.gs.cloud.logging.enable", false);
493+
490494
/** Configuration key to configure client to use for GCS access. */
491495
public static final HadoopConfigurationProperty<ClientType> GCS_CLIENT_TYPE =
492496
new HadoopConfigurationProperty<>(

gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class GoogleHadoopFileSystemConfigurationTest {
6060
put("fs.gs.bucket.delete.enable", false);
6161
put("fs.gs.checksum.type", GcsFileChecksumType.NONE);
6262
put("fs.gs.client.type", ClientType.HTTP_API_CLIENT);
63+
put("fs.gs.cloud.logging.enable", false);
6364
put("fs.gs.copy.with.rewrite.enable", true);
6465
put("fs.gs.create.items.conflict.check.enable", true);
6566
put("fs.gs.delegation.token.binding", null);

util/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@
6969
<groupId>com.google.flogger</groupId>
7070
<artifactId>flogger-system-backend</artifactId>
7171
</dependency>
72+
<dependency>
73+
<groupId>com.google.cloud</groupId>
74+
<artifactId>google-cloud-logging</artifactId>
75+
<version>3.21.3</version>
76+
</dependency>
7277

7378
<!-- Test dependencies -->
7479
<dependency>
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package com.google.cloud.hadoop.util.interceptors;
2+
3+
import com.google.auth.oauth2.GoogleCredentials;
4+
import com.google.cloud.logging.LogEntry;
5+
import com.google.cloud.logging.Logging;
6+
import com.google.cloud.logging.LoggingOptions;
7+
import com.google.cloud.logging.Payload.StringPayload;
8+
import com.google.cloud.logging.Severity;
9+
import java.util.Collections;
10+
import java.util.logging.Handler;
11+
import java.util.logging.Level;
12+
import java.util.logging.LogRecord;
13+
14+
/**
15+
* A logging interceptor that publishes log records to Google Cloud Logging. This class extends
16+
* {@link Handler} to integrate with the Java logging framework.
17+
*/
18+
public class LoggingInterceptor extends Handler {
19+
20+
private final Logging cloudLogging;
21+
private final String logNameSuffix;
22+
private static final String LOG_NAME_PREFIX = "gcs-connector";
23+
24+
/**
25+
* Constructs a new {@code LoggingInterceptor}.
26+
*
27+
* @param credentials the Google Cloud credentials used to authenticate with the Logging service
28+
* @param logNameSuffix the suffix to append to the log name
29+
*/
30+
public LoggingInterceptor(GoogleCredentials credentials, String logNameSuffix) {
31+
this.cloudLogging = createLoggingService(credentials);
32+
this.logNameSuffix = logNameSuffix;
33+
}
34+
35+
/**
36+
* Creates a Google Cloud Logging service instance.
37+
*
38+
* @param credentials the Google Cloud credentials used to authenticate with the Logging service
39+
* @return a {@link Logging} instance
40+
*/
41+
protected Logging createLoggingService(GoogleCredentials credentials) {
42+
return LoggingOptions.newBuilder().setCredentials(credentials).build().getService();
43+
}
44+
45+
/**
46+
* Publishes a log record to Google Cloud Logging.
47+
*
48+
* @param record the log record to publish
49+
*/
50+
@Override
51+
public void publish(LogRecord record) {
52+
if (!isLoggable(record)) {
53+
return;
54+
}
55+
String logName = String.join("-", LOG_NAME_PREFIX, logNameSuffix).replaceAll("-$", "");
56+
57+
LogEntry entry =
58+
LogEntry.newBuilder(StringPayload.of(record.getMessage()))
59+
.setSeverity(mapToCloudSeverity(record.getLevel()))
60+
.setLogName(logName)
61+
.addLabel("class", record.getSourceClassName())
62+
.addLabel("method", record.getSourceMethodName())
63+
.build();
64+
65+
cloudLogging.write(Collections.singleton(entry));
66+
}
67+
68+
/** Flushes any buffered log entries to Google Cloud Logging. */
69+
@Override
70+
public void flush() {
71+
cloudLogging.flush();
72+
}
73+
74+
/**
75+
* Closes the Google Cloud Logging service.
76+
*
77+
* @throws SecurityException if an error occurs while closing the service
78+
*/
79+
@Override
80+
public void close() throws SecurityException {
81+
try {
82+
cloudLogging.close();
83+
} catch (Exception e) {
84+
throw new RuntimeException("Failed to close the Google Cloud Logging service", e);
85+
}
86+
}
87+
88+
/**
89+
* Maps a {@link Level} to a corresponding Google Cloud Logging {@link Severity}.
90+
*
91+
* @param level the Java logging level
92+
* @return the corresponding Google Cloud Logging severity
93+
*/
94+
private Severity mapToCloudSeverity(Level level) {
95+
switch (level.getName()) {
96+
case "SEVERE":
97+
return Severity.ERROR;
98+
case "WARNING":
99+
return Severity.WARNING;
100+
case "INFO":
101+
return Severity.INFO;
102+
case "FINE":
103+
case "FINER":
104+
case "FINEST":
105+
return Severity.DEBUG;
106+
default:
107+
return Severity.DEFAULT;
108+
}
109+
}
110+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package com.google.cloud.hadoop.util;
2+
3+
import static org.mockito.Mockito.*;
4+
5+
import com.google.auth.oauth2.GoogleCredentials;
6+
import com.google.cloud.hadoop.util.interceptors.LoggingInterceptor;
7+
import com.google.cloud.logging.LogEntry;
8+
import com.google.cloud.logging.Logging;
9+
import com.google.cloud.logging.Payload.StringPayload;
10+
import com.google.cloud.logging.Severity;
11+
import java.util.Collections;
12+
import java.util.logging.Level;
13+
import java.util.logging.LogRecord;
14+
import org.junit.Before;
15+
import org.junit.Test;
16+
17+
public class LoggingInterceptorTest {
18+
19+
private Logging mockLogging;
20+
private LoggingInterceptor loggingInterceptor;
21+
22+
@Before
23+
public void setUp() {
24+
mockLogging = mock(Logging.class);
25+
loggingInterceptor =
26+
new LoggingInterceptor(GoogleCredentials.newBuilder().build(), "") {
27+
@Override
28+
protected Logging createLoggingService(GoogleCredentials credentials) {
29+
return mockLogging;
30+
}
31+
};
32+
}
33+
34+
@Test
35+
public void publishesLogEntryWithCorrectSeverity() {
36+
LogRecord record = new LogRecord(Level.SEVERE, "Critical error occurred");
37+
record.setSourceClassName("com.example.MyClass");
38+
record.setSourceMethodName("myMethod");
39+
loggingInterceptor.publish(record);
40+
41+
LogEntry expectedEntry =
42+
LogEntry.newBuilder(StringPayload.of("Critical error occurred"))
43+
.setSeverity(Severity.ERROR)
44+
.setLogName("gcs-connector")
45+
.addLabel("class", "com.example.MyClass")
46+
.addLabel("method", "myMethod")
47+
.build();
48+
49+
verify(mockLogging).write(Collections.singleton(expectedEntry));
50+
}
51+
52+
@Test
53+
public void publishesLogEntryWithSuffixedLogName() {
54+
LoggingInterceptor customloggingInterceptor =
55+
new LoggingInterceptor(GoogleCredentials.newBuilder().build(), "suffix") {
56+
@Override
57+
protected Logging createLoggingService(GoogleCredentials credentials) {
58+
return mockLogging;
59+
}
60+
};
61+
LogRecord record = new LogRecord(Level.INFO, "Information message");
62+
record.setSourceClassName("com.example.MyClass");
63+
record.setSourceMethodName("myMethod");
64+
customloggingInterceptor.publish(record);
65+
66+
LogEntry expectedEntry =
67+
LogEntry.newBuilder(StringPayload.of("Information message"))
68+
.setSeverity(Severity.INFO)
69+
.setLogName("gcs-connector-suffix")
70+
.addLabel("class", "com.example.MyClass")
71+
.addLabel("method", "myMethod")
72+
.build();
73+
74+
verify(mockLogging).write(Collections.singleton(expectedEntry));
75+
}
76+
77+
@Test
78+
public void doesNotPublishNonLoggableRecord() {
79+
LoggingInterceptor nonLoggableInterceptor =
80+
new LoggingInterceptor(GoogleCredentials.newBuilder().build(), "") {
81+
@Override
82+
public boolean isLoggable(LogRecord record) {
83+
return false; // Force isLoggable() to return false
84+
}
85+
86+
@Override
87+
protected Logging createLoggingService(GoogleCredentials credentials) {
88+
return mockLogging;
89+
}
90+
};
91+
92+
LogRecord record = new LogRecord(Level.FINE, "Debug message");
93+
nonLoggableInterceptor.publish(record);
94+
95+
verify(mockLogging, never()).write(any());
96+
}
97+
98+
@Test
99+
public void flushesLoggingService() {
100+
loggingInterceptor.flush();
101+
verify(mockLogging).flush();
102+
}
103+
104+
@Test
105+
public void mapsUnknownLogLevelToDefaultSeverity() {
106+
LogRecord record = new LogRecord(Level.CONFIG, "Configuration message");
107+
record.setSourceClassName("com.example.MyClass");
108+
record.setSourceMethodName("myMethod");
109+
loggingInterceptor.publish(record);
110+
111+
LogEntry expectedEntry =
112+
LogEntry.newBuilder(StringPayload.of("Configuration message"))
113+
.setSeverity(Severity.DEFAULT)
114+
.setLogName("gcs-connector")
115+
.addLabel("class", "com.example.MyClass")
116+
.addLabel("method", "myMethod")
117+
.build();
118+
119+
verify(mockLogging).write(Collections.singleton(expectedEntry));
120+
}
121+
}

0 commit comments

Comments
 (0)