Skip to content

Commit d0edb4d

Browse files
committed
Update options
1 parent 98540e4 commit d0edb4d

File tree

10 files changed

+144
-44
lines changed

10 files changed

+144
-44
lines changed

release.properties

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#release configuration
2+
#Wed Jun 21 19:46:01 CEST 2023
3+
projectVersionPolicyId=default
4+
scm.tagNameFormat=@{project.version}
5+
remoteTagging=true
6+
scm.commentPrefix=[maven-release-plugin]
7+
pushChanges=true
8+
completedPhase=check-poms
9+
scm.url=scm\:git\:git@github.com\:clun/beam-sdks-java-io-astra.git
10+
exec.snapshotReleasePluginAllowed=false
11+
preparationGoals=clean verify

src/main/java/org/apache/beam/sdk/io/astra/db/AstraDbIO.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -853,4 +853,8 @@ public PCollection<T> expand(PCollection<Read<T>> input) {
853853
}
854854
}
855855

856+
public static void close() {
857+
CqlSessionHolder.cleanup();
858+
}
859+
856860
}

src/main/java/org/apache/beam/sdk/io/astra/db/CqlSessionHolder.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,14 @@ private static synchronized void init() {
168168
throw new IllegalStateException("SHA-1 is not supported");
169169
}
170170
// Add a Shutdown Hook to close all sessions.
171-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
172-
LOG.debug("CqlSessionHolder closing sessions...");
173-
cacheSessions.values().stream()
174-
.filter(s->!s.isClosed())
175-
.forEach(CqlSession::close);
176-
LOG.debug("CqlSessionHolder sessions are closed.");
177-
}));
171+
Runtime.getRuntime().addShutdownHook(new Thread(() -> { cleanup(); }));
178172
}
179173
}
180174

175+
public static void cleanup() {
176+
cacheSessions.values().stream()
177+
.filter(s->!s.isClosed())
178+
.forEach(CqlSession::close);
179+
}
180+
181181
}

src/main/java/org/apache/beam/sdk/io/astra/db/ExecuteCqlFn.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222

2323
import com.datastax.oss.driver.api.core.CqlSession;
2424
import com.datastax.oss.driver.api.core.cql.Row;
25-
import org.apache.beam.sdk.io.astra.db.AstraDbIO;
26-
import org.apache.beam.sdk.io.astra.db.CqlSessionHolder;
2725
import org.apache.beam.sdk.transforms.DoFn;
2826

2927
/**

src/main/java/org/apache/beam/sdk/io/astra/db/ReadFn.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858

5959
import java.math.BigInteger;
6060
import java.util.Iterator;
61-
import java.util.concurrent.atomic.AtomicInteger;
6261
import java.util.stream.Collectors;
6362

6463
/**

src/main/java/org/apache/beam/sdk/io/astra/db/options/AstraDbDeleteOptions.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,4 @@
2323
/**
2424
* Specific keys for write operations.
2525
*/
26-
public interface AstraDbDeleteOptions extends AstraDbOptions {
27-
28-
// no consistency level as forced to LOCAL_QUORUM
29-
30-
// no read timeout
31-
}
26+
public interface AstraDbDeleteOptions extends AstraDbOptions {}

src/main/java/org/apache/beam/sdk/io/astra/db/options/AstraDbOptions.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
* Licensed under the Apache License, Version 2.0
1010
* You may not use this file except in compliance with the License.
1111
* You may obtain a copy of the License at
12-
*
12+
*
1313
* http://www.apache.org/licenses/LICENSE-2.0
14-
*
14+
*
1515
* Unless required by applicable law or agreed to in writing, software
1616
* distributed under the License is distributed on an "AS IS" BASIS,
1717
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -46,36 +46,41 @@ public interface AstraDbOptions extends PipelineOptions {
4646
void setAstraToken(String token);
4747

4848
/**
49-
* Access Astra secure bundle
49+
* Secure Connect Bundle location. It can be;
50+
* - a secret resourceId
51+
* - a path to a file
52+
* - a URL
53+
*
54+
* It will be converted as a byte array for Astra DbIO
5055
* @return the Astra secure bundle
5156
*/
5257
@Description("Location of secure connect bundle, depending on environment could be path or secret resource id")
5358
@Validation.Required
54-
byte[] getAstraSecureConnectBundle();
59+
String getAstraSecureConnectBundle();
5560

5661
/**
5762
* Update the Astra secure bundle
5863
*
59-
* @param path
64+
* @param secureConnectBundleLocation
6065
* new value for Astra connection timeout
6166
*/
6267
@SuppressWarnings("not used")
63-
void setAstraSecureConnectBundle(byte[] path);
68+
void setAstraSecureConnectBundle(String secureConnectBundleLocation);
6469

6570
/**
6671
* Access Astra Keyspace
6772
* @return the Astra keyspace
6873
*/
6974
@Description("Keyspace in Cassandra, a Db can have multiple keyspace")
7075
@Validation.Required
71-
String getKeyspace();
76+
String getAstraKeyspace();
7277

7378
/**
7479
* Update the Astra keyspace
7580
*
7681
* @param keyspace
7782
* new value for Astra keyspace
7883
*/
79-
void setKeyspace(String keyspace);
84+
void setAstraKeyspace(String keyspace);
8085

8186
}

