diff --git a/iotdb-collector/collector-core/pom.xml b/iotdb-collector/collector-core/pom.xml
index 7b68e68..c6517b8 100644
--- a/iotdb-collector/collector-core/pom.xml
+++ b/iotdb-collector/collector-core/pom.xml
@@ -107,6 +107,10 @@
com.google.code.findbugs
jsr305
+
+ org.xerial
+ sqlite-jdbc
+
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
index 96ecdaf..650caee 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.collector.config.Configuration;
import org.apache.iotdb.collector.service.ApiService;
import org.apache.iotdb.collector.service.IService;
+import org.apache.iotdb.collector.service.PersistenceService;
import org.apache.iotdb.collector.service.RuntimeService;
import org.slf4j.Logger;
@@ -39,6 +40,7 @@ public class Application {
private Application() {
services.add(new RuntimeService());
services.add(new ApiService());
+ services.add(new PersistenceService());
}
public static void main(String[] args) {
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
index b64f017..041dde1 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java
@@ -20,11 +20,9 @@
package org.apache.iotdb.collector.api.v1.plugin.impl;
import org.apache.iotdb.collector.api.v1.plugin.PluginApiService;
-import org.apache.iotdb.collector.api.v1.plugin.model.AlterPluginRequest;
import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
-import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest;
-import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest;
+import org.apache.iotdb.collector.service.RuntimeService;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
@@ -34,30 +32,34 @@ public class PluginApiServiceImpl extends PluginApiService {
@Override
public Response createPlugin(
final CreatePluginRequest createPluginRequest, final SecurityContext securityContext) {
- return Response.ok("create plugin").build();
- }
+ PluginApiServiceRequestValidationHandler.validateCreatePluginRequest(createPluginRequest);
- @Override
- public Response alterPlugin(
- final AlterPluginRequest alterPluginRequest, final SecurityContext securityContext) {
- return Response.ok("alter plugin").build();
+ return RuntimeService.plugin().isPresent()
+ ? RuntimeService.plugin()
+ .get()
+ .createPlugin(
+ createPluginRequest.getPluginName().toUpperCase(),
+ createPluginRequest.getClassName(),
+ createPluginRequest.getJarName(),
+ null,
+ true)
+ : Response.ok("create plugin").build();
}
@Override
- public Response startPlugin(
- final StartPluginRequest startPluginRequest, final SecurityContext securityContext) {
- return Response.ok("start plugin").build();
- }
+ public Response dropPlugin(
+ final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) {
+ PluginApiServiceRequestValidationHandler.validateDropPluginRequest(dropPluginRequest);
- @Override
- public Response stopPlugin(
- final StopPluginRequest stopPluginRequest, final SecurityContext securityContext) {
- return Response.ok("stop plugin").build();
+ return RuntimeService.plugin().isPresent()
+ ? RuntimeService.plugin().get().dropPlugin(dropPluginRequest.getPluginName().toUpperCase())
+ : Response.ok("drop plugin").build();
}
@Override
- public Response dropPlugin(
- final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) {
- return Response.ok("drop plugin").build();
+ public Response showPlugin(final SecurityContext securityContext) {
+ return RuntimeService.plugin().isPresent()
+ ? RuntimeService.plugin().get().showPlugin()
+ : Response.ok("show plugin").build();
}
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
index 36adce3..6f7d983 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java
@@ -19,4 +19,21 @@
package org.apache.iotdb.collector.api.v1.plugin.impl;
-public class PluginApiServiceRequestValidationHandler {}
+import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest;
+import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest;
+
+import java.util.Objects;
+
+public class PluginApiServiceRequestValidationHandler {
+ private PluginApiServiceRequestValidationHandler() {}
+
+ public static void validateCreatePluginRequest(final CreatePluginRequest createPluginRequest) {
+ Objects.requireNonNull(createPluginRequest.getPluginName(), "plugin name cannot be null");
+ Objects.requireNonNull(createPluginRequest.getClassName(), "class name cannot be null");
+ Objects.requireNonNull(createPluginRequest.getJarName(), "jar name cannot be null");
+ }
+
+ public static void validateDropPluginRequest(final DropPluginRequest dropPluginRequest) {
+ Objects.requireNonNull(dropPluginRequest.getPluginName(), "plugin name cannot be null");
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
index 053b618..cd563da 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java
@@ -25,6 +25,7 @@
import org.apache.iotdb.collector.api.v1.task.model.DropTaskRequest;
import org.apache.iotdb.collector.api.v1.task.model.StartTaskRequest;
import org.apache.iotdb.collector.api.v1.task.model.StopTaskRequest;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.service.RuntimeService;
import javax.ws.rs.core.Response;
@@ -42,9 +43,11 @@ public Response createTask(
.get()
.createTask(
createTaskRequest.getTaskId(),
+ TaskStateEnum.RUNNING,
createTaskRequest.getSourceAttribute(),
createTaskRequest.getProcessorAttribute(),
- createTaskRequest.getSinkAttribute())
+ createTaskRequest.getSinkAttribute(),
+ true)
: Response.serverError().entity("Task runtime is down").build();
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
index e067086..16fd814 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java
@@ -36,6 +36,7 @@ public class Options {
try {
Class.forName(ApiServiceOptions.class.getName());
Class.forName(TaskRuntimeOptions.class.getName());
+ Class.forName(PluginRuntimeOptions.class.getName());
} catch (final ClassNotFoundException e) {
throw new RuntimeException("Failed to load options", e);
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java
new file mode 100644
index 0000000..92eb04b
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.config;
+
+import java.io.File;
+
+public class PluginRuntimeOptions extends Options {
+ public static final Option PLUGIN_LIB_DIR =
+ new Option("plugin_lib_dir", "ext" + File.separator + "plugin") {
+ @Override
+ public void setValue(final String valueString) {
+ value = valueString;
+ }
+ };
+
+ public static final Option PLUGIN_INSTALL_LIB_DIR =
+ new Option(
+ "plugin_install_lib_dir", PLUGIN_LIB_DIR.value() + File.separator + "install") {
+ @Override
+ public void setValue(final String valueString) {
+ value = valueString;
+ }
+ };
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java
new file mode 100644
index 0000000..2e8c3bb
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+public class DBConstant {
+
+ public static final String CREATE_PLUGIN_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS plugin\n"
+ + "(\n"
+ + " plugin_name TEXT PRIMARY KEY,\n"
+ + " class_name TEXT NOT NULL,\n"
+ + " jar_name TEXT NOT NULL,\n"
+ + " jar_md5 TEXT NOT NULL,\n"
+ + " create_time TEXT NOT NULL\n"
+ + ");";
+ public static final String CREATE_TASK_TABLE_SQL =
+ "CREATE TABLE IF NOT EXISTS task\n"
+ + "(\n"
+ + " task_id TEXT PRIMARY KEY,\n"
+ + " task_state INT NOT NULL,\n"
+ + " source_attribute BLOB NOT NULL,\n"
+ + " processor_attribute BLOB NOT NULL,\n"
+ + " sink_attribute BLOB NOT NULL,\n"
+ + " create_time TEXT NOT NULL\n"
+ + ");";
+
+ public static final String PLUGIN_DATABASE_FILE_PATH = "ext/db/plugin.db";
+ public static final String TASK_DATABASE_FILE_PATH = "ext/db/task.db";
+
+ public static final String PLUGIN_DATABASE_URL = "jdbc:sqlite:" + PLUGIN_DATABASE_FILE_PATH;
+ public static final String TASK_DATABASE_URL = "jdbc:sqlite:" + TASK_DATABASE_FILE_PATH;
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java
new file mode 100644
index 0000000..b2abcb8
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public abstract class Persistence {
+
+ private final String databaseUrl;
+
+ public Persistence(final String databaseUrl) {
+ this.databaseUrl = databaseUrl;
+ initDatabaseFileIfPossible();
+ initTableIfPossible();
+ }
+
+ protected abstract void initDatabaseFileIfPossible();
+
+ protected abstract void initTableIfPossible();
+
+ public abstract void tryResume();
+
+ protected Connection getConnection() throws SQLException {
+ return DriverManager.getConnection(databaseUrl);
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java
new file mode 100644
index 0000000..d1d2b20
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+import org.apache.iotdb.collector.runtime.plugin.utils.PluginFileUtils;
+import org.apache.iotdb.collector.service.RuntimeService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Objects;
+
+public class PluginPersistence extends Persistence {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PluginPersistence.class);
+
+ public PluginPersistence(String databaseUrl) {
+ super(databaseUrl);
+ }
+
+ @Override
+ protected void initDatabaseFileIfPossible() {
+ try {
+ final Path pluginDatabaseFilePath = Paths.get(DBConstant.PLUGIN_DATABASE_FILE_PATH);
+ if (!Files.exists(pluginDatabaseFilePath)) {
+ Files.createFile(pluginDatabaseFilePath);
+ }
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to create plugin database file", e);
+ }
+ }
+
+ @Override
+ protected void initTableIfPossible() {
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
+ connection.prepareStatement(DBConstant.CREATE_PLUGIN_TABLE_SQL);
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to create plugin database", e);
+ }
+ }
+
+ @Override
+ public void tryResume() {
+ final String queryAllPluginSQL =
+ "SELECT plugin_name, class_name, jar_name, jar_md5 FROM plugin";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(queryAllPluginSQL);
+ final ResultSet pluginResultSet = statement.executeQuery();
+
+ while (pluginResultSet.next()) {
+ final String pluginName = pluginResultSet.getString("plugin_name");
+ final String className = pluginResultSet.getString("class_name");
+ final String jarName = pluginResultSet.getString("jar_name");
+ final String jarMd5 = pluginResultSet.getString("jar_md5");
+
+ if (!isPluginJarFileWithMD5NameExists(pluginName, jarName, jarMd5)) {
+ tryDeletePlugin(pluginName);
+ continue;
+ }
+
+ tryRecoverPlugin(pluginName, className, jarName, jarMd5);
+ }
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to resume plugin persistence message, because {}", e.getMessage());
+ }
+ }
+
+ private boolean isPluginJarFileWithMD5NameExists(
+ final String pluginName, final String jarName, final String jarMd5) {
+ final Path pluginJarFileWithMD5Path =
+ Paths.get(
+ PluginFileUtils.getPluginJarFileWithMD5FilePath(
+ pluginName, PluginFileUtils.getPluginJarFileNameWithMD5(jarName, jarMd5)));
+
+ return Files.exists(pluginJarFileWithMD5Path);
+ }
+
+ private void tryRecoverPlugin(
+ final String pluginName, final String className, final String jarName, final String jarMD5) {
+ final Response response =
+ RuntimeService.plugin().isPresent()
+ ? RuntimeService.plugin()
+ .get()
+ .createPlugin(pluginName, className, jarName, jarMD5, false)
+ : null;
+
+ if (Objects.isNull(response) || response.getStatus() != Response.Status.OK.getStatusCode()) {
+ LOGGER.warn("Failed to recover plugin message from plugin {}: {}", pluginName, response);
+ }
+ }
+
+ public void tryPersistencePlugin(
+ final String pluginName, final String className, final String jarName, final String jarMD5) {
+ final String sql =
+ "INSERT INTO plugin(plugin_name, class_name, jar_name, jar_md5, create_time) VALUES(?,?,?,?,?)";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(sql);
+ statement.setString(1, pluginName);
+ statement.setString(2, className);
+ statement.setString(3, jarName);
+ statement.setString(4, jarMD5);
+ statement.setString(5, String.valueOf(new Timestamp(System.currentTimeMillis())));
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to persistence plugin message, because {}", e.getMessage());
+ }
+ }
+
+ public void tryDeletePlugin(final String pluginName) {
+ final String sql = "DELETE FROM plugin WHERE plugin_name = ?";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(sql);
+ statement.setString(1, pluginName);
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to delete plugin persistence message, because {}", e.getMessage());
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java
new file mode 100644
index 0000000..bf62519
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.persistence;
+
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
+import org.apache.iotdb.collector.service.RuntimeService;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskPersistence extends Persistence {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TaskPersistence.class);
+
+ public TaskPersistence(String databaseUrl) {
+ super(databaseUrl);
+ }
+
+ @Override
+ protected void initDatabaseFileIfPossible() {
+ try {
+ final Path taskDatabaseFilePath = Paths.get(DBConstant.TASK_DATABASE_FILE_PATH);
+ if (!Files.exists(taskDatabaseFilePath)) {
+ Files.createFile(taskDatabaseFilePath);
+ }
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to create task database file", e);
+ }
+ }
+
+ @Override
+ protected void initTableIfPossible() {
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement =
+ connection.prepareStatement(DBConstant.CREATE_TASK_TABLE_SQL);
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to create task database", e);
+ }
+ }
+
+ @Override
+ public void tryResume() {
+ final String queryAllTaskSQL =
+ "SELECT task_id, task_state, source_attribute, processor_attribute, sink_attribute, create_time FROM task";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(queryAllTaskSQL);
+ final ResultSet taskResultSet = statement.executeQuery();
+
+ while (taskResultSet.next()) {
+ final String taskId = taskResultSet.getString(1);
+ final TaskStateEnum taskState = TaskStateEnum.values()[taskResultSet.getInt(2)];
+ final byte[] sourceAttribute = taskResultSet.getBytes(3);
+ final byte[] processorAttribute = taskResultSet.getBytes(4);
+ final byte[] sinkAttribute = taskResultSet.getBytes(5);
+
+ tryRecoverTask(
+ taskId,
+ taskState,
+ deserialize(sourceAttribute),
+ deserialize(processorAttribute),
+ deserialize(sinkAttribute));
+ }
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to resume task persistence message, because {}", e.getMessage());
+ }
+ }
+
+ public void tryRecoverTask(
+ final String taskId,
+ final TaskStateEnum taskState,
+ final Map sourceAttribute,
+ final Map processorAttribute,
+ final Map sinkAttribute) {
+ final Response response =
+ RuntimeService.task().isPresent()
+ ? RuntimeService.task()
+ .get()
+ .createTask(
+ taskId, taskState, sourceAttribute, processorAttribute, sinkAttribute, false)
+ : null;
+
+ if (Objects.isNull(response) || response.getStatus() != Response.Status.OK.getStatusCode()) {
+ LOGGER.warn("Failed to recover task persistence message, because {}", response);
+ }
+ }
+
+ private Map deserialize(final byte[] buffer) {
+ final Map attribute = new HashMap<>();
+ final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer);
+
+ final int size = ReadWriteIOUtils.readInt(attributeBuffer);
+ for (int i = 0; i < size; i++) {
+ final String key = ReadWriteIOUtils.readString(attributeBuffer);
+ final String value = ReadWriteIOUtils.readString(attributeBuffer);
+
+ attribute.put(key, value);
+ }
+
+ return attribute;
+ }
+
+ public void tryPersistenceTask(
+ final String taskId,
+ final TaskStateEnum taskState,
+ final Map sourceAttribute,
+ final Map processorAttribute,
+ final Map sinkAttribute) {
+ final String insertSQL =
+ "INSERT INTO task(task_id, task_state , source_attribute, processor_attribute, sink_attribute, create_time) values(?, ?, ?, ?, ?, ?)";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(insertSQL);
+
+ final byte[] sourceAttributeBuffer = serialize(sourceAttribute);
+ final byte[] processorAttributeBuffer = serialize(processorAttribute);
+ final byte[] sinkAttributeBuffer = serialize(sinkAttribute);
+
+ statement.setString(1, taskId);
+ statement.setInt(2, taskState.getTaskState());
+ statement.setBytes(3, sourceAttributeBuffer);
+ statement.setBytes(4, processorAttributeBuffer);
+ statement.setBytes(5, sinkAttributeBuffer);
+ statement.setString(6, String.valueOf(new Timestamp(System.currentTimeMillis())));
+ statement.executeUpdate();
+ } catch (final SQLException | IOException e) {
+ LOGGER.warn("Failed to persistence task message, because {}", e.getMessage());
+ }
+ }
+
+ private byte[] serialize(final Map attribute) throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
+ ReadWriteIOUtils.write(attribute.size(), outputStream);
+ for (final Map.Entry entry : attribute.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue(), outputStream);
+ }
+
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())
+ .array();
+ }
+ }
+
+ public void tryDeleteTask(final String taskId) {
+ final String deleteSQL = "DELETE FROM task WHERE task_id = ?";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(deleteSQL);
+ statement.setString(1, taskId);
+ statement.executeUpdate();
+ } catch (final SQLException e) {
+ LOGGER.warn("Failed to delete task persistence message, because {}", e.getMessage());
+ }
+ }
+
+ public void tryAlterTaskState(final String taskId, final TaskStateEnum taskState) {
+ final String alterSQL = "UPDATE task SET task_state = ? WHERE task_id = ?";
+
+ try (final Connection connection = getConnection()) {
+ final PreparedStatement statement = connection.prepareStatement(alterSQL);
+ statement.setInt(1, taskState.getTaskState());
+ statement.setString(2, taskId);
+ statement.executeUpdate();
+ } catch (SQLException e) {
+ LOGGER.warn("Failed to alter task persistence message, because {}", e.getMessage());
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java
new file mode 100644
index 0000000..32ea664
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.plugin.api.customizer;
+
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import java.util.Map;
+
+public class CollectorParameters extends PipeParameters {
+ public CollectorParameters(final Map attributes) {
+ super(attributes);
+ this.attributes.forEach(
+ (key, value) -> {
+ if (!"taskId".equals(key)) {
+ attributes.put(key, value.replace("_", "-"));
+ }
+ });
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
index c8bfd93..afb8d15 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java
@@ -56,7 +56,7 @@ public void start() throws Exception {}
@Override
public Event supply() {
- LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(100));
+ LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
final Event event = new DemoEvent(String.valueOf(new Random().nextInt(1000)));
LOGGER.info("{} created successfully ...", event);
return event;
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
index 233335c..fa81fc2 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java
@@ -66,7 +66,7 @@ private void doWork() {
final Event event = new DemoEvent(String.valueOf(new Random().nextInt(1000)));
LOGGER.info("{} created successfully ...", event);
supply(event);
- TimeUnit.SECONDS.sleep(100);
+ TimeUnit.SECONDS.sleep(2);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
index adbad7d..627ace6 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java
@@ -22,24 +22,46 @@
import org.apache.iotdb.collector.runtime.plugin.constructor.ProcessorConstructor;
import org.apache.iotdb.collector.runtime.plugin.constructor.SinkConstructor;
import org.apache.iotdb.collector.runtime.plugin.constructor.SourceConstructor;
+import org.apache.iotdb.collector.runtime.plugin.load.PluginClassLoader;
+import org.apache.iotdb.collector.runtime.plugin.load.PluginClassLoaderManager;
+import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta;
import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper;
+import org.apache.iotdb.collector.runtime.plugin.utils.PluginFileUtils;
+import org.apache.iotdb.collector.service.PersistenceService;
+import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.PipeSink;
import org.apache.iotdb.pipe.api.PipeSource;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Objects;
+
public class PluginRuntime implements AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PluginRuntime.class);
+
private final PluginMetaKeeper metaKeeper;
private final SourceConstructor sourceConstructor;
private final ProcessorConstructor processorConstructor;
private final SinkConstructor sinkConstructor;
+ private final PluginClassLoaderManager classLoaderManager;
public PluginRuntime() {
this.metaKeeper = new PluginMetaKeeper();
this.sourceConstructor = new SourceConstructor(metaKeeper);
this.processorConstructor = new ProcessorConstructor(metaKeeper);
this.sinkConstructor = new SinkConstructor(metaKeeper);
+ this.classLoaderManager = new PluginClassLoaderManager();
}
public PipeSource constructSource(final PipeParameters sourceParameters) {
@@ -66,24 +88,139 @@ public PipeSink constructSink(final PipeParameters sinkParameters) {
return sinkConstructor.reflectPlugin(sinkParameters);
}
- public boolean createPlugin() {
- return true;
+ public PluginClassLoader getClassLoader(final String pluginName) throws IOException {
+ return classLoaderManager.getPluginClassLoader(pluginName);
}
- public boolean alterPlugin() {
- return true;
- }
-
- public boolean startPlugin() {
- return true;
+ public synchronized Response createPlugin(
+ final String pluginName,
+ final String className,
+ final String jarName,
+ final String jarMD5FromDB,
+ final boolean isRestRequest) {
+ try {
+ // validate whether the plugin jar file exists
+ if (isRestRequest && !PluginFileUtils.isPluginJarFileExist(jarName)) {
+ final String errorMessage =
+ String.format(
+ "Failed to register Plugin %s, because the plugin jar file %s is not found",
+ pluginName, jarName);
+ LOGGER.warn(errorMessage);
+ return Response.serverError().entity(errorMessage).build();
+ }
+
+ // validate whether the plugin has been loaded
+ final PluginMeta information = metaKeeper.getPipePluginMeta(pluginName);
+ if (Objects.nonNull(information)) {
+ // validate whether the plugin is builtin plugin
+ if (information.isBuiltin()) {
+ final String errorMessage =
+ String.format(
+ "Failed to register Plugin %s, because the given Plugin name is the same as a built-in Plugin name.",
+ pluginName);
+ LOGGER.warn(errorMessage);
+ return Response.serverError().entity(errorMessage).build();
+ }
+
+ // otherwise the plugin has been registered
+ final String errorMessage =
+ String.format(
+ "Failed to register Plugin %s, because the Plugin has been registered.",
+ pluginName);
+ LOGGER.warn(errorMessage);
+ return Response.serverError().entity(errorMessage).build();
+ }
+
+ // get the plugin jar md5
+ final String jarMD5 =
+ jarMD5FromDB == null
+ ? DigestUtils.md5Hex(
+ Files.newInputStream(Paths.get(PluginFileUtils.getPluginJarFilePath(jarName))))
+ : jarMD5FromDB;
+
+ // If the {pluginName} directory already exists, delete the directory and the files under it,
+ // recreate the directory, and move the files to the new directory. If an exception occurs in
+ // the middle, delete the created directory.
+ final String pluginJarFileNameWithMD5 =
+ PluginFileUtils.getPluginJarFileNameWithMD5(jarName, jarMD5);
+ PluginFileUtils.savePluginToInstallDir(pluginName, jarName, pluginJarFileNameWithMD5);
+
+ // create and save plugin class loader
+ final PluginClassLoader classLoader =
+ classLoaderManager.createPluginClassLoader(
+ PluginFileUtils.getPluginInstallDirPath(pluginName));
+
+ final Class> pluginClass = Class.forName(className, true, classLoader);
+ @SuppressWarnings("unused") // ensure that it is a PipePlugin class
+ final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
+
+ classLoaderManager.addPluginClassLoader(pluginName, classLoader);
+ metaKeeper.addPipePluginMeta(
+ pluginName, new PluginMeta(pluginName, className, false, jarName, jarMD5));
+ metaKeeper.addJarNameAndMd5(jarName, jarMD5);
+
+ // storage registered plugin info
+ if (isRestRequest) {
+ PersistenceService.plugin()
+ .ifPresent(
+ pluginPersistence ->
+ pluginPersistence.tryPersistencePlugin(pluginName, className, jarName, jarMD5));
+ }
+
+ final String successMessage = String.format("Successfully register Plugin %s", pluginName);
+ LOGGER.info(successMessage);
+ return Response.ok().entity(successMessage).build();
+ } catch (final Exception e) {
+ final String errorMessage =
+ String.format("Failed to register Plugin %s, because %s", pluginName, e);
+ LOGGER.warn(errorMessage);
+ return Response.serverError().entity(errorMessage).build();
+ }
}
- public boolean stopPlugin() {
- return true;
+ public synchronized Response dropPlugin(final String pluginName) {
+ try {
+ final PluginMeta information = metaKeeper.getPipePluginMeta(pluginName);
+ if (Objects.nonNull(information) && information.isBuiltin()) {
+ final String errorMessage =
+ String.format("Failed to deregister builtin Plugin %s.", pluginName);
+ LOGGER.warn(errorMessage);
+ return Response.serverError().entity(errorMessage).build();
+ }
+
+ // if it is needed to delete jar file of the plugin, delete both jar file and md5
+ final String installedFileName =
+ FilenameUtils.getBaseName(information.getJarName())
+ + "-"
+ + information.getJarMD5()
+ + "."
+ + FilenameUtils.getExtension(information.getJarName());
+ PluginFileUtils.removePluginFileUnderLibRoot(information.getPluginName(), installedFileName);
+
+ // remove anyway
+ metaKeeper.removeJarNameAndMd5IfPossible(pluginName);
+ metaKeeper.removePipePluginMeta(pluginName);
+ classLoaderManager.removePluginClassLoader(pluginName);
+
+ // remove plugin info from sqlite
+ PersistenceService.plugin()
+ .ifPresent(pluginPersistence -> pluginPersistence.tryDeletePlugin(pluginName));
+
+ final String successMessage = String.format("Successfully deregister Plugin %s", pluginName);
+ LOGGER.info(successMessage);
+ return Response.ok().entity(successMessage).build();
+ } catch (final IOException e) {
+ final String errorMessage =
+ String.format(
+ "Failed to deregister builtin Plugin %s, because %s", pluginName, e.getMessage());
+ LOGGER.warn(errorMessage);
+ return Response.serverError().entity(errorMessage).build();
+ }
}
- public boolean dropPlugin() {
- return true;
+ public Response showPlugin() {
+ final Iterable pluginMetas = metaKeeper.getAllPipePluginMeta();
+ return Response.ok().entity(pluginMetas).build();
}
@Override
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
index c64ef39..72bccb9 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java
@@ -21,6 +21,7 @@
import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta;
import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper;
+import org.apache.iotdb.collector.service.RuntimeService;
import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;
@@ -28,7 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
@@ -81,13 +81,14 @@ private PipePlugin reflect(String pluginName) {
final Class> pluginClass =
information.isBuiltin()
? pluginMetaKeeper.getBuiltinPluginClass(information.getPluginName())
- : Class.forName(information.getClassName()); // TODO
+ : Class.forName(
+ information.getClassName(),
+ true,
+ RuntimeService.plugin().isPresent()
+ ? RuntimeService.plugin().get().getClassLoader(pluginName)
+ : null);
return (PipePlugin) pluginClass.getDeclaredConstructor().newInstance();
- } catch (InstantiationException
- | InvocationTargetException
- | NoSuchMethodException
- | IllegalAccessException
- | ClassNotFoundException e) {
+ } catch (final Exception e) {
String errorMessage =
String.format(
"Failed to reflect PipePlugin %s(%s) instance, because %s",
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
index 066cf08..f5dd6be 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java
@@ -38,7 +38,7 @@ protected void initConstructors() {
@Override
public final PipeSink reflectPlugin(PipeParameters sinkParameters) {
- if (sinkParameters.hasAttribute("sink")) {
+ if (!sinkParameters.hasAttribute("sink")) {
throw new IllegalArgumentException("sink attribute is required");
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
index f4c2cd0..2899862 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java
@@ -42,7 +42,7 @@ protected void initConstructors() {
@Override
public final PipeSource reflectPlugin(PipeParameters sourceParameters) {
- if (sourceParameters.hasAttribute("source")) {
+ if (!sourceParameters.hasAttribute("source")) {
throw new IllegalArgumentException("source attribute is required");
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java
new file mode 100644
index 0000000..7e93a1b
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.load;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@ThreadSafe
+public class PluginClassLoader extends URLClassLoader {
+
+ private final String libRoot;
+ private volatile boolean deprecated;
+
+ public PluginClassLoader(final String libRoot) throws IOException {
+ super(new URL[0]);
+ this.libRoot = libRoot;
+ this.deprecated = false;
+ addUrls();
+ }
+
+ private void addUrls() throws IOException {
+ try (final Stream pathStream = Files.walk(new File(libRoot).toPath())) {
+ for (final Path path :
+ pathStream.filter(path -> !path.toFile().isDirectory()).collect(Collectors.toList())) {
+ super.addURL(path.toUri().toURL());
+ }
+ }
+ }
+
+ public synchronized void markAsDeprecated() throws IOException {
+ deprecated = true;
+ closeIfPossible();
+ }
+
+ public void closeIfPossible() throws IOException {
+ if (deprecated) {
+ close();
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java
new file mode 100644
index 0000000..c60a868
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.load;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@NotThreadSafe
+public class PluginClassLoaderManager {
+
+ private final Map pluginNameToClassLoaderMap;
+
+ public PluginClassLoaderManager() {
+ this.pluginNameToClassLoaderMap = new ConcurrentHashMap<>();
+ }
+
+ public void removePluginClassLoader(final String pluginName) throws IOException {
+ final PluginClassLoader classLoader = pluginNameToClassLoaderMap.remove(pluginName);
+ if (classLoader != null) {
+ classLoader.markAsDeprecated();
+ }
+ }
+
+ public PluginClassLoader getPluginClassLoader(final String pluginName) throws IOException {
+ return pluginNameToClassLoaderMap.get(pluginName.toUpperCase());
+ }
+
+ public void addPluginClassLoader(final String pluginName, final PluginClassLoader classLoader) {
+ pluginNameToClassLoaderMap.put(pluginName.toUpperCase(), classLoader);
+ }
+
+ public PluginClassLoader createPluginClassLoader(final String pluginDirPath) throws IOException {
+ return new PluginClassLoader(pluginDirPath);
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
index 4d9ddfd..b6b1f90 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java
@@ -19,30 +19,32 @@
package org.apache.iotdb.collector.runtime.plugin.meta;
+import org.apache.iotdb.collector.plugin.builtin.BuiltinPlugin;
+
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public class PluginMetaKeeper {
- protected final Map pipePluginNameToMetaMap = new ConcurrentHashMap<>();
- protected final Map> builtinPipePluginNameToClassMap = new ConcurrentHashMap<>();
+ private final Map pipePluginNameToMetaMap = new ConcurrentHashMap<>();
+ private final Map> builtinPipePluginNameToClassMap = new ConcurrentHashMap<>();
+ private final Map jarNameToMd5Map = new ConcurrentHashMap<>();
+ private final Map jarNameToReferenceCountMap = new ConcurrentHashMap<>();
public PluginMetaKeeper() {
loadBuiltinPlugins();
}
private void loadBuiltinPlugins() {
- // for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) {
- // final String pipePluginName = builtinPipePlugin.getPipePluginName();
- // final Class> pipePluginClass = builtinPipePlugin.getPipePluginClass();
- // final String className = builtinPipePlugin.getClassName();
- //
- // addPipePluginMeta(pipePluginName, new PluginMeta(pipePluginName, className));
- // addBuiltinPluginClass(pipePluginName, pipePluginClass);
- // addPipePluginVisibility(
- // pipePluginName, VisibilityUtils.calculateFromPluginClass(pipePluginClass));
- // }
+ for (final BuiltinPlugin builtinPlugin : BuiltinPlugin.values()) {
+ final String pluginName = builtinPlugin.getPluginName();
+ final Class> pluginClass = builtinPlugin.getPluginClass();
+ final String className = builtinPlugin.getClassName();
+
+ addPipePluginMeta(pluginName, new PluginMeta(pluginName, className));
+ addBuiltinPluginClass(pluginName, pluginClass);
+ }
}
public void addPipePluginMeta(String pluginName, PluginMeta pluginMeta) {
@@ -82,6 +84,35 @@ public String getPluginNameByJarName(String jarName) {
return null;
}
+ public boolean containsJar(final String jarName) {
+ return jarNameToMd5Map.containsKey(jarName);
+ }
+
+ public boolean jarNameExistsAndMatchesMd5(final String jarName, final String md5) {
+ return jarNameToMd5Map.containsKey(jarName) && jarNameToMd5Map.get(jarName).equals(md5);
+ }
+
+ public void addJarNameAndMd5(final String jarName, final String md5) {
+ if (jarNameToReferenceCountMap.containsKey(jarName)) {
+ jarNameToReferenceCountMap.put(jarName, jarNameToReferenceCountMap.get(jarName) + 1);
+ } else {
+ jarNameToReferenceCountMap.put(jarName, 1);
+ jarNameToMd5Map.put(jarName, md5);
+ }
+ }
+
+ public void removeJarNameAndMd5IfPossible(final String jarName) {
+ if (jarNameToReferenceCountMap.containsKey(jarName)) {
+ final int count = jarNameToReferenceCountMap.get(jarName);
+ if (count == 1) {
+ jarNameToReferenceCountMap.remove(jarName);
+ jarNameToMd5Map.remove(jarName);
+ } else {
+ jarNameToReferenceCountMap.put(jarName, count - 1);
+ }
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java
new file mode 100644
index 0000000..ebc693d
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.plugin.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+
+import static org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_INSTALL_LIB_DIR;
+import static org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_LIB_DIR;
+
+public class PluginFileUtils {
+
+ public static void savePluginToInstallDir(
+ final String pluginName, final String jarName, final String jarNameWithMD5)
+ throws IOException {
+ final Path pluginInstallPath =
+ Paths.get(PLUGIN_INSTALL_LIB_DIR.value() + File.separator + pluginName);
+ final Path pluginJarInstallPath =
+ Paths.get(getPluginJarFileWithMD5FilePath(pluginName, jarNameWithMD5));
+
+ if (!Files.exists(pluginInstallPath)) {
+ FileUtils.forceMkdir(pluginInstallPath.toFile());
+ }
+ if (Files.exists(pluginJarInstallPath)) {
+ return;
+ }
+
+ FileUtils.moveFile(
+ new File(getPluginJarFilePath(jarName)),
+ pluginJarInstallPath.toFile(),
+ StandardCopyOption.REPLACE_EXISTING);
+ }
+
+ public static boolean isPluginJarFileExist(final String jarName) {
+ return Files.exists(Paths.get(getPluginJarFilePath(jarName)));
+ }
+
+ public static String getPluginJarFilePath(final String jarName) {
+ return PLUGIN_LIB_DIR.value() + File.separator + jarName;
+ }
+
+ public static String getPluginJarFileWithMD5FilePath(
+ final String pluginName, final String jarNameWithMD5) {
+ return getPluginInstallDirPath(pluginName) + File.separator + jarNameWithMD5;
+ }
+
+ public static String getPluginInstallDirPath(final String pluginName) {
+ return PLUGIN_INSTALL_LIB_DIR.value() + File.separator + pluginName;
+ }
+
+ public static String getPluginJarFileNameWithMD5(final String jarName, final String jarMD5) {
+ return FilenameUtils.getBaseName(jarName)
+ + "-"
+ + jarMD5
+ + "."
+ + FilenameUtils.getExtension(jarName);
+ }
+
+ public static void removePluginFileUnderLibRoot(final String pluginName, final String fileName)
+ throws IOException {
+ final Path pluginDirPath = Paths.get(getPluginInstallFilePath(pluginName, fileName));
+
+ Files.deleteIfExists(pluginDirPath);
+ Files.deleteIfExists(pluginDirPath.getParent());
+ }
+
+ public static String getPluginInstallFilePath(final String pluginName, final String fileName) {
+ return PLUGIN_INSTALL_LIB_DIR.value() + File.separator + pluginName + File.separator + fileName;
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
index ac270ce..2dbcc71 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java
@@ -19,22 +19,18 @@
package org.apache.iotdb.collector.runtime.task;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.collector.plugin.api.customizer.CollectorParameters;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.LockSupport;
public abstract class Task {
protected final String taskId;
- protected final PipeParameters parameters;
+ protected final CollectorParameters parameters;
protected final int parallelism;
- private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L;
- protected final AtomicBoolean isRunning = new AtomicBoolean(false);
- protected final AtomicBoolean isDropped = new AtomicBoolean(false);
+ protected final TaskDispatch dispatch;
protected Task(
final String taskId,
@@ -42,49 +38,36 @@ protected Task(
final String parallelismKey,
final int parallelismValue) {
this.taskId = taskId;
- this.parameters = new PipeParameters(attributes);
+ this.parameters = new CollectorParameters(attributes);
this.parallelism = parameters.getIntOrDefault(parallelismKey, parallelismValue);
- }
-
- public void resume() {
- isRunning.set(true);
- }
-
- public void pause() {
- isRunning.set(false);
- }
- protected void waitUntilRunningOrDropped() {
- while (!isRunning.get() && !isDropped.get()) {
- LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS);
- }
+ this.dispatch = new TaskDispatch();
}
public final synchronized void create() throws Exception {
- resume();
+ dispatch.resume();
createInternal();
}
public abstract void createInternal() throws Exception;
public final synchronized void start() throws Exception {
- resume();
+ dispatch.resume();
startInternal();
}
public abstract void startInternal() throws Exception;
public final synchronized void stop() throws Exception {
- pause();
+ dispatch.pause();
stopInternal();
}
public abstract void stopInternal() throws Exception;
public final synchronized void drop() throws Exception {
- pause();
- isDropped.set(true);
+ dispatch.remove();
dropInternal();
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java
new file mode 100644
index 0000000..9a904c0
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.runtime.task;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+public class TaskDispatch {
+
+ private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L;
+ private final AtomicBoolean isRunning = new AtomicBoolean(false);
+ private final AtomicBoolean isDropped = new AtomicBoolean(false);
+
+ public void resume() {
+ isRunning.set(true);
+ }
+
+ public void pause() {
+ isRunning.set(false);
+ }
+
+ public void remove() {
+ pause();
+ isDropped.set(true);
+ }
+
+ public boolean isRunning() {
+ return isRunning.get();
+ }
+
+ public void waitUntilRunningOrDropped() {
+ while (!isRunning.get() && !isDropped.get()) {
+ LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS);
+ }
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
index ec3a4f1..f96a40f 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.collector.runtime.task.processor.ProcessorTask;
import org.apache.iotdb.collector.runtime.task.sink.SinkTask;
import org.apache.iotdb.collector.runtime.task.source.SourceTask;
+import org.apache.iotdb.collector.service.PersistenceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +30,7 @@
import javax.ws.rs.core.Response;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
public class TaskRuntime implements AutoCloseable {
@@ -39,9 +41,11 @@ public class TaskRuntime implements AutoCloseable {
public synchronized Response createTask(
final String taskId,
+ final TaskStateEnum taskState,
final Map sourceAttribute,
final Map processorAttribute,
- final Map sinkAttribute) {
+ final Map sinkAttribute,
+ final boolean isRestRequest) {
try {
if (tasks.containsKey(taskId)) {
return Response.status(Response.Status.CONFLICT)
@@ -53,14 +57,28 @@ public synchronized Response createTask(
final ProcessorTask processorTask =
new ProcessorTask(taskId, processorAttribute, sinkTask.makeProducer());
final SourceTask sourceTask =
- SourceTask.construct(taskId, sourceAttribute, processorTask.makeProducer());
+ SourceTask.construct(taskId, sourceAttribute, processorTask.makeProducer(), taskState);
final TaskCombiner taskCombiner = new TaskCombiner(sourceTask, processorTask, sinkTask);
- tasks.put(taskId, taskCombiner);
taskCombiner.create();
+ tasks.put(taskId, taskCombiner);
+
+ // storage task info to sqlite
+ if (isRestRequest) {
+ PersistenceService.task()
+ .ifPresent(
+ taskPersistence ->
+ taskPersistence.tryPersistenceTask(
+ taskId,
+ TaskStateEnum.RUNNING,
+ sourceAttribute,
+ processorAttribute,
+ sinkAttribute));
+ }
+
LOGGER.info("Successfully created task {}", taskId);
- return Response.status(Response.Status.CREATED)
+ return Response.status(Response.Status.OK)
.entity(String.format("Successfully created task %s", taskId))
.build();
} catch (final Exception e) {
@@ -81,6 +99,10 @@ public synchronized Response startTask(final String taskId) {
try {
tasks.get(taskId).start();
+ PersistenceService.task()
+ .ifPresent(
+ taskPersistence -> taskPersistence.tryAlterTaskState(taskId, TaskStateEnum.RUNNING));
+
LOGGER.info("Task {} start successfully", taskId);
return Response.status(Response.Status.OK)
.entity(String.format("task %s start successfully", taskId))
@@ -103,6 +125,10 @@ public synchronized Response stopTask(final String taskId) {
try {
tasks.get(taskId).stop();
+ PersistenceService.task()
+ .ifPresent(
+ taskPersistence -> taskPersistence.tryAlterTaskState(taskId, TaskStateEnum.STOPPED));
+
LOGGER.info("Task {} stop successfully", taskId);
return Response.status(Response.Status.OK)
.entity(String.format("task %s stop successfully", taskId))
@@ -116,14 +142,19 @@ public synchronized Response stopTask(final String taskId) {
}
public synchronized Response dropTask(final String taskId) {
- if (!tasks.containsKey(taskId)) {
+ if (Objects.isNull(tasks.get(taskId)) || !tasks.containsKey(taskId)) {
return Response.status(Response.Status.NOT_FOUND)
.entity(String.format("task %s not found", taskId))
.build();
}
try {
- tasks.remove(taskId).drop();
+ final TaskCombiner task = tasks.get(taskId);
+ task.drop();
+ tasks.remove(taskId);
+
+ // remove task info from sqlite
+ PersistenceService.task().ifPresent(taskPersistence -> taskPersistence.tryDeleteTask(taskId));
LOGGER.info("Task {} drop successfully", taskId);
return Response.status(Response.Status.OK)
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java
new file mode 100644
index 0000000..159016d
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.collector.runtime.task;
+
+public enum TaskStateEnum {
+ RUNNING(0),
+ STOPPED(1);
+
+ private final int taskState;
+
+ TaskStateEnum(final int taskState) {
+ this.taskState = taskState;
+ }
+
+ public int getTaskState() {
+ return taskState;
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
index c153506..9990871 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.collector.runtime.task.processor;
+import org.apache.iotdb.collector.runtime.task.TaskDispatch;
import org.apache.iotdb.collector.runtime.task.event.EventContainer;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.collector.EventCollector;
@@ -33,6 +34,8 @@ class ProcessorConsumer implements WorkHandler {
private final PipeProcessor processor;
private final EventCollector eventCollector;
+ private TaskDispatch dispatch;
+
ProcessorConsumer(final PipeProcessor processor, final EventCollector eventCollector) {
this.processor = processor;
this.eventCollector = eventCollector;
@@ -44,6 +47,8 @@ PipeProcessor consumer() {
@Override
public void onEvent(final EventContainer eventContainer) throws Exception {
+ dispatch.waitUntilRunningOrDropped();
+
// TODO: retry strategy
final Event event = eventContainer.getEvent();
if (event instanceof TabletInsertionEvent) {
@@ -54,4 +59,8 @@ public void onEvent(final EventContainer eventContainer) throws Exception {
processor.process(event, eventCollector);
}
}
+
+ public void setDispatch(final TaskDispatch dispatch) {
+ this.dispatch = dispatch;
+ }
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
index e671ea0..37fbf4e 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java
@@ -99,6 +99,7 @@ public void createInternal() throws Exception {
for (int i = 0; i < parallelism; i++) {
processorConsumers[i] =
new ProcessorConsumer(pluginRuntime.constructProcessor(parameters), sinkProducer);
+ processorConsumers[i].setDispatch(dispatch);
try {
processorConsumers[i].consumer().validate(new PipeParameterValidator(parameters));
processorConsumers[i]
@@ -118,16 +119,18 @@ public void createInternal() throws Exception {
disruptor.handleEventsWithWorkerPool(processorConsumers);
disruptor.setDefaultExceptionHandler(new ProcessorExceptionHandler());
+
+ disruptor.start();
}
@Override
public void startInternal() {
- disruptor.start();
+ // do nothing
}
@Override
public void stopInternal() {
- disruptor.halt();
+ // do nothing
}
@Override
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
index 2fe13f0..c17691e 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.collector.runtime.task.sink;
+import org.apache.iotdb.collector.runtime.task.TaskDispatch;
import org.apache.iotdb.collector.runtime.task.event.EventContainer;
import org.apache.iotdb.pipe.api.PipeSink;
import org.apache.iotdb.pipe.api.event.Event;
@@ -31,6 +32,8 @@ class SinkConsumer implements WorkHandler {
private final PipeSink sink;
+ private TaskDispatch dispatch;
+
SinkConsumer(final PipeSink sink) {
this.sink = sink;
}
@@ -41,6 +44,8 @@ PipeSink consumer() {
@Override
public void onEvent(EventContainer eventContainer) throws Exception {
+ dispatch.waitUntilRunningOrDropped();
+
// TODO: retry strategy
final Event event = eventContainer.getEvent();
if (event instanceof TabletInsertionEvent) {
@@ -51,4 +56,8 @@ public void onEvent(EventContainer eventContainer) throws Exception {
sink.transfer(event);
}
}
+
+ public void setDispatch(final TaskDispatch dispatch) {
+ this.dispatch = dispatch;
+ }
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
index e3e66db..d540b92 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java
@@ -89,6 +89,7 @@ public void createInternal() throws Exception {
consumers = new SinkConsumer[parallelism];
for (int i = 0; i < parallelism; i++) {
consumers[i] = new SinkConsumer(pluginRuntime.constructSink(parameters));
+ consumers[i].setDispatch(dispatch);
try {
consumers[i].consumer().validate(new PipeParameterValidator(parameters));
consumers[i]
@@ -109,16 +110,18 @@ public void createInternal() throws Exception {
disruptor.handleEventsWithWorkerPool(consumers);
disruptor.setDefaultExceptionHandler(new SinkExceptionHandler());
+
+ disruptor.start();
}
@Override
public void startInternal() {
- disruptor.start();
+ // do nothing
}
@Override
public void stopInternal() {
- disruptor.halt();
+ // do nothing
}
@Override
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
index ca0c8c4..2cf5f67 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java
@@ -21,6 +21,7 @@
import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
import org.apache.iotdb.collector.runtime.task.Task;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.runtime.task.event.EventCollector;
import org.apache.iotdb.collector.runtime.task.source.pull.PullSourceTask;
import org.apache.iotdb.collector.runtime.task.source.push.PushSourceTask;
@@ -34,20 +35,24 @@
public abstract class SourceTask extends Task {
protected final EventCollector processorProducer;
+ protected TaskStateEnum taskState;
protected SourceTask(
final String taskId,
final Map attributes,
- final EventCollector processorProducer) {
+ final EventCollector processorProducer,
+ final TaskStateEnum taskState) {
super(
taskId, attributes, TASK_SOURCE_PARALLELISM_NUM.key(), TASK_SOURCE_PARALLELISM_NUM.value());
this.processorProducer = processorProducer;
+ this.taskState = taskState;
}
public static SourceTask construct(
final String taskId,
final Map attributes,
- final EventCollector processorProducer)
+ final EventCollector processorProducer,
+ final TaskStateEnum taskState)
throws Exception {
final PluginRuntime pluginRuntime =
RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null;
@@ -57,10 +62,10 @@ public static SourceTask construct(
final PipeParameters parameters = new PipeParameters(attributes);
if (pluginRuntime.isPullSource(parameters)) {
- return new PullSourceTask(taskId, attributes, processorProducer);
+ return new PullSourceTask(taskId, attributes, processorProducer, taskState);
}
if (pluginRuntime.isPushSource(parameters)) {
- return new PushSourceTask(taskId, attributes, processorProducer);
+ return new PushSourceTask(taskId, attributes, processorProducer, taskState);
}
throw new IllegalArgumentException("Unsupported source type");
}
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
index b47054a..d426526 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.collector.plugin.api.PullSource;
import org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration;
import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.runtime.task.event.EventCollector;
import org.apache.iotdb.collector.runtime.task.source.SourceTask;
import org.apache.iotdb.collector.service.RuntimeService;
@@ -50,8 +51,9 @@ public class PullSourceTask extends SourceTask {
public PullSourceTask(
final String taskId,
final Map attributes,
- final EventCollector processorProducer) {
- super(taskId, attributes, processorProducer);
+ final EventCollector processorProducer,
+ final TaskStateEnum taskState) {
+ super(taskId, attributes, processorProducer, taskState);
}
@Override
@@ -101,14 +103,14 @@ public void createInternal() throws Exception {
.get(taskId)
.submit(
() -> {
- while (!isDropped.get()) {
+ while (dispatch.isRunning() && TaskStateEnum.RUNNING.equals(taskState)) {
try {
consumers[finalI].onScheduler();
} catch (final Exception e) {
LOGGER.warn("Failed to pull source", e);
}
- waitUntilRunningOrDropped();
+ dispatch.waitUntilRunningOrDropped();
}
});
}
@@ -116,12 +118,12 @@ public void createInternal() throws Exception {
@Override
public void startInternal() {
- // do nothing
+ this.taskState = TaskStateEnum.RUNNING;
}
@Override
public void stopInternal() {
- // do nothing
+ this.taskState = TaskStateEnum.STOPPED;
}
@Override
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
index 938f29f..7419ae7 100644
--- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java
@@ -22,6 +22,7 @@
import org.apache.iotdb.collector.plugin.api.PushSource;
import org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration;
import org.apache.iotdb.collector.runtime.plugin.PluginRuntime;
+import org.apache.iotdb.collector.runtime.task.TaskStateEnum;
import org.apache.iotdb.collector.runtime.task.event.EventCollector;
import org.apache.iotdb.collector.runtime.task.source.SourceTask;
import org.apache.iotdb.collector.service.RuntimeService;
@@ -41,8 +42,9 @@ public class PushSourceTask extends SourceTask {
public PushSourceTask(
final String taskId,
final Map sourceParams,
- final EventCollector processorProducer) {
- super(taskId, sourceParams, processorProducer);
+ final EventCollector processorProducer,
+ final TaskStateEnum taskState) {
+ super(taskId, sourceParams, processorProducer, taskState);
}
@Override
@@ -64,7 +66,9 @@ public void createInternal() throws Exception {
pushSources[i].customize(
parameters,
new CollectorSourceRuntimeConfiguration(taskId, creationTime, parallelism, i));
- pushSources[i].start();
+ if (TaskStateEnum.RUNNING.equals(taskState)) {
+ pushSources[i].start();
+ }
} catch (final Exception e) {
try {
pushSources[i].close();
@@ -78,12 +82,42 @@ public void createInternal() throws Exception {
@Override
public void startInternal() {
- // do nothing
+ if (this.taskState.equals(TaskStateEnum.RUNNING)) {
+ return;
+ }
+
+ if (pushSources != null) {
+ for (int i = 0; i < parallelism; i++) {
+ try {
+ pushSources[i].start();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to restart push source", e);
+ return;
+ }
+ }
+ }
+
+ this.taskState = TaskStateEnum.RUNNING;
}
@Override
public void stopInternal() {
- // do nothing
+ if (this.taskState.equals(TaskStateEnum.STOPPED)) {
+ return;
+ }
+
+ if (pushSources != null) {
+ for (int i = 0; i < parallelism; i++) {
+ try {
+ pushSources[i].close();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to stop source", e);
+ return;
+ }
+ }
+
+ this.taskState = TaskStateEnum.STOPPED;
+ }
}
@Override
diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java
new file mode 100644
index 0000000..a5eb42e
--- /dev/null
+++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.collector.service;
+
+import org.apache.iotdb.collector.persistence.DBConstant;
+import org.apache.iotdb.collector.persistence.PluginPersistence;
+import org.apache.iotdb.collector.persistence.TaskPersistence;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_INSTALL_LIB_DIR;
+
+public class PersistenceService implements IService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PersistenceService.class);
+
+ private static final AtomicReference PLUGIN = new AtomicReference<>();
+ private static final AtomicReference TASK = new AtomicReference<>();
+
+ @Override
+ public void start() {
+ initPluginDir();
+
+ PLUGIN.set(new PluginPersistence(DBConstant.PLUGIN_DATABASE_URL));
+ TASK.set(new TaskPersistence(DBConstant.TASK_DATABASE_URL));
+
+ PLUGIN.get().tryResume();
+ TASK.get().tryResume();
+ }
+
+ private void initPluginDir() {
+ final Path pluginInstallPath = Paths.get(PLUGIN_INSTALL_LIB_DIR.value());
+ try {
+ if (!Files.exists(pluginInstallPath)) {
+ FileUtils.forceMkdir(pluginInstallPath.toFile());
+ }
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to create plugin install directory", e);
+ }
+ }
+
+ public static Optional plugin() {
+ return Optional.of(PLUGIN.get());
+ }
+
+ public static Optional task() {
+ return Optional.of(TASK.get());
+ }
+
+ @Override
+ public void stop() {
+ PLUGIN.set(null);
+ TASK.set(null);
+ }
+
+ @Override
+ public String name() {
+ return "PersistenceService";
+ }
+}
diff --git a/iotdb-collector/collector-core/src/main/resources/application.properties b/iotdb-collector/collector-core/src/main/resources/application.properties
index 244ed55..925ef65 100644
--- a/iotdb-collector/collector-core/src/main/resources/application.properties
+++ b/iotdb-collector/collector-core/src/main/resources/application.properties
@@ -54,3 +54,17 @@ task_processor_ring_buffer_size=1024
# Effective mode: on every start
# Data type: int
task_sink_ring_buffer_size=1024
+
+####################
+### Plugin Configuration
+####################
+
+# The location of plugin jar file
+# Effective mode: on every start
+# Data type: string
+plugin_lib_dir=ext/plugin
+
+# Installation location of plugin jar file
+# Effective mode: on every start
+# Data type: string
+plugin_install_lib_dir=ext/plugin/install
\ No newline at end of file
diff --git a/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml b/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
index 25ea5b5..eec0e5f 100644
--- a/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
+++ b/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml
@@ -41,50 +41,21 @@ paths:
"200":
$ref: '#/components/responses/SuccessExecutionStatus'
- /plugin/v1/alter:
- post:
- operationId: alterPlugin
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/AlterPluginRequest'
- responses:
- "200":
- $ref: '#/components/responses/SuccessExecutionStatus'
-
- /plugin/v1/start:
- post:
- operationId: startPlugin
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/StartPluginRequest'
- responses:
- "200":
- $ref: '#/components/responses/SuccessExecutionStatus'
-
- /plugin/v1/stop:
+ /plugin/v1/drop:
post:
- operationId: stopPlugin
+ operationId: dropPlugin
requestBody:
content:
application/json:
schema:
- $ref: '#/components/schemas/StopPluginRequest'
+ $ref: '#/components/schemas/DropPluginRequest'
responses:
"200":
$ref: '#/components/responses/SuccessExecutionStatus'
- /plugin/v1/drop:
+ /plugin/v1/show:
post:
- operationId: dropPlugin
- requestBody:
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/DropPluginRequest'
+ operationId: showPlugin
responses:
"200":
$ref: '#/components/responses/SuccessExecutionStatus'
@@ -93,51 +64,16 @@ components:
schemas:
CreatePluginRequest:
properties:
- sourceAttribute:
- type: object
- additionalProperties:
- type: string
- processorAttribute:
- type: object
- additionalProperties:
- type: string
- sinkAttribute:
- type: object
- additionalProperties:
- type: string
- taskId:
- type: string
-
- AlterPluginRequest:
- properties:
- sourceAttribute:
- type: object
- additionalProperties:
- type: string
- processorAttribute:
- type: object
- additionalProperties:
- type: string
- sinkAttribute:
- type: object
- additionalProperties:
- type: string
- taskId:
+ pluginName:
type: string
-
- StartPluginRequest:
- properties:
- taskId:
+ className:
type: string
-
- StopPluginRequest:
- properties:
- taskId:
+ jarName:
type: string
DropPluginRequest:
properties:
- taskId:
+ pluginName:
type: string
ExecutionStatus:
diff --git a/pom.xml b/pom.xml
index 4423708..da29b5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
2.7.18
5.3.39
+ 3.49.1.0
1.6.14
chmod
@@ -861,6 +862,11 @@
disruptor
${disruptor.version}
+
+ org.xerial
+ sqlite-jdbc
+ ${sqlite.version}
+