Commit 2a3b78a

Add more logging for cloud run debugging
1 parent 4ce3667 commit 2a3b78a

2 files changed: +153 −72 lines changed

spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/CatalogIntegrationTestBase.java

Lines changed: 112 additions & 50 deletions
@@ -45,7 +45,7 @@ public class CatalogIntegrationTestBase {
 
   protected static SparkSession spark;
   private String testTable;
-  // 2. Initialize the SparkSession ONCE before all tests
+
   @BeforeClass
   public static void setupSparkSession() {
     spark =
@@ -60,8 +60,6 @@ public static void setupSparkSession() {
         .getOrCreate();
   }
 
-  // 4. Stop the SparkSession ONCE after all tests are done
-  // This fixes the local IllegalStateException (race condition)
   @AfterClass
   public static void teardownSparkSession() {
     if (spark != null) {
@@ -253,66 +251,130 @@ public void testDropDatabase() {
 
   @Test
   public void testCatalogInitializationWithProject() {
-    spark
-        .conf()
-        .set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
-    spark.conf().set("spark.sql.catalog.public_catalog.project", "bigquery-public-data");
-
-    List<Row> rows = spark.sql("SHOW DATABASES IN public_catalog").collectAsList();
-    List<String> databaseNames =
-        rows.stream().map(row -> row.getString(0)).collect(Collectors.toList());
-    assertThat(databaseNames).contains("samples");
-
-    List<Row> data =
-        spark.sql("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10").collectAsList();
-    assertThat(data).hasSize(10);
+    try {
+      spark
+          .conf()
+          .set(
+              "spark.sql.catalog.public_catalog",
+              "com.google.cloud.spark.bigquery.BigQueryCatalog");
+      // Use 'projectId' instead of 'project' - this is the correct property name
+      spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
+
+      // Add a small delay to ensure catalog is fully initialized
+      Thread.sleep(2000);
+
+      // Verify catalog is accessible before querying
+      try {
+        spark.sql("USE public_catalog");
+      } catch (Exception e) {
+        // Catalog might not support USE, that's okay
+      }
+
+      List<Row> rows = spark.sql("SHOW DATABASES IN public_catalog").collectAsList();
+      List<String> databaseNames =
+          rows.stream().map(row -> row.getString(0)).collect(Collectors.toList());
+      assertThat(databaseNames).contains("samples");
+
+      List<Row> data =
+          spark.sql("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10").collectAsList();
+      assertThat(data).hasSize(10);
+    } catch (Exception e) {
+      // Log the full stack trace to help debug cloud build failures
+      e.printStackTrace();
+      throw new RuntimeException("Test failed with detailed error", e);
+    } finally {
+      // Clean up catalog configuration to avoid interference with other tests
+      try {
+        spark.conf().unset("spark.sql.catalog.public_catalog");
+        spark.conf().unset("spark.sql.catalog.public_catalog.projectId");
+      } catch (Exception ignored) {
+      }
+    }
   }
 
   @Test
   public void testCreateCatalogWithLocation() throws Exception {
     String database = String.format("create_db_with_location_%s", System.nanoTime());
     DatasetId datasetId = DatasetId.of(database);
-    spark
-        .conf()
-        .set(
-            "spark.sql.catalog.test_location_catalog",
-            "com.google.cloud.spark.bigquery.BigQueryCatalog");
-    spark.conf().set("spark.sql.catalog.test_location_catalog.bigquery_location", "EU");
-    spark.sql("CREATE DATABASE test_location_catalog." + database);
-    Dataset dataset = bigquery.getDataset(datasetId);
-    assertThat(dataset).isNotNull();
-    assertThat(dataset.getLocation()).isEqualTo("EU");
-    bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
+    try {
+      spark
+          .conf()
+          .set(
+              "spark.sql.catalog.test_location_catalog",
+              "com.google.cloud.spark.bigquery.BigQueryCatalog");
+      spark.conf().set("spark.sql.catalog.test_location_catalog.bigquery_location", "EU");
+
+      // Add delay for catalog initialization
+      Thread.sleep(2000);
+
+      spark.sql("CREATE DATABASE test_location_catalog." + database);
+      Dataset dataset = bigquery.getDataset(datasetId);
+      assertThat(dataset).isNotNull();
+      assertThat(dataset.getLocation()).isEqualTo("EU");
+    } finally {
+      bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
+      // Clean up catalog configuration
+      try {
+        spark.conf().unset("spark.sql.catalog.test_location_catalog");
+        spark.conf().unset("spark.sql.catalog.test_location_catalog.bigquery_location");
+      } catch (Exception ignored) {
+      }
+    }
   }
 
   @Test
   public void testCreateTableAsSelectWithProjectAndLocation() {
     String database = String.format("ctas_db_with_location_%s", System.nanoTime());
     String newTable = "ctas_table_from_public";
     DatasetId datasetId = DatasetId.of(database);
-    spark
-        .conf()
-        .set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
-    spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
-    spark
-        .conf()
-        .set(
-            "spark.sql.catalog.test_catalog_as_select",
-            "com.google.cloud.spark.bigquery.BigQueryCatalog");
-    spark.conf().set("spark.sql.catalog.test_catalog_as_select.bigquery_location", "EU");
-    spark.sql("CREATE DATABASE test_catalog_as_select." + database);
-    spark.sql(
-        "CREATE TABLE test_catalog_as_select."
-            + database
-            + "."
-            + newTable
-            + " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10");
-    Dataset dataset = bigquery.getDataset(datasetId);
-    assertThat(dataset).isNotNull();
-    assertThat(dataset.getLocation()).isEqualTo("EU");
-    Table table = bigquery.getTable(TableId.of(datasetId.getDataset(), newTable));
-    assertThat(table).isNotNull();
-    bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
+    try {
+      spark
+          .conf()
+          .set(
+              "spark.sql.catalog.public_catalog",
+              "com.google.cloud.spark.bigquery.BigQueryCatalog");
+      // Use 'projectId' instead of 'project'
+      spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
+      spark
+          .conf()
+          .set(
+              "spark.sql.catalog.test_catalog_as_select",
+              "com.google.cloud.spark.bigquery.BigQueryCatalog");
+      spark.conf().set("spark.sql.catalog.test_catalog_as_select.bigquery_location", "EU");
+
+      // Add delay for catalog initialization
+      Thread.sleep(2000);
+
+      spark.sql("CREATE DATABASE test_catalog_as_select." + database);
+
+      // Add another small delay after database creation
+      Thread.sleep(1000);
+
+      spark.sql(
+          "CREATE TABLE test_catalog_as_select."
+              + database
+              + "."
+              + newTable
+              + " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10");
+      Dataset dataset = bigquery.getDataset(datasetId);
+      assertThat(dataset).isNotNull();
+      assertThat(dataset.getLocation()).isEqualTo("EU");
+      Table table = bigquery.getTable(TableId.of(datasetId.getDataset(), newTable));
+      assertThat(table).isNotNull();
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException("Test failed with detailed error", e);
+    } finally {
+      bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
+      // Clean up catalog configurations
+      try {
+        spark.conf().unset("spark.sql.catalog.public_catalog");
+        spark.conf().unset("spark.sql.catalog.public_catalog.projectId");
+        spark.conf().unset("spark.sql.catalog.test_catalog_as_select");
+        spark.conf().unset("spark.sql.catalog.test_catalog_as_select.bigquery_location");
+      } catch (Exception ignored) {
+      }
+    }
   }
 
   private static SparkSession createSparkSession() {

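Note on the pattern these tests exercise: Spark treats every "spark.sql.catalog.<name>" entry as a catalog plugin class, instantiates it lazily on first reference to that catalog, and passes all "spark.sql.catalog.<name>.*" entries to the plugin's initialize() as its option map. A minimal standalone sketch of that registration flow, assuming a local session; the catalog name demo_catalog is illustrative and not part of this commit:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CatalogRegistrationSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

    // Register the plugin class; Spark instantiates it on first use of "demo_catalog"
    // and hands it every "spark.sql.catalog.demo_catalog.*" key via initialize().
    spark
        .conf()
        .set("spark.sql.catalog.demo_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
    // 'projectId' is the property name the connector reads (the fix made in this commit).
    spark.conf().set("spark.sql.catalog.demo_catalog.projectId", "bigquery-public-data");

    for (Row row : spark.sql("SHOW DATABASES IN demo_catalog").collectAsList()) {
      System.out.println(row.getString(0));
    }
    spark.stop();
  }
}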
spark-bigquery-dsv2/spark-3.5-bigquery-lib/src/main/java/com/google/cloud/spark/bigquery/BigQueryCatalog.java

Lines changed: 41 additions & 22 deletions
@@ -81,23 +81,33 @@ public class BigQueryCatalog implements TableCatalog, SupportsNamespaces {
 
   @Override
   public void initialize(String name, CaseInsensitiveStringMap caseInsensitiveStringMap) {
-    logger.info("Initializing BigQuery table catalog [{}])", name);
-    Injector injector =
-        new InjectorBuilder()
-            .withOptions(caseInsensitiveStringMap.asCaseSensitiveMap())
-            .withTableIsMandatory(false)
-            .build();
-    tableProvider =
-        StreamSupport.stream(ServiceLoader.load(DataSourceRegister.class).spliterator(), false)
-            .filter(candidate -> candidate.shortName().equals("bigquery"))
-            .map(candidate -> (TableProvider) candidate)
-            .findFirst()
-            .orElseThrow(
-                () -> new IllegalStateException("Could not find a BigQuery TableProvider"));
-    bigQueryClient = injector.getInstance(BigQueryClient.class);
-    schemaConverters =
-        SchemaConverters.from(
-            SchemaConvertersConfiguration.from(injector.getInstance(SparkBigQueryConfig.class)));
+    logger.info(
+        "Initializing BigQuery table catalog [{}] with options: {}",
+        name,
+        caseInsensitiveStringMap);
+
+    try {
+      Injector injector =
+          new InjectorBuilder()
+              .withOptions(caseInsensitiveStringMap.asCaseSensitiveMap())
+              .withTableIsMandatory(false)
+              .build();
+      tableProvider =
+          StreamSupport.stream(ServiceLoader.load(DataSourceRegister.class).spliterator(), false)
+              .filter(candidate -> candidate.shortName().equals("bigquery"))
+              .map(candidate -> (TableProvider) candidate)
+              .findFirst()
+              .orElseThrow(
+                  () -> new IllegalStateException("Could not find a BigQuery TableProvider"));
+      bigQueryClient = injector.getInstance(BigQueryClient.class);
+      schemaConverters =
+          SchemaConverters.from(
+              SchemaConvertersConfiguration.from(injector.getInstance(SparkBigQueryConfig.class)));
+      logger.info("BigQuery table catalog [{}] initialized successfully", name);
+    } catch (Exception e) {
+      logger.error("Failed to initialize BigQuery catalog [{}]", name, e);
+      throw new BigQueryConnectorException("Failed to initialize BigQuery catalog: " + name, e);
+    }
   }
 
   @Override
@@ -161,7 +171,8 @@ Map<String, String> toLoadProperties(Identifier identifier) {
         result.put("dataset", identifier.namespace()[0]);
         break;
       case 2:
-        result.put("project", identifier.namespace()[0]);
+        // Use 'projectId' instead of 'project' to match the connector's configuration
+        result.put("projectId", identifier.namespace()[0]);
         result.put("dataset", identifier.namespace()[1]);
         break;
       default:
@@ -289,10 +300,18 @@ static TableId toTableId(Identifier identifier) {
 
   @Override
   public String[][] listNamespaces() throws NoSuchNamespaceException {
-    return Streams.stream(bigQueryClient.listDatasets())
-        .map(Dataset::getDatasetId)
-        .map(this::toNamespace)
-        .toArray(String[][]::new);
+    if (bigQueryClient == null) {
+      throw new IllegalStateException("BigQuery catalog not properly initialized");
+    }
+    try {
+      return Streams.stream(bigQueryClient.listDatasets())
+          .map(Dataset::getDatasetId)
+          .map(this::toNamespace)
+          .toArray(String[][]::new);
+    } catch (Exception e) {
+      logger.error("Error listing namespaces", e);
+      throw new BigQueryConnectorException("Failed to list namespaces", e);
+    }
   }
 
   private String[] toNamespace(DatasetId datasetId) {

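The toLoadProperties change above is the substantive fix behind the debugging noise: when a table is referenced as catalog.project.dataset.table, Spark hands the catalog a two-element namespace, and the first element must be forwarded under the option key the connector actually reads, projectId rather than project. A rough self-contained sketch of that mapping; only the switch cases mirror the diff, while the helper class, the "table" key, and the exact error behavior are assumptions for illustration:

import java.util.HashMap;
import java.util.Map;

public class LoadPropertiesSketch {
  // Simplified mirror of BigQueryCatalog.toLoadProperties: maps the namespace part
  // of a Spark identifier onto the connector's load options.
  static Map<String, String> toLoadProperties(String[] namespace, String tableName) {
    Map<String, String> result = new HashMap<>();
    result.put("table", tableName); // assumed key, for illustration only
    switch (namespace.length) {
      case 1: // catalog.dataset.table
        result.put("dataset", namespace[0]);
        break;
      case 2: // catalog.project.dataset.table
        // 'projectId', not 'project', is the key the connector reads.
        result.put("projectId", namespace[0]);
        result.put("dataset", namespace[1]);
        break;
      default: // the real method likewise rejects deeper namespaces
        throw new IllegalArgumentException("Unsupported namespace depth: " + namespace.length);
    }
    return result;
  }

  public static void main(String[] args) {
    // e.g. {dataset=samples, table=shakespeare} (map iteration order may vary)
    System.out.println(toLoadProperties(new String[] {"samples"}, "shakespeare"));
    // e.g. {projectId=bigquery-public-data, dataset=samples, table=shakespeare}
    System.out.println(
        toLoadProperties(new String[] {"bigquery-public-data", "samples"}, "shakespeare"));
  }
}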