@@ -15,16 +15,9 @@
  */
 package com.google.cloud.spark.bigquery.integration;
 
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.Dataset;
-import com.google.cloud.bigquery.DatasetId;
-import com.google.cloud.bigquery.QueryJobConfiguration;
-import com.google.cloud.bigquery.Table;
-import com.google.cloud.bigquery.TableId;
 import java.util.List;
 import java.util.stream.Collectors;
+
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SparkSession;
@@ -36,6 +29,14 @@
 import org.junit.Ignore;
 import org.junit.Test;
 
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import static com.google.common.truth.Truth.assertThat;
+
 public class CatalogIntegrationTestBase {
 
   public static final String DEFAULT_NAMESPACE = "default";
@@ -45,7 +46,6 @@ public class CatalogIntegrationTestBase {
 
   protected static SparkSession spark;
   private String testTable;
-  // 2. Initialize the SparkSession ONCE before all tests
   @BeforeClass
   public static void setupSparkSession() {
     spark =
@@ -60,8 +60,6 @@ public static void setupSparkSession() {
         .getOrCreate();
   }
 
-  // 4. Stop the SparkSession ONCE after all tests are done
-  // This fixes the local IllegalStateException (race condition)
   @AfterClass
   public static void teardownSparkSession() {
     if (spark != null) {
@@ -253,66 +251,113 @@ public void testDropDatabase() {
 
   @Test
   public void testCatalogInitializationWithProject() {
-    spark
-        .conf()
-        .set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
-    spark.conf().set("spark.sql.catalog.public_catalog.project", "bigquery-public-data");
-
-    List<Row> rows = spark.sql("SHOW DATABASES IN public_catalog").collectAsList();
-    List<String> databaseNames =
-        rows.stream().map(row -> row.getString(0)).collect(Collectors.toList());
-    assertThat(databaseNames).contains("samples");
-
-    List<Row> data =
-        spark.sql("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10").collectAsList();
-    assertThat(data).hasSize(10);
+    try {
+      spark
+          .conf()
+          .set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
+      spark.conf().set("spark.sql.catalog.public_catalog.project", "bigquery-public-data");
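+      // Every key under spark.sql.catalog.public_catalog.* is handed to the
+      // catalog plugin's initialize() the first time Spark resolves the
+      // "public_catalog" identifier, so the two settings above are all the
+      // wiring the catalog needs.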
+
+      // Add a small delay to ensure the catalog is fully initialized
+      Thread.sleep(1000);
+
+      List<Row> rows = spark.sql("SHOW DATABASES IN public_catalog").collectAsList();
+      List<String> databaseNames =
+          rows.stream().map(row -> row.getString(0)).collect(Collectors.toList());
+      assertThat(databaseNames).contains("samples");
+
+      List<Row> data =
+          spark.sql("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10").collectAsList();
+      assertThat(data).hasSize(10);
+    } catch (Exception e) {
+      // Log the full stack trace to help debug Cloud Build failures
+      e.printStackTrace();
+      throw new RuntimeException("Test failed with detailed error", e);
+    } finally {
+      // Clean up catalog configuration to avoid interference with other tests
+      try {
+        spark.conf().unset("spark.sql.catalog.public_catalog");
+        spark.conf().unset("spark.sql.catalog.public_catalog.project");
+      } catch (Exception ignored) {
+      }
+    }
   }
 
   @Test
   public void testCreateCatalogWithLocation() throws Exception {
     String database = String.format("create_db_with_location_%s", System.nanoTime());
     DatasetId datasetId = DatasetId.of(database);
-    spark
-        .conf()
-        .set(
-            "spark.sql.catalog.test_location_catalog",
-            "com.google.cloud.spark.bigquery.BigQueryCatalog");
-    spark.conf().set("spark.sql.catalog.test_location_catalog.bigquery_location", "EU");
-    spark.sql("CREATE DATABASE test_location_catalog." + database);
-    Dataset dataset = bigquery.getDataset(datasetId);
-    assertThat(dataset).isNotNull();
-    assertThat(dataset.getLocation()).isEqualTo("EU");
-    bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
+    try {
+      spark
+          .conf()
+          .set(
+              "spark.sql.catalog.test_location_catalog",
+              "com.google.cloud.spark.bigquery.BigQueryCatalog");
+      spark.conf().set("spark.sql.catalog.test_location_catalog.bigquery_location", "EU");
+
+      // Add delay for catalog initialization
+      Thread.sleep(1000);
+
+      spark.sql("CREATE DATABASE test_location_catalog." + database);
+      Dataset dataset = bigquery.getDataset(datasetId);
+      assertThat(dataset).isNotNull();
+      assertThat(dataset.getLocation()).isEqualTo("EU");
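+      // The database is created through the Spark catalog but verified with the
+      // BigQuery client directly, so this checks that the bigquery_location
+      // option actually reached the service.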
+    } finally {
+      bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
+      // Clean up catalog configuration
+      try {
+        spark.conf().unset("spark.sql.catalog.test_location_catalog");
+        spark.conf().unset("spark.sql.catalog.test_location_catalog.bigquery_location");
+      } catch (Exception ignored) {
+      }
+    }
   }
 
   @Test
   public void testCreateTableAsSelectWithProjectAndLocation() {
     String database = String.format("ctas_db_with_location_%s", System.nanoTime());
     String newTable = "ctas_table_from_public";
     DatasetId datasetId = DatasetId.of(database);
-    spark
-        .conf()
-        .set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
-    spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
-    spark
-        .conf()
-        .set(
-            "spark.sql.catalog.test_catalog_as_select",
-            "com.google.cloud.spark.bigquery.BigQueryCatalog");
-    spark.conf().set("spark.sql.catalog.test_catalog_as_select.bigquery_location", "EU");
-    spark.sql("CREATE DATABASE test_catalog_as_select." + database);
-    spark.sql(
-        "CREATE TABLE test_catalog_as_select."
-            + database
-            + "."
-            + newTable
-            + " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10");
-    Dataset dataset = bigquery.getDataset(datasetId);
-    assertThat(dataset).isNotNull();
-    assertThat(dataset.getLocation()).isEqualTo("EU");
-    Table table = bigquery.getTable(TableId.of(datasetId.getDataset(), newTable));
-    assertThat(table).isNotNull();
-    bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
+    try {
+      spark
+          .conf()
+          .set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
+      spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
+      spark
+          .conf()
+          .set(
+              "spark.sql.catalog.test_catalog_as_select",
+              "com.google.cloud.spark.bigquery.BigQueryCatalog");
+      spark.conf().set("spark.sql.catalog.test_catalog_as_select.bigquery_location", "EU");
+
+      // Add delay for catalog initialization
+      Thread.sleep(1000);
+
+      spark.sql("CREATE DATABASE test_catalog_as_select." + database);
+      spark.sql(
+          "CREATE TABLE test_catalog_as_select."
+              + database
+              + "."
+              + newTable
+              + " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10");
+      Dataset dataset = bigquery.getDataset(datasetId);
+      assertThat(dataset).isNotNull();
+      assertThat(dataset.getLocation()).isEqualTo("EU");
+      Table table = bigquery.getTable(TableId.of(datasetId.getDataset(), newTable));
+      assertThat(table).isNotNull();
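+      // The CTAS spans two catalogs: rows are read through the public project's
+      // catalog and the new table is written through the EU-located one.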
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException("Test failed with detailed error", e);
+    } finally {
+      bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
+      // Clean up catalog configurations
+      try {
+        spark.conf().unset("spark.sql.catalog.public_catalog");
+        spark.conf().unset("spark.sql.catalog.public_catalog.projectId");
+        spark.conf().unset("spark.sql.catalog.test_catalog_as_select");
+        spark.conf().unset("spark.sql.catalog.test_catalog_as_select.bigquery_location");
+      } catch (Exception ignored) {
+      }
+    }
   }
 
   private static SparkSession createSparkSession() {