src/main/java/org/apache/beam/sdk/io/astra/db/options/AstraDbWriteOptions.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,4 @@
2323
/**
2424
* Specific keys for write operations.
2525
*/
26-
public interface AstraDbWriteOptions extends AstraDbOptions {
27-
28-
// no consistency level as forced to LOCAL_QUORUM
29-
30-
// no read timeout
31-
}
26+
public interface AstraDbWriteOptions extends AstraDbOptions {}

src/main/java/org/apache/beam/sdk/io/astra/db/transforms/RunCqlQueryFn.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
* #L%
2121
*/
2222

23-
import org.apache.beam.sdk.io.astra.db.AstraDbIO;
2423
import org.apache.beam.sdk.io.astra.db.CqlSessionHolder;
25-
import org.apache.beam.sdk.io.astra.db.options.AstraDbOptions;
2624
import org.apache.beam.sdk.options.ValueProvider;
2725
import org.apache.beam.sdk.transforms.PTransform;
2826
import org.apache.beam.sdk.values.PCollection;
@@ -42,18 +40,6 @@ public class RunCqlQueryFn<T> extends PTransform<PCollection<T>, PCollection<T>>
4240
*/
4341
private static final Logger LOG = LoggerFactory.getLogger(CqlSessionHolder.class);
4442

45-
/**
46-
* Execute a CQL query
47-
*
48-
* @param options
49-
* pipeline Options
50-
* @param cql
51-
* cql command to execute
52-
*/
53-
public RunCqlQueryFn(AstraDbOptions options, String cql) {
54-
this(options.getAstraToken(), options.getAstraSecureConnectBundle(), options.getKeyspace(), cql);
55-
}
56-
5743
/**
5844
* Execute a CQL query
5945
*
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package org.apache.beam.sdk.io.astra.db.utils;
2+
3+
/*-
4+
* #%L
5+
* Beam SDK for Astra
6+
* --
7+
* Copyright (C) 2023 DataStax
8+
* --
9+
* Licensed under the Apache License, Version 2.0
10+
* You may not use this file except in compliance with the License.
11+
* You may obtain a copy of the License at
12+
*
13+
* http://www.apache.org/licenses/LICENSE-2.0
14+
*
15+
* Unless required by applicable law or agreed to in writing, software
16+
* distributed under the License is distributed on an "AS IS" BASIS,
17+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
* See the License for the specific language governing permissions and
19+
* limitations under the License.
20+
* #L%
21+
*/
22+
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.IOException;
25+
import java.io.InputStream;
26+
import java.net.MalformedURLException;
27+
import java.net.URL;
28+
import java.nio.file.Files;
29+
import java.nio.file.Path;
30+
31+
/**
32+
* Utility class for Astra Secure Connect Bundle.
33+
*/
34+
public class AstraSecureConnectBundleUtils {
35+
36+
/**
37+
* Private constructor.
38+
*/
39+
private AstraSecureConnectBundleUtils() {}
40+
41+
/**
42+
* Read binary content from a file.
43+
*
44+
* @param filePath
45+
* file Path
46+
* @return
47+
* binary
48+
*/
49+
public static final byte[] loadFromFilePath(String filePath) {
50+
try {
51+
return Files.readAllBytes(Path.of(filePath));
52+
} catch(Exception e) {
53+
throw new RuntimeException("Unable to read file " + filePath, e);
54+
}
55+
}
56+
57+
/**
58+
* Download a file and get binary from URL.
59+
*
60+
* @param fileUrl
61+
* file URL
62+
* @return
63+
* binary content
64+
*/
65+
public static final byte[] loadFromURL(String fileUrl) {
66+
URL url = null;
67+
try {
68+
url = new URL(fileUrl);
69+
try (
70+
InputStream inputStream = url.openStream();
71+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
72+
byte[] buffer = new byte[4096];
73+
int bytesRead;
74+
while ((bytesRead = inputStream.read(buffer)) != -1) {
75+
outputStream.write(buffer, 0, bytesRead);
76+
}
77+
return outputStream.toByteArray();
78+
}
79+
} catch (MalformedURLException e) {
80+
throw new RuntimeException("Invalid URL " + fileUrl, e);
81+
} catch (IOException e) {
82+
throw new RuntimeException("Unable to read file " + fileUrl, e);
83+
}
84+
}
85+
86+
/**
87+
* Get binary content from an inputStream.
88+
*
89+
* @param inputStream
90+
* inputstream
91+
* @return
92+
* binary content
93+
*/
94+
public static final byte[] loadFromInputStream(InputStream inputStream) {
95+
try {
96+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
97+
byte[] buffer = new byte[4096];
98+
int bytesRead;
99+
while ((bytesRead = inputStream.read(buffer)) != -1) {
100+
outputStream.write(buffer, 0, bytesRead);
101+
}
102+
return outputStream.toByteArray();
103+
} catch (IOException e) {
104+
throw new RuntimeException("Unable to read input stream", e);
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)