2929import org .apache .spark .sql .RowFactory ;
3030import org .apache .spark .sql .SparkSession ;
3131import org .junit .After ;
32+ import org .junit .AfterClass ;
3233import org .junit .Before ;
34+ import org .junit .BeforeClass ;
3335import org .junit .ClassRule ;
3436import org .junit .Ignore ;
3537import org .junit .Test ;
@@ -41,7 +43,32 @@ public class CatalogIntegrationTestBase {
4143
4244 BigQuery bigquery = IntegrationTestUtils .getBigquery ();
4345
46+ protected static SparkSession spark ;
4447 private String testTable ;
48+ // 2. Initialize the SparkSession ONCE before all tests
49+ @ BeforeClass
50+ public static void setupSparkSession () {
51+ spark =
52+ SparkSession .builder ()
53+ .appName ("catalog test" )
54+ .master ("local[*]" )
55+ .config ("spark.sql.legacy.createHiveTableByDefault" , "false" )
56+ .config ("spark.sql.sources.default" , "bigquery" )
57+ .config ("spark.datasource.bigquery.writeMethod" , "direct" )
58+ .config ("spark.sql.defaultCatalog" , "bigquery" )
59+ .config ("spark.sql.catalog.bigquery" , "com.google.cloud.spark.bigquery.BigQueryCatalog" )
60+ .getOrCreate ();
61+ }
62+
63+ // 4. Stop the SparkSession ONCE after all tests are done
64+ // This fixes the local IllegalStateException (race condition)
65+ @ AfterClass
66+ public static void teardownSparkSession () {
67+ if (spark != null ) {
68+ spark .stop ();
69+ spark = null ;
70+ }
71+ }
4572
4673 @ Before
4774 public void renameTestTable () {
@@ -71,12 +98,10 @@ public void testCreateTableInCustomNamespace() throws Exception {
7198
7299 private void internalTestCreateTable (String dataset ) throws InterruptedException {
73100 assertThat (bigquery .getDataset (DatasetId .of (dataset ))).isNotNull ();
74- try (SparkSession spark = createSparkSession ()) {
75- spark .sql ("CREATE TABLE " + fullTableName (dataset ) + "(id int, data string);" );
76- Table table = bigquery .getTable (TableId .of (dataset , testTable ));
77- assertThat (table ).isNotNull ();
78- assertThat (selectCountStarFrom (dataset , testTable )).isEqualTo (0L );
79- }
101+ spark .sql ("CREATE TABLE " + fullTableName (dataset ) + "(id int, data string);" );
102+ Table table = bigquery .getTable (TableId .of (dataset , testTable ));
103+ assertThat (table ).isNotNull ();
104+ assertThat (selectCountStarFrom (dataset , testTable )).isEqualTo (0L );
80105 }
81106
82107 @ Test
@@ -91,13 +116,11 @@ public void testCreateTableAndInsertInCustomNamespace() throws Exception {
91116
92117 private void internalTestCreateTableAndInsert (String dataset ) throws InterruptedException {
93118 assertThat (bigquery .getDataset (DatasetId .of (dataset ))).isNotNull ();
94- try (SparkSession spark = createSparkSession ()) {
95- spark .sql ("CREATE TABLE " + fullTableName (dataset ) + "(id int, data string);" );
96- spark .sql (String .format ("INSERT INTO `%s`.`%s` VALUES (1, 'foo');" , dataset , testTable ));
97- Table table = bigquery .getTable (TableId .of (dataset , testTable ));
98- assertThat (table ).isNotNull ();
99- assertThat (selectCountStarFrom (dataset , testTable )).isEqualTo (1L );
100- }
119+ spark .sql ("CREATE TABLE " + fullTableName (dataset ) + "(id int, data string);" );
120+ spark .sql (String .format ("INSERT INTO `%s`.`%s` VALUES (1, 'foo');" , dataset , testTable ));
121+ Table table = bigquery .getTable (TableId .of (dataset , testTable ));
122+ assertThat (table ).isNotNull ();
123+ assertThat (selectCountStarFrom (dataset , testTable )).isEqualTo (1L );
101124 }
102125
103126 @ Test
@@ -112,12 +135,10 @@ public void testCreateTableAsSelectInCustomNamespace() throws Exception {
112135
113136 private void internalTestCreateTableAsSelect (String dataset ) throws InterruptedException {
114137 assertThat (bigquery .getDataset (DatasetId .of (dataset ))).isNotNull ();
115- try (SparkSession spark = createSparkSession ()) {
116- spark .sql ("CREATE TABLE " + fullTableName (dataset ) + " AS SELECT 1 AS id, 'foo' AS data;" );
117- Table table = bigquery .getTable (TableId .of (dataset , testTable ));
118- assertThat (table ).isNotNull ();
119- assertThat (selectCountStarFrom (dataset , testTable )).isEqualTo (1L );
120- }
138+ spark .sql ("CREATE TABLE " + fullTableName (dataset ) + " AS SELECT 1 AS id, 'foo' AS data;" );
139+ Table table = bigquery .getTable (TableId .of (dataset , testTable ));
140+ assertThat (table ).isNotNull ();
141+ assertThat (selectCountStarFrom (dataset , testTable )).isEqualTo (1L );
121142 }
122143
123144 @ Test
@@ -135,23 +156,21 @@ public void testCreateTableWithExplicitTargetInCustomNamespace() throws Exceptio
135156 private void internalTestCreateTableWithExplicitTarget (String dataset )
136157 throws InterruptedException {
137158 assertThat (bigquery .getDataset (DatasetId .of (dataset ))).isNotNull ();
138- try (SparkSession spark = createSparkSession ()) {
139- spark .sql (
140- "CREATE TABLE "
141- + fullTableName (dataset )
142- + " OPTIONS (table='bigquery-public-data.samples.shakespeare')" );
143- List <Row > result =
144- spark
145- .sql (
146- "SELECT word, SUM(word_count) FROM "
147- + fullTableName (dataset )
148- + " WHERE word='spark' GROUP BY word;" )
149- .collectAsList ();
150- assertThat (result ).hasSize (1 );
151- Row resultRow = result .get (0 );
152- assertThat (resultRow .getString (0 )).isEqualTo ("spark" );
153- assertThat (resultRow .getLong (1 )).isEqualTo (10L );
154- }
159+ spark .sql (
160+ "CREATE TABLE "
161+ + fullTableName (dataset )
162+ + " OPTIONS (table='bigquery-public-data.samples.shakespeare')" );
163+ List <Row > result =
164+ spark
165+ .sql (
166+ "SELECT word, SUM(word_count) FROM "
167+ + fullTableName (dataset )
168+ + " WHERE word='spark' GROUP BY word;" )
169+ .collectAsList ();
170+ assertThat (result ).hasSize (1 );
171+ Row resultRow = result .get (0 );
172+ assertThat (resultRow .getString (0 )).isEqualTo ("spark" );
173+ assertThat (resultRow .getLong (1 )).isEqualTo (10L );
155174 }
156175
157176 private String fullTableName (String dataset ) {
@@ -176,14 +195,11 @@ private long selectCountStarFrom(String dataset, String table) throws Interrupte
176195
177196 @ Test
178197 public void testReadFromDifferentBigQueryProject () throws Exception {
179- try (SparkSession spark = createSparkSession ()) {
180- List <Row > df =
181- spark
182- .sql (
183- "SELECT * from `bigquery-public-data`.`samples`.`shakespeare` WHERE word='spark'" )
184- .collectAsList ();
185- assertThat (df ).hasSize (9 );
186- }
198+ List <Row > df =
199+ spark
200+ .sql ("SELECT * from `bigquery-public-data`.`samples`.`shakespeare` WHERE word='spark'" )
201+ .collectAsList ();
202+ assertThat (df ).hasSize (9 );
187203 }
188204
189205 @ Test
@@ -192,45 +208,36 @@ public void testListNamespaces() throws Exception {
192208 String .format ("show_databases_test_%s_%s" , System .currentTimeMillis (), System .nanoTime ());
193209 DatasetId datasetId = DatasetId .of (database );
194210 bigquery .create (Dataset .newBuilder (datasetId ).build ());
195- try (SparkSession spark = createSparkSession ()) {
196- List <Row > databases = spark .sql ("SHOW DATABASES" ).collectAsList ();
197- assertThat (databases ).contains (RowFactory .create (database ));
198- } finally {
199- bigquery .delete (datasetId );
200- }
211+ List <Row > databases = spark .sql ("SHOW DATABASES" ).collectAsList ();
212+ assertThat (databases ).contains (RowFactory .create (database ));
213+ bigquery .delete (datasetId );
201214 }
202215
203216 @ Test
204217 public void testCreateNamespace () throws Exception {
205218 String database =
206219 String .format ("create_database_test_%s_%s" , System .currentTimeMillis (), System .nanoTime ());
207220 DatasetId datasetId = DatasetId .of (database );
208- try (SparkSession spark = createSparkSession ()) {
209- spark .sql ("CREATE DATABASE " + database + ";" );
210- Dataset dataset = bigquery .getDataset (datasetId );
211- assertThat (dataset ).isNotNull ();
212- } finally {
213- bigquery .delete (datasetId );
214- }
221+ spark .sql ("CREATE DATABASE " + database + ";" );
222+ Dataset dataset = bigquery .getDataset (datasetId );
223+ assertThat (dataset ).isNotNull ();
224+ bigquery .delete (datasetId );
215225 }
216226
217227 @ Test
218228 public void testCreateNamespaceWithLocation () throws Exception {
219229 String database =
220230 String .format ("create_database_test_%s_%s" , System .currentTimeMillis (), System .nanoTime ());
221231 DatasetId datasetId = DatasetId .of (database );
222- try (SparkSession spark = createSparkSession ()) {
223- spark .sql (
224- "CREATE DATABASE "
225- + database
226- + " COMMENT 'foo' WITH DBPROPERTIES (bigquery_location = 'us-east1');" );
227- Dataset dataset = bigquery .getDataset (datasetId );
228- assertThat (dataset ).isNotNull ();
229- assertThat (dataset .getLocation ()).isEqualTo ("us-east1" );
230- assertThat (dataset .getDescription ()).isEqualTo ("foo" );
231- } finally {
232- bigquery .delete (datasetId );
233- }
232+ spark .sql (
233+ "CREATE DATABASE "
234+ + database
235+ + " COMMENT 'foo' WITH DBPROPERTIES (bigquery_location = 'us-east1');" );
236+ Dataset dataset = bigquery .getDataset (datasetId );
237+ assertThat (dataset ).isNotNull ();
238+ assertThat (dataset .getLocation ()).isEqualTo ("us-east1" );
239+ assertThat (dataset .getDescription ()).isEqualTo ("foo" );
240+ bigquery .delete (datasetId );
234241 }
235242
236243 @ Test
@@ -239,85 +246,73 @@ public void testDropDatabase() {
239246 String .format ("drop_database_test_%s_%s" , System .currentTimeMillis (), System .nanoTime ());
240247 DatasetId datasetId = DatasetId .of (database );
241248 bigquery .create (Dataset .newBuilder (datasetId ).build ());
242- try (SparkSession spark = createSparkSession ()) {
243- spark .sql ("DROP DATABASE " + database + ";" );
244- Dataset dataset = bigquery .getDataset (datasetId );
245- assertThat (dataset ).isNull ();
246- }
249+ spark .sql ("DROP DATABASE " + database + ";" );
250+ Dataset dataset = bigquery .getDataset (datasetId );
251+ assertThat (dataset ).isNull ();
247252 }
248253
249254 @ Test
250255 public void testCatalogInitializationWithProject () {
251- try (SparkSession spark = createSparkSession ()) {
252- spark
253- .conf ()
254- .set (
255- "spark.sql.catalog.public_catalog" ,
256- "com.google.cloud.spark.bigquery.BigQueryCatalog" );
257- spark .conf ().set ("spark.sql.catalog.public_catalog.projectId" , "bigquery-public-data" );
258- List <Row > rows = spark .sql ("SHOW DATABASES IN public_catalog" ).collectAsList ();
259- List <String > databaseNames =
260- rows .stream ().map (row -> row .getString (0 )).collect (Collectors .toList ());
261- assertThat (databaseNames ).contains ("samples" );
262- System .out .println (databaseNames );
263- spark .sql ("SHOW TABLES IN public_catalog.samples" ).show ();
264- List <Row > data =
265- spark .sql ("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10" ).collectAsList ();
266- assertThat (data ).hasSize (10 );
267- }
256+ spark
257+ .conf ()
258+ .set ("spark.sql.catalog.public_catalog" , "com.google.cloud.spark.bigquery.BigQueryCatalog" );
259+ spark .conf ().set ("spark.sql.catalog.public_catalog.project" , "bigquery-public-data" );
260+
261+ List <Row > rows = spark .sql ("SHOW DATABASES IN public_catalog" ).collectAsList ();
262+ List <String > databaseNames =
263+ rows .stream ().map (row -> row .getString (0 )).collect (Collectors .toList ());
264+ assertThat (databaseNames ).contains ("samples" );
265+
266+ List <Row > data =
267+ spark .sql ("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10" ).collectAsList ();
268+ assertThat (data ).hasSize (10 );
268269 }
269270
270271 @ Test
271- public void testCreateCatalogWithLocation () {
272+ public void testCreateCatalogWithLocation () throws Exception {
272273 String database = String .format ("create_db_with_location_%s" , System .nanoTime ());
273274 DatasetId datasetId = DatasetId .of (database );
274- try (SparkSession spark = createSparkSession ()) {
275- spark
276- .conf ()
277- .set ("spark.sql.catalog.test_catalog" , "com.google.cloud.spark.bigquery.BigQueryCatalog" );
278- spark .conf ().set ("spark.sql.catalog.test_catalog.location" , "EU" );
279-
280- spark .sql ("CREATE DATABASE test_catalog." + database );
281-
282- Dataset dataset = bigquery .getDataset (datasetId );
283- assertThat (dataset ).isNotNull ();
284- assertThat (dataset .getLocation ()).isEqualTo ("EU" );
285- } finally {
286- bigquery .delete (datasetId , BigQuery .DatasetDeleteOption .deleteContents ());
287- }
275+ spark
276+ .conf ()
277+ .set (
278+ "spark.sql.catalog.test_location_catalog" ,
279+ "com.google.cloud.spark.bigquery.BigQueryCatalog" );
280+ spark .conf ().set ("spark.sql.catalog.test_location_catalog.bigquery_location" , "EU" );
281+ spark .sql ("CREATE DATABASE test_location_catalog." + database );
282+ Dataset dataset = bigquery .getDataset (datasetId );
283+ assertThat (dataset ).isNotNull ();
284+ assertThat (dataset .getLocation ()).isEqualTo ("EU" );
285+ bigquery .delete (datasetId , BigQuery .DatasetDeleteOption .deleteContents ());
288286 }
289287
290288 @ Test
291289 public void testCreateTableAsSelectWithProjectAndLocation () {
292290 String database = String .format ("ctas_db_with_location_%s" , System .nanoTime ());
293291 String newTable = "ctas_table_from_public" ;
294292 DatasetId datasetId = DatasetId .of (database );
295- try (SparkSession spark = createSparkSession ()) {
296- spark
297- .conf ()
298- .set (
299- "spark.sql.catalog.public_catalog" ,
300- "com.google.cloud.spark.bigquery.BigQueryCatalog" );
301- spark .conf ().set ("spark.sql.catalog.public_catalog.projectId" , "bigquery-public-data" );
302- spark
303- .conf ()
304- .set ("spark.sql.catalog.test_catalog" , "com.google.cloud.spark.bigquery.BigQueryCatalog" );
305- spark .conf ().set ("spark.sql.catalog.test_catalog.location" , "EU" );
306- spark .sql ("CREATE DATABASE test_catalog." + database );
307- spark .sql (
308- "CREATE TABLE test_catalog."
309- + database
310- + "."
311- + newTable
312- + " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10" );
313- Dataset dataset = bigquery .getDataset (datasetId );
314- assertThat (dataset ).isNotNull ();
315- assertThat (dataset .getLocation ()).isEqualTo ("EU" );
316- Table table = bigquery .getTable (TableId .of (datasetId .getDataset (), newTable ));
317- assertThat (table ).isNotNull ();
318- } finally {
319- bigquery .delete (datasetId , BigQuery .DatasetDeleteOption .deleteContents ());
320- }
293+ spark
294+ .conf ()
295+ .set ("spark.sql.catalog.public_catalog" , "com.google.cloud.spark.bigquery.BigQueryCatalog" );
296+ spark .conf ().set ("spark.sql.catalog.public_catalog.projectId" , "bigquery-public-data" );
297+ spark
298+ .conf ()
299+ .set (
300+ "spark.sql.catalog.test_catalog_as_select" ,
301+ "com.google.cloud.spark.bigquery.BigQueryCatalog" );
302+ spark .conf ().set ("spark.sql.catalog.test_catalog_as_select.bigquery_location" , "EU" );
303+ spark .sql ("CREATE DATABASE test_catalog_as_select." + database );
304+ spark .sql (
305+ "CREATE TABLE test_catalog_as_select."
306+ + database
307+ + "."
308+ + newTable
309+ + " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10" );
310+ Dataset dataset = bigquery .getDataset (datasetId );
311+ assertThat (dataset ).isNotNull ();
312+ assertThat (dataset .getLocation ()).isEqualTo ("EU" );
313+ Table table = bigquery .getTable (TableId .of (datasetId .getDataset (), newTable ));
314+ assertThat (table ).isNotNull ();
315+ bigquery .delete (datasetId , BigQuery .DatasetDeleteOption .deleteContents ());
321316 }
322317
323318 private static SparkSession createSparkSession () {
0 commit comments