Skip to content

Commit 93bd8cc

Browse files
committed
feat: Added keys source connector (Redis CDC)
1 parent 3ccfb09 commit 93bd8cc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2000
-1729
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright © 2021 Redis
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package com.redis.kafka.connect;
14+
15+
import org.apache.kafka.connect.connector.Task;
16+
17+
import com.redis.kafka.connect.source.AbstractRedisSourceConnector;
18+
import com.redis.kafka.connect.source.RedisKeysSourceConfigDef;
19+
import com.redis.kafka.connect.source.RedisKeysSourceTask;
20+
21+
public class RedisKeysSourceConnector extends AbstractRedisSourceConnector {
22+
23+
@Override
24+
public Class<? extends Task> taskClass() {
25+
return RedisKeysSourceTask.class;
26+
}
27+
28+
@Override
29+
public RedisKeysSourceConfigDef config() {
30+
return new RedisKeysSourceConfigDef();
31+
}
32+
}

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/RedisSinkConnector.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.apache.kafka.connect.connector.Task;
2424
import org.apache.kafka.connect.sink.SinkConnector;
2525

26-
import com.redis.kafka.connect.common.VersionProvider;
27-
import com.redis.kafka.connect.sink.RedisSinkConfig;
26+
import com.redis.kafka.connect.common.ManifestVersionProvider;
27+
import com.redis.kafka.connect.sink.RedisSinkConfigDef;
2828
import com.redis.kafka.connect.sink.RedisSinkTask;
2929

3030
public class RedisSinkConnector extends SinkConnector {
@@ -53,11 +53,11 @@ public void stop() {
5353

5454
@Override
5555
public ConfigDef config() {
56-
return new RedisSinkConfig.RedisSinkConfigDef();
56+
return new RedisSinkConfigDef();
5757
}
5858

5959
@Override
6060
public String version() {
61-
return VersionProvider.getVersion();
61+
return ManifestVersionProvider.getVersion();
6262
}
6363
}

core/redis-kafka-connect/src/main/java/com/redis/kafka/connect/RedisSourceConnector.java

Lines changed: 0 additions & 91 deletions
This file was deleted.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright © 2021 Redis
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*/
13+
package com.redis.kafka.connect;
14+
15+
import org.apache.kafka.connect.connector.Task;
16+
17+
import com.redis.kafka.connect.source.AbstractRedisSourceConnector;
18+
import com.redis.kafka.connect.source.RedisStreamSourceConfigDef;
19+
import com.redis.kafka.connect.source.RedisStreamSourceTask;
20+
21+
public class RedisStreamSourceConnector extends AbstractRedisSourceConnector {
22+
23+
@Override
24+
public Class<? extends Task> taskClass() {
25+
return RedisStreamSourceTask.class;
26+
}
27+
28+
@Override
29+
public RedisStreamSourceConfigDef config() {
30+
return new RedisStreamSourceConfigDef();
31+
}
32+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.redis.kafka.connect.common;
2+
3+
import java.io.IOException;
4+
import java.net.URL;
5+
import java.util.Enumeration;
6+
import java.util.Optional;
7+
import java.util.jar.Attributes;
8+
import java.util.jar.Manifest;
9+
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
/**
14+
* Returns version information from the jar file's {@code /META-INF/MANIFEST.MF}
15+
*/
16+
public class ManifestVersionProvider {
17+
18+
private static final Logger log = LoggerFactory.getLogger(ManifestVersionProvider.class);
19+
20+
private static final String IMPLEMENTATION_TITLE = "Implementation-Title";
21+
private static final String IMPLEMENTATION_VERSION = "Implementation-Version";
22+
private static final String IMPLEMENTATION_TITLE_VALUE = "redis-kafka-connect";
23+
private static final String UNKNOWN_VERSION = "N/A";
24+
private static final String MANIFEST = "META-INF/MANIFEST.MF";
25+
26+
public static String getVersion() {
27+
try {
28+
Enumeration<URL> resources = ManifestVersionProvider.class.getClassLoader().getResources(MANIFEST);
29+
while (resources.hasMoreElements()) {
30+
Optional<String> version = version(resources.nextElement());
31+
if (version.isPresent()) {
32+
return version.get();
33+
}
34+
}
35+
} catch (Exception e) {
36+
log.error("Could not get version from MANIFEST.MF", e);
37+
}
38+
return UNKNOWN_VERSION;
39+
}
40+
41+
private static Optional<String> version(URL url) throws IOException {
42+
Manifest manifest = new Manifest(url.openStream());
43+
if (isApplicableManifest(manifest)) {
44+
Attributes attr = manifest.getMainAttributes();
45+
return Optional.of(String.valueOf(get(attr, IMPLEMENTATION_VERSION)));
46+
}
47+
return Optional.empty();
48+
}
49+
50+
private static boolean isApplicableManifest(Manifest manifest) {
51+
Attributes attributes = manifest.getMainAttributes();
52+
return IMPLEMENTATION_TITLE_VALUE.equals(get(attributes, IMPLEMENTATION_TITLE));
53+
}
54+
55+
private static Object get(Attributes attributes, String key) {
56+
return attributes.get(new Attributes.Name(key));
57+
}
58+
}

0 commit comments

Comments
 (0)