Skip to content

Commit 305ed1f

Browse files
committed
Add more logging for cloud run debugging
1 parent 4ce3667 commit 305ed1f

File tree

2 files changed

+159
-79
lines changed

2 files changed

+159
-79
lines changed

spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/CatalogIntegrationTestBase.java

Lines changed: 121 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,9 @@
1515
*/
1616
package com.google.cloud.spark.bigquery.integration;
1717

18-
import static com.google.common.truth.Truth.assertThat;
19-
20-
import com.google.cloud.bigquery.BigQuery;
21-
import com.google.cloud.bigquery.Dataset;
22-
import com.google.cloud.bigquery.DatasetId;
23-
import com.google.cloud.bigquery.QueryJobConfiguration;
24-
import com.google.cloud.bigquery.Table;
25-
import com.google.cloud.bigquery.TableId;
2618
import java.util.List;
2719
import java.util.stream.Collectors;
20+
2821
import org.apache.spark.sql.Row;
2922
import org.apache.spark.sql.RowFactory;
3023
import org.apache.spark.sql.SparkSession;
@@ -36,6 +29,14 @@
3629
import org.junit.Ignore;
3730
import org.junit.Test;
3831

32+
import com.google.cloud.bigquery.BigQuery;
33+
import com.google.cloud.bigquery.Dataset;
34+
import com.google.cloud.bigquery.DatasetId;
35+
import com.google.cloud.bigquery.QueryJobConfiguration;
36+
import com.google.cloud.bigquery.Table;
37+
import com.google.cloud.bigquery.TableId;
38+
import static com.google.common.truth.Truth.assertThat;
39+
3940
public class CatalogIntegrationTestBase {
4041

4142
public static final String DEFAULT_NAMESPACE = "default";
@@ -45,7 +46,7 @@ public class CatalogIntegrationTestBase {
4546

4647
protected static SparkSession spark;
4748
private String testTable;
48-
// 2. Initialize the SparkSession ONCE before all tests
49+
4950
@BeforeClass
5051
public static void setupSparkSession() {
5152
spark =
@@ -60,8 +61,6 @@ public static void setupSparkSession() {
6061
.getOrCreate();
6162
}
6263

63-
// 4. Stop the SparkSession ONCE after all tests are done
64-
// This fixes the local IllegalStateException (race condition)
6564
@AfterClass
6665
public static void teardownSparkSession() {
6766
if (spark != null) {
@@ -253,66 +252,130 @@ public void testDropDatabase() {
253252

254253
@Test
255254
public void testCatalogInitializationWithProject() {
256-
spark
257-
.conf()
258-
.set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
259-
spark.conf().set("spark.sql.catalog.public_catalog.project", "bigquery-public-data");
260-
261-
List<Row> rows = spark.sql("SHOW DATABASES IN public_catalog").collectAsList();
262-
List<String> databaseNames =
263-
rows.stream().map(row -> row.getString(0)).collect(Collectors.toList());
264-
assertThat(databaseNames).contains("samples");
265-
266-
List<Row> data =
267-
spark.sql("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10").collectAsList();
268-
assertThat(data).hasSize(10);
255+
try {
256+
spark
257+
.conf()
258+
.set(
259+
"spark.sql.catalog.public_catalog",
260+
"com.google.cloud.spark.bigquery.BigQueryCatalog");
261+
// Use 'projectId' instead of 'project' - this is the correct property name
262+
spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
263+
264+
// Add a small delay to ensure catalog is fully initialized
265+
Thread.sleep(2000);
266+
267+
// Verify catalog is accessible before querying
268+
try {
269+
spark.sql("USE public_catalog");
270+
} catch (Exception e) {
271+
// Catalog might not support USE, that's okay
272+
}
273+
274+
List<Row> rows = spark.sql("SHOW DATABASES IN public_catalog").collectAsList();
275+
List<String> databaseNames =
276+
rows.stream().map(row -> row.getString(0)).collect(Collectors.toList());
277+
assertThat(databaseNames).contains("samples");
278+
279+
List<Row> data =
280+
spark.sql("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10").collectAsList();
281+
assertThat(data).hasSize(10);
282+
} catch (Exception e) {
283+
// Log the full stack trace to help debug cloud build failures
284+
e.printStackTrace();
285+
throw new RuntimeException("Test failed with detailed error", e);
286+
} finally {
287+
// Clean up catalog configuration to avoid interference with other tests
288+
try {
289+
spark.conf().unset("spark.sql.catalog.public_catalog");
290+
spark.conf().unset("spark.sql.catalog.public_catalog.projectId");
291+
} catch (Exception ignored) {
292+
}
293+
}
269294
}
270295

271296
@Test
272297
public void testCreateCatalogWithLocation() throws Exception {
273298
String database = String.format("create_db_with_location_%s", System.nanoTime());
274299
DatasetId datasetId = DatasetId.of(database);
275-
spark
276-
.conf()
277-
.set(
278-
"spark.sql.catalog.test_location_catalog",
279-
"com.google.cloud.spark.bigquery.BigQueryCatalog");
280-
spark.conf().set("spark.sql.catalog.test_location_catalog.bigquery_location", "EU");
281-
spark.sql("CREATE DATABASE test_location_catalog." + database);
282-
Dataset dataset = bigquery.getDataset(datasetId);
283-
assertThat(dataset).isNotNull();
284-
assertThat(dataset.getLocation()).isEqualTo("EU");
285-
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
300+
try {
301+
spark
302+
.conf()
303+
.set(
304+
"spark.sql.catalog.test_location_catalog",
305+
"com.google.cloud.spark.bigquery.BigQueryCatalog");
306+
spark.conf().set("spark.sql.catalog.test_location_catalog.bigquery_location", "EU");
307+
308+
// Add delay for catalog initialization
309+
Thread.sleep(2000);
310+
311+
spark.sql("CREATE DATABASE test_location_catalog." + database);
312+
Dataset dataset = bigquery.getDataset(datasetId);
313+
assertThat(dataset).isNotNull();
314+
assertThat(dataset.getLocation()).isEqualTo("EU");
315+
} finally {
316+
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
317+
// Clean up catalog configuration
318+
try {
319+
spark.conf().unset("spark.sql.catalog.test_location_catalog");
320+
spark.conf().unset("spark.sql.catalog.test_location_catalog.bigquery_location");
321+
} catch (Exception ignored) {
322+
}
323+
}
286324
}
287325

288326
@Test
289327
public void testCreateTableAsSelectWithProjectAndLocation() {
290328
String database = String.format("ctas_db_with_location_%s", System.nanoTime());
291329
String newTable = "ctas_table_from_public";
292330
DatasetId datasetId = DatasetId.of(database);
293-
spark
294-
.conf()
295-
.set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
296-
spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
297-
spark
298-
.conf()
299-
.set(
300-
"spark.sql.catalog.test_catalog_as_select",
301-
"com.google.cloud.spark.bigquery.BigQueryCatalog");
302-
spark.conf().set("spark.sql.catalog.test_catalog_as_select.bigquery_location", "EU");
303-
spark.sql("CREATE DATABASE test_catalog_as_select." + database);
304-
spark.sql(
305-
"CREATE TABLE test_catalog_as_select."
306-
+ database
307-
+ "."
308-
+ newTable
309-
+ " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10");
310-
Dataset dataset = bigquery.getDataset(datasetId);
311-
assertThat(dataset).isNotNull();
312-
assertThat(dataset.getLocation()).isEqualTo("EU");
313-
Table table = bigquery.getTable(TableId.of(datasetId.getDataset(), newTable));
314-
assertThat(table).isNotNull();
315-
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
331+
try {
332+
spark
333+
.conf()
334+
.set(
335+
"spark.sql.catalog.public_catalog",
336+
"com.google.cloud.spark.bigquery.BigQueryCatalog");
337+
// Use 'projectId' instead of 'project'
338+
spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
339+
spark
340+
.conf()
341+
.set(
342+
"spark.sql.catalog.test_catalog_as_select",
343+
"com.google.cloud.spark.bigquery.BigQueryCatalog");
344+
spark.conf().set("spark.sql.catalog.test_catalog_as_select.bigquery_location", "EU");
345+
346+
// Add delay for catalog initialization
347+
Thread.sleep(2000);
348+
349+
spark.sql("CREATE DATABASE test_catalog_as_select." + database);
350+
351+
// Add another small delay after database creation
352+
Thread.sleep(1000);
353+
354+
spark.sql(
355+
"CREATE TABLE test_catalog_as_select."
356+
+ database
357+
+ "."
358+
+ newTable
359+
+ " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10");
360+
Dataset dataset = bigquery.getDataset(datasetId);
361+
assertThat(dataset).isNotNull();
362+
assertThat(dataset.getLocation()).isEqualTo("EU");
363+
Table table = bigquery.getTable(TableId.of(datasetId.getDataset(), newTable));
364+
assertThat(table).isNotNull();
365+
} catch (Exception e) {
366+
e.printStackTrace();
367+
throw new RuntimeException("Test failed with detailed error", e);
368+
} finally {
369+
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
370+
// Clean up catalog configurations
371+
try {
372+
spark.conf().unset("spark.sql.catalog.public_catalog");
373+
spark.conf().unset("spark.sql.catalog.public_catalog.projectId");
374+
spark.conf().unset("spark.sql.catalog.test_catalog_as_select");
375+
spark.conf().unset("spark.sql.catalog.test_catalog_as_select.bigquery_location");
376+
} catch (Exception ignored) {
377+
}
378+
}
316379
}
317380

318381
private static SparkSession createSparkSession() {

spark-bigquery-dsv2/spark-3.5-bigquery-lib/src/main/java/com/google/cloud/spark/bigquery/BigQueryCatalog.java

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,23 +81,32 @@ public class BigQueryCatalog implements TableCatalog, SupportsNamespaces {
8181

8282
@Override
8383
public void initialize(String name, CaseInsensitiveStringMap caseInsensitiveStringMap) {
84-
logger.info("Initializing BigQuery table catalog [{}])", name);
85-
Injector injector =
86-
new InjectorBuilder()
87-
.withOptions(caseInsensitiveStringMap.asCaseSensitiveMap())
88-
.withTableIsMandatory(false)
89-
.build();
90-
tableProvider =
91-
StreamSupport.stream(ServiceLoader.load(DataSourceRegister.class).spliterator(), false)
92-
.filter(candidate -> candidate.shortName().equals("bigquery"))
93-
.map(candidate -> (TableProvider) candidate)
94-
.findFirst()
95-
.orElseThrow(
96-
() -> new IllegalStateException("Could not find a BigQuery TableProvider"));
97-
bigQueryClient = injector.getInstance(BigQueryClient.class);
98-
schemaConverters =
99-
SchemaConverters.from(
100-
SchemaConvertersConfiguration.from(injector.getInstance(SparkBigQueryConfig.class)));
84+
logger.info(
85+
"Initializing BigQuery table catalog [{}] with options: {}",
86+
name,
87+
caseInsensitiveStringMap);
88+
try {
89+
Injector injector =
90+
new InjectorBuilder()
91+
.withOptions(caseInsensitiveStringMap.asCaseSensitiveMap())
92+
.withTableIsMandatory(false)
93+
.build();
94+
tableProvider =
95+
StreamSupport.stream(ServiceLoader.load(DataSourceRegister.class).spliterator(), false)
96+
.filter(candidate -> candidate.shortName().equals("bigquery"))
97+
.map(candidate -> (TableProvider) candidate)
98+
.findFirst()
99+
.orElseThrow(
100+
() -> new IllegalStateException("Could not find a BigQuery TableProvider"));
101+
bigQueryClient = injector.getInstance(BigQueryClient.class);
102+
schemaConverters =
103+
SchemaConverters.from(
104+
SchemaConvertersConfiguration.from(injector.getInstance(SparkBigQueryConfig.class)));
105+
logger.info("BigQuery table catalog [{}] initialized successfully", name);
106+
} catch (Exception e) {
107+
logger.error("Failed to initialize BigQuery catalog [{}]", name, e);
108+
throw new BigQueryConnectorException("Failed to initialize BigQuery catalog: " + name, e);
109+
}
101110
}
102111

103112
@Override
@@ -289,10 +298,18 @@ static TableId toTableId(Identifier identifier) {
289298

290299
@Override
291300
public String[][] listNamespaces() throws NoSuchNamespaceException {
292-
return Streams.stream(bigQueryClient.listDatasets())
293-
.map(Dataset::getDatasetId)
294-
.map(this::toNamespace)
295-
.toArray(String[][]::new);
301+
if (bigQueryClient == null) {
302+
throw new IllegalStateException("BigQuery catalog not properly initialized");
303+
}
304+
try {
305+
return Streams.stream(bigQueryClient.listDatasets())
306+
.map(Dataset::getDatasetId)
307+
.map(this::toNamespace)
308+
.toArray(String[][]::new);
309+
} catch (Exception e) {
310+
logger.error("Error listing namespaces", e);
311+
throw new BigQueryConnectorException("Failed to list namespaces", e);
312+
}
296313
}
297314

298315
private String[] toNamespace(DatasetId datasetId) {

0 commit comments

Comments
 (0)