diff --git a/iotdb-collector/collector-core/pom.xml b/iotdb-collector/collector-core/pom.xml index 7b68e68..c6517b8 100644 --- a/iotdb-collector/collector-core/pom.xml +++ b/iotdb-collector/collector-core/pom.xml @@ -107,6 +107,10 @@ com.google.code.findbugs jsr305 + + org.xerial + sqlite-jdbc + diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java index 96ecdaf..650caee 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/Application.java @@ -22,6 +22,7 @@ import org.apache.iotdb.collector.config.Configuration; import org.apache.iotdb.collector.service.ApiService; import org.apache.iotdb.collector.service.IService; +import org.apache.iotdb.collector.service.PersistenceService; import org.apache.iotdb.collector.service.RuntimeService; import org.slf4j.Logger; @@ -39,6 +40,7 @@ public class Application { private Application() { services.add(new RuntimeService()); services.add(new ApiService()); + services.add(new PersistenceService()); } public static void main(String[] args) { diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java index b64f017..041dde1 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceImpl.java @@ -20,11 +20,9 @@ package org.apache.iotdb.collector.api.v1.plugin.impl; import org.apache.iotdb.collector.api.v1.plugin.PluginApiService; -import org.apache.iotdb.collector.api.v1.plugin.model.AlterPluginRequest; import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest; import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest; -import org.apache.iotdb.collector.api.v1.plugin.model.StartPluginRequest; -import org.apache.iotdb.collector.api.v1.plugin.model.StopPluginRequest; +import org.apache.iotdb.collector.service.RuntimeService; import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; @@ -34,30 +32,34 @@ public class PluginApiServiceImpl extends PluginApiService { @Override public Response createPlugin( final CreatePluginRequest createPluginRequest, final SecurityContext securityContext) { - return Response.ok("create plugin").build(); - } + PluginApiServiceRequestValidationHandler.validateCreatePluginRequest(createPluginRequest); - @Override - public Response alterPlugin( - final AlterPluginRequest alterPluginRequest, final SecurityContext securityContext) { - return Response.ok("alter plugin").build(); + return RuntimeService.plugin().isPresent() + ? RuntimeService.plugin() + .get() + .createPlugin( + createPluginRequest.getPluginName().toUpperCase(), + createPluginRequest.getClassName(), + createPluginRequest.getJarName(), + null, + true) + : Response.ok("create plugin").build(); } @Override - public Response startPlugin( - final StartPluginRequest startPluginRequest, final SecurityContext securityContext) { - return Response.ok("start plugin").build(); - } + public Response dropPlugin( + final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) { + PluginApiServiceRequestValidationHandler.validateDropPluginRequest(dropPluginRequest); - @Override - public Response stopPlugin( - final StopPluginRequest stopPluginRequest, final SecurityContext securityContext) { - return Response.ok("stop plugin").build(); + return RuntimeService.plugin().isPresent() + ? RuntimeService.plugin().get().dropPlugin(dropPluginRequest.getPluginName().toUpperCase()) + : Response.ok("drop plugin").build(); } @Override - public Response dropPlugin( - final DropPluginRequest dropPluginRequest, final SecurityContext securityContext) { - return Response.ok("drop plugin").build(); + public Response showPlugin(final SecurityContext securityContext) { + return RuntimeService.plugin().isPresent() + ? RuntimeService.plugin().get().showPlugin() + : Response.ok("show plugin").build(); } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java index 36adce3..6f7d983 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/plugin/impl/PluginApiServiceRequestValidationHandler.java @@ -19,4 +19,21 @@ package org.apache.iotdb.collector.api.v1.plugin.impl; -public class PluginApiServiceRequestValidationHandler {} +import org.apache.iotdb.collector.api.v1.plugin.model.CreatePluginRequest; +import org.apache.iotdb.collector.api.v1.plugin.model.DropPluginRequest; + +import java.util.Objects; + +public class PluginApiServiceRequestValidationHandler { + private PluginApiServiceRequestValidationHandler() {} + + public static void validateCreatePluginRequest(final CreatePluginRequest createPluginRequest) { + Objects.requireNonNull(createPluginRequest.getPluginName(), "plugin name cannot be null"); + Objects.requireNonNull(createPluginRequest.getClassName(), "class name cannot be null"); + Objects.requireNonNull(createPluginRequest.getJarName(), "jar name cannot be null"); + } + + public static void validateDropPluginRequest(final DropPluginRequest dropPluginRequest) { + Objects.requireNonNull(dropPluginRequest.getPluginName(), "plugin name cannot be null"); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java index 053b618..cd563da 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/api/v1/task/impl/TaskApiServiceImpl.java @@ -25,6 +25,7 @@ import org.apache.iotdb.collector.api.v1.task.model.DropTaskRequest; import org.apache.iotdb.collector.api.v1.task.model.StartTaskRequest; import org.apache.iotdb.collector.api.v1.task.model.StopTaskRequest; +import org.apache.iotdb.collector.runtime.task.TaskStateEnum; import org.apache.iotdb.collector.service.RuntimeService; import javax.ws.rs.core.Response; @@ -42,9 +43,11 @@ public Response createTask( .get() .createTask( createTaskRequest.getTaskId(), + TaskStateEnum.RUNNING, createTaskRequest.getSourceAttribute(), createTaskRequest.getProcessorAttribute(), - createTaskRequest.getSinkAttribute()) + createTaskRequest.getSinkAttribute(), + true) : Response.serverError().entity("Task runtime is down").build(); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java index e067086..16fd814 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/Options.java @@ -36,6 +36,7 @@ public class Options { try { Class.forName(ApiServiceOptions.class.getName()); Class.forName(TaskRuntimeOptions.class.getName()); + Class.forName(PluginRuntimeOptions.class.getName()); } catch (final ClassNotFoundException e) { throw new RuntimeException("Failed to load options", e); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java new file mode 100644 index 0000000..92eb04b --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/config/PluginRuntimeOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.config; + +import java.io.File; + +public class PluginRuntimeOptions extends Options { + public static final Option PLUGIN_LIB_DIR = + new Option("plugin_lib_dir", "ext" + File.separator + "plugin") { + @Override + public void setValue(final String valueString) { + value = valueString; + } + }; + + public static final Option PLUGIN_INSTALL_LIB_DIR = + new Option( + "plugin_install_lib_dir", PLUGIN_LIB_DIR.value() + File.separator + "install") { + @Override + public void setValue(final String valueString) { + value = valueString; + } + }; +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java new file mode 100644 index 0000000..2e8c3bb --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/DBConstant.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.persistence; + +public class DBConstant { + + public static final String CREATE_PLUGIN_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS plugin\n" + + "(\n" + + " plugin_name TEXT PRIMARY KEY,\n" + + " class_name TEXT NOT NULL,\n" + + " jar_name TEXT NOT NULL,\n" + + " jar_md5 TEXT NOT NULL,\n" + + " create_time TEXT NOT NULL\n" + + ");"; + public static final String CREATE_TASK_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS task\n" + + "(\n" + + " task_id TEXT PRIMARY KEY,\n" + + " task_state INT NOT NULL,\n" + + " source_attribute BLOB NOT NULL,\n" + + " processor_attribute BLOB NOT NULL,\n" + + " sink_attribute BLOB NOT NULL,\n" + + " create_time TEXT NOT NULL\n" + + ");"; + + public static final String PLUGIN_DATABASE_FILE_PATH = "ext/db/plugin.db"; + public static final String TASK_DATABASE_FILE_PATH = "ext/db/task.db"; + + public static final String PLUGIN_DATABASE_URL = "jdbc:sqlite:" + PLUGIN_DATABASE_FILE_PATH; + public static final String TASK_DATABASE_URL = "jdbc:sqlite:" + TASK_DATABASE_FILE_PATH; +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java new file mode 100644 index 0000000..b2abcb8 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/Persistence.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.persistence; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public abstract class Persistence { + + private final String databaseUrl; + + public Persistence(final String databaseUrl) { + this.databaseUrl = databaseUrl; + initDatabaseFileIfPossible(); + initTableIfPossible(); + } + + protected abstract void initDatabaseFileIfPossible(); + + protected abstract void initTableIfPossible(); + + public abstract void tryResume(); + + protected Connection getConnection() throws SQLException { + return DriverManager.getConnection(databaseUrl); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java new file mode 100644 index 0000000..d1d2b20 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/PluginPersistence.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.persistence; + +import org.apache.iotdb.collector.runtime.plugin.utils.PluginFileUtils; +import org.apache.iotdb.collector.service.RuntimeService; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Objects; + +public class PluginPersistence extends Persistence { + + private static final Logger LOGGER = LoggerFactory.getLogger(PluginPersistence.class); + + public PluginPersistence(String databaseUrl) { + super(databaseUrl); + } + + @Override + protected void initDatabaseFileIfPossible() { + try { + final Path pluginDatabaseFilePath = Paths.get(DBConstant.PLUGIN_DATABASE_FILE_PATH); + if (!Files.exists(pluginDatabaseFilePath)) { + Files.createFile(pluginDatabaseFilePath); + } + } catch (final IOException e) { + LOGGER.warn("Failed to create plugin database file", e); + } + } + + @Override + protected void initTableIfPossible() { + try (final Connection connection = getConnection()) { + final PreparedStatement statement = + connection.prepareStatement(DBConstant.CREATE_PLUGIN_TABLE_SQL); + statement.executeUpdate(); + } catch (final SQLException e) { + LOGGER.warn("Failed to create plugin database", e); + } + } + + @Override + public void tryResume() { + final String queryAllPluginSQL = + "SELECT plugin_name, class_name, jar_name, jar_md5 FROM plugin"; + + try (final Connection connection = getConnection()) { + final PreparedStatement statement = connection.prepareStatement(queryAllPluginSQL); + final ResultSet pluginResultSet = statement.executeQuery(); + + while (pluginResultSet.next()) { + final String pluginName = pluginResultSet.getString("plugin_name"); + final String className = pluginResultSet.getString("class_name"); + final String jarName = pluginResultSet.getString("jar_name"); + final String jarMd5 = pluginResultSet.getString("jar_md5"); + + if (!isPluginJarFileWithMD5NameExists(pluginName, jarName, jarMd5)) { + tryDeletePlugin(pluginName); + continue; + } + + tryRecoverPlugin(pluginName, className, jarName, jarMd5); + } + } catch (final SQLException e) { + LOGGER.warn("Failed to resume plugin persistence message, because {}", e.getMessage()); + } + } + + private boolean isPluginJarFileWithMD5NameExists( + final String pluginName, final String jarName, final String jarMd5) { + final Path pluginJarFileWithMD5Path = + Paths.get( + PluginFileUtils.getPluginJarFileWithMD5FilePath( + pluginName, PluginFileUtils.getPluginJarFileNameWithMD5(jarName, jarMd5))); + + return Files.exists(pluginJarFileWithMD5Path); + } + + private void tryRecoverPlugin( + final String pluginName, final String className, final String jarName, final String jarMD5) { + final Response response = + RuntimeService.plugin().isPresent() + ? RuntimeService.plugin() + .get() + .createPlugin(pluginName, className, jarName, jarMD5, false) + : null; + + if (Objects.isNull(response) || response.getStatus() != Response.Status.OK.getStatusCode()) { + LOGGER.warn("Failed to recover plugin message from plugin {}: {}", pluginName, response); + } + } + + public void tryPersistencePlugin( + final String pluginName, final String className, final String jarName, final String jarMD5) { + final String sql = + "INSERT INTO plugin(plugin_name, class_name, jar_name, jar_md5, create_time) VALUES(?,?,?,?,?)"; + + try (final Connection connection = getConnection()) { + final PreparedStatement statement = connection.prepareStatement(sql); + statement.setString(1, pluginName); + statement.setString(2, className); + statement.setString(3, jarName); + statement.setString(4, jarMD5); + statement.setString(5, String.valueOf(new Timestamp(System.currentTimeMillis()))); + statement.executeUpdate(); + } catch (final SQLException e) { + LOGGER.warn("Failed to persistence plugin message, because {}", e.getMessage()); + } + } + + public void tryDeletePlugin(final String pluginName) { + final String sql = "DELETE FROM plugin WHERE plugin_name = ?"; + + try (final Connection connection = getConnection()) { + final PreparedStatement statement = connection.prepareStatement(sql); + statement.setString(1, pluginName); + statement.executeUpdate(); + } catch (final SQLException e) { + LOGGER.warn("Failed to delete plugin persistence message, because {}", e.getMessage()); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java new file mode 100644 index 0000000..bf62519 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/persistence/TaskPersistence.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.persistence; + +import org.apache.iotdb.collector.runtime.task.TaskStateEnum; +import org.apache.iotdb.collector.service.RuntimeService; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class TaskPersistence extends Persistence { + + private static final Logger LOGGER = LoggerFactory.getLogger(TaskPersistence.class); + + public TaskPersistence(String databaseUrl) { + super(databaseUrl); + } + + @Override + protected void initDatabaseFileIfPossible() { + try { + final Path taskDatabaseFilePath = Paths.get(DBConstant.TASK_DATABASE_FILE_PATH); + if (!Files.exists(taskDatabaseFilePath)) { + Files.createFile(taskDatabaseFilePath); + } + } catch (final IOException e) { + LOGGER.warn("Failed to create task database file", e); + } + } + + @Override + protected void initTableIfPossible() { + try (final Connection connection = getConnection()) { + final PreparedStatement statement = + connection.prepareStatement(DBConstant.CREATE_TASK_TABLE_SQL); + statement.executeUpdate(); + } catch (final SQLException e) { + LOGGER.warn("Failed to create task database", e); + } + } + + @Override + public void tryResume() { + final String queryAllTaskSQL = + "SELECT task_id, task_state, source_attribute, processor_attribute, sink_attribute, create_time FROM task"; + + try (final Connection connection = getConnection()) { + final PreparedStatement statement = connection.prepareStatement(queryAllTaskSQL); + final ResultSet taskResultSet = statement.executeQuery(); + + while (taskResultSet.next()) { + final String taskId = taskResultSet.getString(1); + final TaskStateEnum taskState = TaskStateEnum.values()[taskResultSet.getInt(2)]; + final byte[] sourceAttribute = taskResultSet.getBytes(3); + final byte[] processorAttribute = taskResultSet.getBytes(4); + final byte[] sinkAttribute = taskResultSet.getBytes(5); + + tryRecoverTask( + taskId, + taskState, + deserialize(sourceAttribute), + deserialize(processorAttribute), + deserialize(sinkAttribute)); + } + } catch (final SQLException e) { + LOGGER.warn("Failed to resume task persistence message, because {}", e.getMessage()); + } + } + + public void tryRecoverTask( + final String taskId, + final TaskStateEnum taskState, + final Map sourceAttribute, + final Map processorAttribute, + final Map sinkAttribute) { + final Response response = + RuntimeService.task().isPresent() + ? RuntimeService.task() + .get() + .createTask( + taskId, taskState, sourceAttribute, processorAttribute, sinkAttribute, false) + : null; + + if (Objects.isNull(response) || response.getStatus() != Response.Status.OK.getStatusCode()) { + LOGGER.warn("Failed to recover task persistence message, because {}", response); + } + } + + private Map deserialize(final byte[] buffer) { + final Map attribute = new HashMap<>(); + final ByteBuffer attributeBuffer = ByteBuffer.wrap(buffer); + + final int size = ReadWriteIOUtils.readInt(attributeBuffer); + for (int i = 0; i < size; i++) { + final String key = ReadWriteIOUtils.readString(attributeBuffer); + final String value = ReadWriteIOUtils.readString(attributeBuffer); + + attribute.put(key, value); + } + + return attribute; + } + + public void tryPersistenceTask( + final String taskId, + final TaskStateEnum taskState, + final Map sourceAttribute, + final Map processorAttribute, + final Map sinkAttribute) { + final String insertSQL = + "INSERT INTO task(task_id, task_state , source_attribute, processor_attribute, sink_attribute, create_time) values(?, ?, ?, ?, ?, ?)"; + + try (final Connection connection = getConnection()) { + final PreparedStatement statement = connection.prepareStatement(insertSQL); + + final byte[] sourceAttributeBuffer = serialize(sourceAttribute); + final byte[] processorAttributeBuffer = serialize(processorAttribute); + final byte[] sinkAttributeBuffer = serialize(sinkAttribute); + + statement.setString(1, taskId); + statement.setInt(2, taskState.getTaskState()); + statement.setBytes(3, sourceAttributeBuffer); + statement.setBytes(4, processorAttributeBuffer); + statement.setBytes(5, sinkAttributeBuffer); + statement.setString(6, String.valueOf(new Timestamp(System.currentTimeMillis()))); + statement.executeUpdate(); + } catch (final SQLException | IOException e) { + LOGGER.warn("Failed to persistence task message, because {}", e.getMessage()); + } + } + + private byte[] serialize(final Map attribute) throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(attribute.size(), outputStream); + for (final Map.Entry entry : attribute.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()) + .array(); + } + } + + public void tryDeleteTask(final String taskId) { + final String deleteSQL = "DELETE FROM task WHERE task_id = ?"; + + try (final Connection connection = getConnection()) { + final PreparedStatement statement = connection.prepareStatement(deleteSQL); + statement.setString(1, taskId); + statement.executeUpdate(); + } catch (final SQLException e) { + LOGGER.warn("Failed to delete task persistence message, because {}", e.getMessage()); + } + } + + public void tryAlterTaskState(final String taskId, final TaskStateEnum taskState) { + final String alterSQL = "UPDATE task SET task_state = ? WHERE task_id = ?"; + + try (final Connection connection = getConnection()) { + final PreparedStatement statement = connection.prepareStatement(alterSQL); + statement.setInt(1, taskState.getTaskState()); + statement.setString(2, taskId); + statement.executeUpdate(); + } catch (SQLException e) { + LOGGER.warn("Failed to alter task persistence message, because {}", e.getMessage()); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java new file mode 100644 index 0000000..32ea664 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/api/customizer/CollectorParameters.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.plugin.api.customizer; + +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import java.util.Map; + +public class CollectorParameters extends PipeParameters { + public CollectorParameters(final Map attributes) { + super(attributes); + this.attributes.forEach( + (key, value) -> { + if (!"taskId".equals(key)) { + attributes.put(key, value.replace("_", "-")); + } + }); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java index c8bfd93..afb8d15 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPullSource.java @@ -56,7 +56,7 @@ public void start() throws Exception {} @Override public Event supply() { - LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(100)); + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)); final Event event = new DemoEvent(String.valueOf(new Random().nextInt(1000))); LOGGER.info("{} created successfully ...", event); return event; diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java index 233335c..fa81fc2 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/source/HttpPushSource.java @@ -66,7 +66,7 @@ private void doWork() { final Event event = new DemoEvent(String.valueOf(new Random().nextInt(1000))); LOGGER.info("{} created successfully ...", event); supply(event); - TimeUnit.SECONDS.sleep(100); + TimeUnit.SECONDS.sleep(2); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java index adbad7d..627ace6 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/PluginRuntime.java @@ -22,24 +22,46 @@ import org.apache.iotdb.collector.runtime.plugin.constructor.ProcessorConstructor; import org.apache.iotdb.collector.runtime.plugin.constructor.SinkConstructor; import org.apache.iotdb.collector.runtime.plugin.constructor.SourceConstructor; +import org.apache.iotdb.collector.runtime.plugin.load.PluginClassLoader; +import org.apache.iotdb.collector.runtime.plugin.load.PluginClassLoaderManager; +import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta; import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; +import org.apache.iotdb.collector.runtime.plugin.utils.PluginFileUtils; +import org.apache.iotdb.collector.service.PersistenceService; +import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.PipeSink; import org.apache.iotdb.pipe.api.PipeSource; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.FilenameUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Objects; + public class PluginRuntime implements AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(PluginRuntime.class); + private final PluginMetaKeeper metaKeeper; private final SourceConstructor sourceConstructor; private final ProcessorConstructor processorConstructor; private final SinkConstructor sinkConstructor; + private final PluginClassLoaderManager classLoaderManager; public PluginRuntime() { this.metaKeeper = new PluginMetaKeeper(); this.sourceConstructor = new SourceConstructor(metaKeeper); this.processorConstructor = new ProcessorConstructor(metaKeeper); this.sinkConstructor = new SinkConstructor(metaKeeper); + this.classLoaderManager = new PluginClassLoaderManager(); } public PipeSource constructSource(final PipeParameters sourceParameters) { @@ -66,24 +88,139 @@ public PipeSink constructSink(final PipeParameters sinkParameters) { return sinkConstructor.reflectPlugin(sinkParameters); } - public boolean createPlugin() { - return true; + public PluginClassLoader getClassLoader(final String pluginName) throws IOException { + return classLoaderManager.getPluginClassLoader(pluginName); } - public boolean alterPlugin() { - return true; - } - - public boolean startPlugin() { - return true; + public synchronized Response createPlugin( + final String pluginName, + final String className, + final String jarName, + final String jarMD5FromDB, + final boolean isRestRequest) { + try { + // validate whether the plugin jar file exists + if (isRestRequest && !PluginFileUtils.isPluginJarFileExist(jarName)) { + final String errorMessage = + String.format( + "Failed to register Plugin %s, because the plugin jar file %s is not found", + pluginName, jarName); + LOGGER.warn(errorMessage); + return Response.serverError().entity(errorMessage).build(); + } + + // validate whether the plugin has been loaded + final PluginMeta information = metaKeeper.getPipePluginMeta(pluginName); + if (Objects.nonNull(information)) { + // validate whether the plugin is builtin plugin + if (information.isBuiltin()) { + final String errorMessage = + String.format( + "Failed to register Plugin %s, because the given Plugin name is the same as a built-in Plugin name.", + pluginName); + LOGGER.warn(errorMessage); + return Response.serverError().entity(errorMessage).build(); + } + + // otherwise the plugin has been registered + final String errorMessage = + String.format( + "Failed to register Plugin %s, because the Plugin has been registered.", + pluginName); + LOGGER.warn(errorMessage); + return Response.serverError().entity(errorMessage).build(); + } + + // get the plugin jar md5 + final String jarMD5 = + jarMD5FromDB == null + ? DigestUtils.md5Hex( + Files.newInputStream(Paths.get(PluginFileUtils.getPluginJarFilePath(jarName)))) + : jarMD5FromDB; + + // If the {pluginName} directory already exists, delete the directory and the files under it, + // recreate the directory, and move the files to the new directory. If an exception occurs in + // the middle, delete the created directory. + final String pluginJarFileNameWithMD5 = + PluginFileUtils.getPluginJarFileNameWithMD5(jarName, jarMD5); + PluginFileUtils.savePluginToInstallDir(pluginName, jarName, pluginJarFileNameWithMD5); + + // create and save plugin class loader + final PluginClassLoader classLoader = + classLoaderManager.createPluginClassLoader( + PluginFileUtils.getPluginInstallDirPath(pluginName)); + + final Class pluginClass = Class.forName(className, true, classLoader); + @SuppressWarnings("unused") // ensure that it is a PipePlugin class + final PipePlugin ignored = (PipePlugin) pluginClass.getDeclaredConstructor().newInstance(); + + classLoaderManager.addPluginClassLoader(pluginName, classLoader); + metaKeeper.addPipePluginMeta( + pluginName, new PluginMeta(pluginName, className, false, jarName, jarMD5)); + metaKeeper.addJarNameAndMd5(jarName, jarMD5); + + // storage registered plugin info + if (isRestRequest) { + PersistenceService.plugin() + .ifPresent( + pluginPersistence -> + pluginPersistence.tryPersistencePlugin(pluginName, className, jarName, jarMD5)); + } + + final String successMessage = String.format("Successfully register Plugin %s", pluginName); + LOGGER.info(successMessage); + return Response.ok().entity(successMessage).build(); + } catch (final Exception e) { + final String errorMessage = + String.format("Failed to register Plugin %s, because %s", pluginName, e); + LOGGER.warn(errorMessage); + return Response.serverError().entity(errorMessage).build(); + } } - public boolean stopPlugin() { - return true; + public synchronized Response dropPlugin(final String pluginName) { + try { + final PluginMeta information = metaKeeper.getPipePluginMeta(pluginName); + if (Objects.nonNull(information) && information.isBuiltin()) { + final String errorMessage = + String.format("Failed to deregister builtin Plugin %s.", pluginName); + LOGGER.warn(errorMessage); + return Response.serverError().entity(errorMessage).build(); + } + + // if it is needed to delete jar file of the plugin, delete both jar file and md5 + final String installedFileName = + FilenameUtils.getBaseName(information.getJarName()) + + "-" + + information.getJarMD5() + + "." + + FilenameUtils.getExtension(information.getJarName()); + PluginFileUtils.removePluginFileUnderLibRoot(information.getPluginName(), installedFileName); + + // remove anyway + metaKeeper.removeJarNameAndMd5IfPossible(pluginName); + metaKeeper.removePipePluginMeta(pluginName); + classLoaderManager.removePluginClassLoader(pluginName); + + // remove plugin info from sqlite + PersistenceService.plugin() + .ifPresent(pluginPersistence -> pluginPersistence.tryDeletePlugin(pluginName)); + + final String successMessage = String.format("Successfully deregister Plugin %s", pluginName); + LOGGER.info(successMessage); + return Response.ok().entity(successMessage).build(); + } catch (final IOException e) { + final String errorMessage = + String.format( + "Failed to deregister builtin Plugin %s, because %s", pluginName, e.getMessage()); + LOGGER.warn(errorMessage); + return Response.serverError().entity(errorMessage).build(); + } } - public boolean dropPlugin() { - return true; + public Response showPlugin() { + final Iterable pluginMetas = metaKeeper.getAllPipePluginMeta(); + return Response.ok().entity(pluginMetas).build(); } @Override diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java index c64ef39..72bccb9 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/PluginConstructor.java @@ -21,6 +21,7 @@ import org.apache.iotdb.collector.runtime.plugin.meta.PluginMeta; import org.apache.iotdb.collector.runtime.plugin.meta.PluginMetaKeeper; +import org.apache.iotdb.collector.service.RuntimeService; import org.apache.iotdb.pipe.api.PipePlugin; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -28,7 +29,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.InvocationTargetException; import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; @@ -81,13 +81,14 @@ private PipePlugin reflect(String pluginName) { final Class pluginClass = information.isBuiltin() ? pluginMetaKeeper.getBuiltinPluginClass(information.getPluginName()) - : Class.forName(information.getClassName()); // TODO + : Class.forName( + information.getClassName(), + true, + RuntimeService.plugin().isPresent() + ? RuntimeService.plugin().get().getClassLoader(pluginName) + : null); return (PipePlugin) pluginClass.getDeclaredConstructor().newInstance(); - } catch (InstantiationException - | InvocationTargetException - | NoSuchMethodException - | IllegalAccessException - | ClassNotFoundException e) { + } catch (final Exception e) { String errorMessage = String.format( "Failed to reflect PipePlugin %s(%s) instance, because %s", diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java index 066cf08..f5dd6be 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SinkConstructor.java @@ -38,7 +38,7 @@ protected void initConstructors() { @Override public final PipeSink reflectPlugin(PipeParameters sinkParameters) { - if (sinkParameters.hasAttribute("sink")) { + if (!sinkParameters.hasAttribute("sink")) { throw new IllegalArgumentException("sink attribute is required"); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java index f4c2cd0..2899862 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/constructor/SourceConstructor.java @@ -42,7 +42,7 @@ protected void initConstructors() { @Override public final PipeSource reflectPlugin(PipeParameters sourceParameters) { - if (sourceParameters.hasAttribute("source")) { + if (!sourceParameters.hasAttribute("source")) { throw new IllegalArgumentException("source attribute is required"); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java new file mode 100644 index 0000000..7e93a1b --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.load; + +import javax.annotation.concurrent.ThreadSafe; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@ThreadSafe +public class PluginClassLoader extends URLClassLoader { + + private final String libRoot; + private volatile boolean deprecated; + + public PluginClassLoader(final String libRoot) throws IOException { + super(new URL[0]); + this.libRoot = libRoot; + this.deprecated = false; + addUrls(); + } + + private void addUrls() throws IOException { + try (final Stream pathStream = Files.walk(new File(libRoot).toPath())) { + for (final Path path : + pathStream.filter(path -> !path.toFile().isDirectory()).collect(Collectors.toList())) { + super.addURL(path.toUri().toURL()); + } + } + } + + public synchronized void markAsDeprecated() throws IOException { + deprecated = true; + closeIfPossible(); + } + + public void closeIfPossible() throws IOException { + if (deprecated) { + close(); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java new file mode 100644 index 0000000..c60a868 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/load/PluginClassLoaderManager.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.load; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@NotThreadSafe +public class PluginClassLoaderManager { + + private final Map pluginNameToClassLoaderMap; + + public PluginClassLoaderManager() { + this.pluginNameToClassLoaderMap = new ConcurrentHashMap<>(); + } + + public void removePluginClassLoader(final String pluginName) throws IOException { + final PluginClassLoader classLoader = pluginNameToClassLoaderMap.remove(pluginName); + if (classLoader != null) { + classLoader.markAsDeprecated(); + } + } + + public PluginClassLoader getPluginClassLoader(final String pluginName) throws IOException { + return pluginNameToClassLoaderMap.get(pluginName.toUpperCase()); + } + + public void addPluginClassLoader(final String pluginName, final PluginClassLoader classLoader) { + pluginNameToClassLoaderMap.put(pluginName.toUpperCase(), classLoader); + } + + public PluginClassLoader createPluginClassLoader(final String pluginDirPath) throws IOException { + return new PluginClassLoader(pluginDirPath); + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java index 4d9ddfd..b6b1f90 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/meta/PluginMetaKeeper.java @@ -19,30 +19,32 @@ package org.apache.iotdb.collector.runtime.plugin.meta; +import org.apache.iotdb.collector.plugin.builtin.BuiltinPlugin; + import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; public class PluginMetaKeeper { - protected final Map pipePluginNameToMetaMap = new ConcurrentHashMap<>(); - protected final Map> builtinPipePluginNameToClassMap = new ConcurrentHashMap<>(); + private final Map pipePluginNameToMetaMap = new ConcurrentHashMap<>(); + private final Map> builtinPipePluginNameToClassMap = new ConcurrentHashMap<>(); + private final Map jarNameToMd5Map = new ConcurrentHashMap<>(); + private final Map jarNameToReferenceCountMap = new ConcurrentHashMap<>(); public PluginMetaKeeper() { loadBuiltinPlugins(); } private void loadBuiltinPlugins() { - // for (final BuiltinPipePlugin builtinPipePlugin : BuiltinPipePlugin.values()) { - // final String pipePluginName = builtinPipePlugin.getPipePluginName(); - // final Class pipePluginClass = builtinPipePlugin.getPipePluginClass(); - // final String className = builtinPipePlugin.getClassName(); - // - // addPipePluginMeta(pipePluginName, new PluginMeta(pipePluginName, className)); - // addBuiltinPluginClass(pipePluginName, pipePluginClass); - // addPipePluginVisibility( - // pipePluginName, VisibilityUtils.calculateFromPluginClass(pipePluginClass)); - // } + for (final BuiltinPlugin builtinPlugin : BuiltinPlugin.values()) { + final String pluginName = builtinPlugin.getPluginName(); + final Class pluginClass = builtinPlugin.getPluginClass(); + final String className = builtinPlugin.getClassName(); + + addPipePluginMeta(pluginName, new PluginMeta(pluginName, className)); + addBuiltinPluginClass(pluginName, pluginClass); + } } public void addPipePluginMeta(String pluginName, PluginMeta pluginMeta) { @@ -82,6 +84,35 @@ public String getPluginNameByJarName(String jarName) { return null; } + public boolean containsJar(final String jarName) { + return jarNameToMd5Map.containsKey(jarName); + } + + public boolean jarNameExistsAndMatchesMd5(final String jarName, final String md5) { + return jarNameToMd5Map.containsKey(jarName) && jarNameToMd5Map.get(jarName).equals(md5); + } + + public void addJarNameAndMd5(final String jarName, final String md5) { + if (jarNameToReferenceCountMap.containsKey(jarName)) { + jarNameToReferenceCountMap.put(jarName, jarNameToReferenceCountMap.get(jarName) + 1); + } else { + jarNameToReferenceCountMap.put(jarName, 1); + jarNameToMd5Map.put(jarName, md5); + } + } + + public void removeJarNameAndMd5IfPossible(final String jarName) { + if (jarNameToReferenceCountMap.containsKey(jarName)) { + final int count = jarNameToReferenceCountMap.get(jarName); + if (count == 1) { + jarNameToReferenceCountMap.remove(jarName); + jarNameToMd5Map.remove(jarName); + } else { + jarNameToReferenceCountMap.put(jarName, count - 1); + } + } + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java new file mode 100644 index 0000000..ebc693d --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/plugin/utils/PluginFileUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.plugin.utils; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.FilenameUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; + +import static org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_INSTALL_LIB_DIR; +import static org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_LIB_DIR; + +public class PluginFileUtils { + + public static void savePluginToInstallDir( + final String pluginName, final String jarName, final String jarNameWithMD5) + throws IOException { + final Path pluginInstallPath = + Paths.get(PLUGIN_INSTALL_LIB_DIR.value() + File.separator + pluginName); + final Path pluginJarInstallPath = + Paths.get(getPluginJarFileWithMD5FilePath(pluginName, jarNameWithMD5)); + + if (!Files.exists(pluginInstallPath)) { + FileUtils.forceMkdir(pluginInstallPath.toFile()); + } + if (Files.exists(pluginJarInstallPath)) { + return; + } + + FileUtils.moveFile( + new File(getPluginJarFilePath(jarName)), + pluginJarInstallPath.toFile(), + StandardCopyOption.REPLACE_EXISTING); + } + + public static boolean isPluginJarFileExist(final String jarName) { + return Files.exists(Paths.get(getPluginJarFilePath(jarName))); + } + + public static String getPluginJarFilePath(final String jarName) { + return PLUGIN_LIB_DIR.value() + File.separator + jarName; + } + + public static String getPluginJarFileWithMD5FilePath( + final String pluginName, final String jarNameWithMD5) { + return getPluginInstallDirPath(pluginName) + File.separator + jarNameWithMD5; + } + + public static String getPluginInstallDirPath(final String pluginName) { + return PLUGIN_INSTALL_LIB_DIR.value() + File.separator + pluginName; + } + + public static String getPluginJarFileNameWithMD5(final String jarName, final String jarMD5) { + return FilenameUtils.getBaseName(jarName) + + "-" + + jarMD5 + + "." + + FilenameUtils.getExtension(jarName); + } + + public static void removePluginFileUnderLibRoot(final String pluginName, final String fileName) + throws IOException { + final Path pluginDirPath = Paths.get(getPluginInstallFilePath(pluginName, fileName)); + + Files.deleteIfExists(pluginDirPath); + Files.deleteIfExists(pluginDirPath.getParent()); + } + + public static String getPluginInstallFilePath(final String pluginName, final String fileName) { + return PLUGIN_INSTALL_LIB_DIR.value() + File.separator + pluginName + File.separator + fileName; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java index ac270ce..2dbcc71 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/Task.java @@ -19,22 +19,18 @@ package org.apache.iotdb.collector.runtime.task; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.collector.plugin.api.customizer.CollectorParameters; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.LockSupport; public abstract class Task { protected final String taskId; - protected final PipeParameters parameters; + protected final CollectorParameters parameters; protected final int parallelism; - private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L; - protected final AtomicBoolean isRunning = new AtomicBoolean(false); - protected final AtomicBoolean isDropped = new AtomicBoolean(false); + protected final TaskDispatch dispatch; protected Task( final String taskId, @@ -42,49 +38,36 @@ protected Task( final String parallelismKey, final int parallelismValue) { this.taskId = taskId; - this.parameters = new PipeParameters(attributes); + this.parameters = new CollectorParameters(attributes); this.parallelism = parameters.getIntOrDefault(parallelismKey, parallelismValue); - } - - public void resume() { - isRunning.set(true); - } - - public void pause() { - isRunning.set(false); - } - protected void waitUntilRunningOrDropped() { - while (!isRunning.get() && !isDropped.get()) { - LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS); - } + this.dispatch = new TaskDispatch(); } public final synchronized void create() throws Exception { - resume(); + dispatch.resume(); createInternal(); } public abstract void createInternal() throws Exception; public final synchronized void start() throws Exception { - resume(); + dispatch.resume(); startInternal(); } public abstract void startInternal() throws Exception; public final synchronized void stop() throws Exception { - pause(); + dispatch.pause(); stopInternal(); } public abstract void stopInternal() throws Exception; public final synchronized void drop() throws Exception { - pause(); - isDropped.set(true); + dispatch.remove(); dropInternal(); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java new file mode 100644 index 0000000..9a904c0 --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskDispatch.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.runtime.task; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.LockSupport; + +public class TaskDispatch { + + private static final long CHECK_RUNNING_INTERVAL_NANOS = 100_000_000L; + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final AtomicBoolean isDropped = new AtomicBoolean(false); + + public void resume() { + isRunning.set(true); + } + + public void pause() { + isRunning.set(false); + } + + public void remove() { + pause(); + isDropped.set(true); + } + + public boolean isRunning() { + return isRunning.get(); + } + + public void waitUntilRunningOrDropped() { + while (!isRunning.get() && !isDropped.get()) { + LockSupport.parkNanos(CHECK_RUNNING_INTERVAL_NANOS); + } + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java index ec3a4f1..f96a40f 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskRuntime.java @@ -22,6 +22,7 @@ import org.apache.iotdb.collector.runtime.task.processor.ProcessorTask; import org.apache.iotdb.collector.runtime.task.sink.SinkTask; import org.apache.iotdb.collector.runtime.task.source.SourceTask; +import org.apache.iotdb.collector.service.PersistenceService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +30,7 @@ import javax.ws.rs.core.Response; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; public class TaskRuntime implements AutoCloseable { @@ -39,9 +41,11 @@ public class TaskRuntime implements AutoCloseable { public synchronized Response createTask( final String taskId, + final TaskStateEnum taskState, final Map sourceAttribute, final Map processorAttribute, - final Map sinkAttribute) { + final Map sinkAttribute, + final boolean isRestRequest) { try { if (tasks.containsKey(taskId)) { return Response.status(Response.Status.CONFLICT) @@ -53,14 +57,28 @@ public synchronized Response createTask( final ProcessorTask processorTask = new ProcessorTask(taskId, processorAttribute, sinkTask.makeProducer()); final SourceTask sourceTask = - SourceTask.construct(taskId, sourceAttribute, processorTask.makeProducer()); + SourceTask.construct(taskId, sourceAttribute, processorTask.makeProducer(), taskState); final TaskCombiner taskCombiner = new TaskCombiner(sourceTask, processorTask, sinkTask); - tasks.put(taskId, taskCombiner); taskCombiner.create(); + tasks.put(taskId, taskCombiner); + + // storage task info to sqlite + if (isRestRequest) { + PersistenceService.task() + .ifPresent( + taskPersistence -> + taskPersistence.tryPersistenceTask( + taskId, + TaskStateEnum.RUNNING, + sourceAttribute, + processorAttribute, + sinkAttribute)); + } + LOGGER.info("Successfully created task {}", taskId); - return Response.status(Response.Status.CREATED) + return Response.status(Response.Status.OK) .entity(String.format("Successfully created task %s", taskId)) .build(); } catch (final Exception e) { @@ -81,6 +99,10 @@ public synchronized Response startTask(final String taskId) { try { tasks.get(taskId).start(); + PersistenceService.task() + .ifPresent( + taskPersistence -> taskPersistence.tryAlterTaskState(taskId, TaskStateEnum.RUNNING)); + LOGGER.info("Task {} start successfully", taskId); return Response.status(Response.Status.OK) .entity(String.format("task %s start successfully", taskId)) @@ -103,6 +125,10 @@ public synchronized Response stopTask(final String taskId) { try { tasks.get(taskId).stop(); + PersistenceService.task() + .ifPresent( + taskPersistence -> taskPersistence.tryAlterTaskState(taskId, TaskStateEnum.STOPPED)); + LOGGER.info("Task {} stop successfully", taskId); return Response.status(Response.Status.OK) .entity(String.format("task %s stop successfully", taskId)) @@ -116,14 +142,19 @@ public synchronized Response stopTask(final String taskId) { } public synchronized Response dropTask(final String taskId) { - if (!tasks.containsKey(taskId)) { + if (Objects.isNull(tasks.get(taskId)) || !tasks.containsKey(taskId)) { return Response.status(Response.Status.NOT_FOUND) .entity(String.format("task %s not found", taskId)) .build(); } try { - tasks.remove(taskId).drop(); + final TaskCombiner task = tasks.get(taskId); + task.drop(); + tasks.remove(taskId); + + // remove task info from sqlite + PersistenceService.task().ifPresent(taskPersistence -> taskPersistence.tryDeleteTask(taskId)); LOGGER.info("Task {} drop successfully", taskId); return Response.status(Response.Status.OK) diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java new file mode 100644 index 0000000..159016d --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/TaskStateEnum.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.collector.runtime.task; + +public enum TaskStateEnum { + RUNNING(0), + STOPPED(1); + + private final int taskState; + + TaskStateEnum(final int taskState) { + this.taskState = taskState; + } + + public int getTaskState() { + return taskState; + } +} diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java index c153506..9990871 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorConsumer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.collector.runtime.task.processor; +import org.apache.iotdb.collector.runtime.task.TaskDispatch; import org.apache.iotdb.collector.runtime.task.event.EventContainer; import org.apache.iotdb.pipe.api.PipeProcessor; import org.apache.iotdb.pipe.api.collector.EventCollector; @@ -33,6 +34,8 @@ class ProcessorConsumer implements WorkHandler { private final PipeProcessor processor; private final EventCollector eventCollector; + private TaskDispatch dispatch; + ProcessorConsumer(final PipeProcessor processor, final EventCollector eventCollector) { this.processor = processor; this.eventCollector = eventCollector; @@ -44,6 +47,8 @@ PipeProcessor consumer() { @Override public void onEvent(final EventContainer eventContainer) throws Exception { + dispatch.waitUntilRunningOrDropped(); + // TODO: retry strategy final Event event = eventContainer.getEvent(); if (event instanceof TabletInsertionEvent) { @@ -54,4 +59,8 @@ public void onEvent(final EventContainer eventContainer) throws Exception { processor.process(event, eventCollector); } } + + public void setDispatch(final TaskDispatch dispatch) { + this.dispatch = dispatch; + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java index e671ea0..37fbf4e 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/processor/ProcessorTask.java @@ -99,6 +99,7 @@ public void createInternal() throws Exception { for (int i = 0; i < parallelism; i++) { processorConsumers[i] = new ProcessorConsumer(pluginRuntime.constructProcessor(parameters), sinkProducer); + processorConsumers[i].setDispatch(dispatch); try { processorConsumers[i].consumer().validate(new PipeParameterValidator(parameters)); processorConsumers[i] @@ -118,16 +119,18 @@ public void createInternal() throws Exception { disruptor.handleEventsWithWorkerPool(processorConsumers); disruptor.setDefaultExceptionHandler(new ProcessorExceptionHandler()); + + disruptor.start(); } @Override public void startInternal() { - disruptor.start(); + // do nothing } @Override public void stopInternal() { - disruptor.halt(); + // do nothing } @Override diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java index 2fe13f0..c17691e 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkConsumer.java @@ -19,6 +19,7 @@ package org.apache.iotdb.collector.runtime.task.sink; +import org.apache.iotdb.collector.runtime.task.TaskDispatch; import org.apache.iotdb.collector.runtime.task.event.EventContainer; import org.apache.iotdb.pipe.api.PipeSink; import org.apache.iotdb.pipe.api.event.Event; @@ -31,6 +32,8 @@ class SinkConsumer implements WorkHandler { private final PipeSink sink; + private TaskDispatch dispatch; + SinkConsumer(final PipeSink sink) { this.sink = sink; } @@ -41,6 +44,8 @@ PipeSink consumer() { @Override public void onEvent(EventContainer eventContainer) throws Exception { + dispatch.waitUntilRunningOrDropped(); + // TODO: retry strategy final Event event = eventContainer.getEvent(); if (event instanceof TabletInsertionEvent) { @@ -51,4 +56,8 @@ public void onEvent(EventContainer eventContainer) throws Exception { sink.transfer(event); } } + + public void setDispatch(final TaskDispatch dispatch) { + this.dispatch = dispatch; + } } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java index e3e66db..d540b92 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/sink/SinkTask.java @@ -89,6 +89,7 @@ public void createInternal() throws Exception { consumers = new SinkConsumer[parallelism]; for (int i = 0; i < parallelism; i++) { consumers[i] = new SinkConsumer(pluginRuntime.constructSink(parameters)); + consumers[i].setDispatch(dispatch); try { consumers[i].consumer().validate(new PipeParameterValidator(parameters)); consumers[i] @@ -109,16 +110,18 @@ public void createInternal() throws Exception { disruptor.handleEventsWithWorkerPool(consumers); disruptor.setDefaultExceptionHandler(new SinkExceptionHandler()); + + disruptor.start(); } @Override public void startInternal() { - disruptor.start(); + // do nothing } @Override public void stopInternal() { - disruptor.halt(); + // do nothing } @Override diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java index ca0c8c4..2cf5f67 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/SourceTask.java @@ -21,6 +21,7 @@ import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; import org.apache.iotdb.collector.runtime.task.Task; +import org.apache.iotdb.collector.runtime.task.TaskStateEnum; import org.apache.iotdb.collector.runtime.task.event.EventCollector; import org.apache.iotdb.collector.runtime.task.source.pull.PullSourceTask; import org.apache.iotdb.collector.runtime.task.source.push.PushSourceTask; @@ -34,20 +35,24 @@ public abstract class SourceTask extends Task { protected final EventCollector processorProducer; + protected TaskStateEnum taskState; protected SourceTask( final String taskId, final Map attributes, - final EventCollector processorProducer) { + final EventCollector processorProducer, + final TaskStateEnum taskState) { super( taskId, attributes, TASK_SOURCE_PARALLELISM_NUM.key(), TASK_SOURCE_PARALLELISM_NUM.value()); this.processorProducer = processorProducer; + this.taskState = taskState; } public static SourceTask construct( final String taskId, final Map attributes, - final EventCollector processorProducer) + final EventCollector processorProducer, + final TaskStateEnum taskState) throws Exception { final PluginRuntime pluginRuntime = RuntimeService.plugin().isPresent() ? RuntimeService.plugin().get() : null; @@ -57,10 +62,10 @@ public static SourceTask construct( final PipeParameters parameters = new PipeParameters(attributes); if (pluginRuntime.isPullSource(parameters)) { - return new PullSourceTask(taskId, attributes, processorProducer); + return new PullSourceTask(taskId, attributes, processorProducer, taskState); } if (pluginRuntime.isPushSource(parameters)) { - return new PushSourceTask(taskId, attributes, processorProducer); + return new PushSourceTask(taskId, attributes, processorProducer, taskState); } throw new IllegalArgumentException("Unsupported source type"); } diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java index b47054a..d426526 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/pull/PullSourceTask.java @@ -22,6 +22,7 @@ import org.apache.iotdb.collector.plugin.api.PullSource; import org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration; import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.task.TaskStateEnum; import org.apache.iotdb.collector.runtime.task.event.EventCollector; import org.apache.iotdb.collector.runtime.task.source.SourceTask; import org.apache.iotdb.collector.service.RuntimeService; @@ -50,8 +51,9 @@ public class PullSourceTask extends SourceTask { public PullSourceTask( final String taskId, final Map attributes, - final EventCollector processorProducer) { - super(taskId, attributes, processorProducer); + final EventCollector processorProducer, + final TaskStateEnum taskState) { + super(taskId, attributes, processorProducer, taskState); } @Override @@ -101,14 +103,14 @@ public void createInternal() throws Exception { .get(taskId) .submit( () -> { - while (!isDropped.get()) { + while (dispatch.isRunning() && TaskStateEnum.RUNNING.equals(taskState)) { try { consumers[finalI].onScheduler(); } catch (final Exception e) { LOGGER.warn("Failed to pull source", e); } - waitUntilRunningOrDropped(); + dispatch.waitUntilRunningOrDropped(); } }); } @@ -116,12 +118,12 @@ public void createInternal() throws Exception { @Override public void startInternal() { - // do nothing + this.taskState = TaskStateEnum.RUNNING; } @Override public void stopInternal() { - // do nothing + this.taskState = TaskStateEnum.STOPPED; } @Override diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java index 938f29f..7419ae7 100644 --- a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/runtime/task/source/push/PushSourceTask.java @@ -22,6 +22,7 @@ import org.apache.iotdb.collector.plugin.api.PushSource; import org.apache.iotdb.collector.plugin.api.customizer.CollectorSourceRuntimeConfiguration; import org.apache.iotdb.collector.runtime.plugin.PluginRuntime; +import org.apache.iotdb.collector.runtime.task.TaskStateEnum; import org.apache.iotdb.collector.runtime.task.event.EventCollector; import org.apache.iotdb.collector.runtime.task.source.SourceTask; import org.apache.iotdb.collector.service.RuntimeService; @@ -41,8 +42,9 @@ public class PushSourceTask extends SourceTask { public PushSourceTask( final String taskId, final Map sourceParams, - final EventCollector processorProducer) { - super(taskId, sourceParams, processorProducer); + final EventCollector processorProducer, + final TaskStateEnum taskState) { + super(taskId, sourceParams, processorProducer, taskState); } @Override @@ -64,7 +66,9 @@ public void createInternal() throws Exception { pushSources[i].customize( parameters, new CollectorSourceRuntimeConfiguration(taskId, creationTime, parallelism, i)); - pushSources[i].start(); + if (TaskStateEnum.RUNNING.equals(taskState)) { + pushSources[i].start(); + } } catch (final Exception e) { try { pushSources[i].close(); @@ -78,12 +82,42 @@ public void createInternal() throws Exception { @Override public void startInternal() { - // do nothing + if (this.taskState.equals(TaskStateEnum.RUNNING)) { + return; + } + + if (pushSources != null) { + for (int i = 0; i < parallelism; i++) { + try { + pushSources[i].start(); + } catch (final Exception e) { + LOGGER.warn("Failed to restart push source", e); + return; + } + } + } + + this.taskState = TaskStateEnum.RUNNING; } @Override public void stopInternal() { - // do nothing + if (this.taskState.equals(TaskStateEnum.STOPPED)) { + return; + } + + if (pushSources != null) { + for (int i = 0; i < parallelism; i++) { + try { + pushSources[i].close(); + } catch (final Exception e) { + LOGGER.warn("Failed to stop source", e); + return; + } + } + + this.taskState = TaskStateEnum.STOPPED; + } } @Override diff --git a/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java new file mode 100644 index 0000000..a5eb42e --- /dev/null +++ b/iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/service/PersistenceService.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.collector.service; + +import org.apache.iotdb.collector.persistence.DBConstant; +import org.apache.iotdb.collector.persistence.PluginPersistence; +import org.apache.iotdb.collector.persistence.TaskPersistence; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.iotdb.collector.config.PluginRuntimeOptions.PLUGIN_INSTALL_LIB_DIR; + +public class PersistenceService implements IService { + + private static final Logger LOGGER = LoggerFactory.getLogger(PersistenceService.class); + + private static final AtomicReference PLUGIN = new AtomicReference<>(); + private static final AtomicReference TASK = new AtomicReference<>(); + + @Override + public void start() { + initPluginDir(); + + PLUGIN.set(new PluginPersistence(DBConstant.PLUGIN_DATABASE_URL)); + TASK.set(new TaskPersistence(DBConstant.TASK_DATABASE_URL)); + + PLUGIN.get().tryResume(); + TASK.get().tryResume(); + } + + private void initPluginDir() { + final Path pluginInstallPath = Paths.get(PLUGIN_INSTALL_LIB_DIR.value()); + try { + if (!Files.exists(pluginInstallPath)) { + FileUtils.forceMkdir(pluginInstallPath.toFile()); + } + } catch (final IOException e) { + LOGGER.warn("Failed to create plugin install directory", e); + } + } + + public static Optional plugin() { + return Optional.of(PLUGIN.get()); + } + + public static Optional task() { + return Optional.of(TASK.get()); + } + + @Override + public void stop() { + PLUGIN.set(null); + TASK.set(null); + } + + @Override + public String name() { + return "PersistenceService"; + } +} diff --git a/iotdb-collector/collector-core/src/main/resources/application.properties b/iotdb-collector/collector-core/src/main/resources/application.properties index 244ed55..925ef65 100644 --- a/iotdb-collector/collector-core/src/main/resources/application.properties +++ b/iotdb-collector/collector-core/src/main/resources/application.properties @@ -54,3 +54,17 @@ task_processor_ring_buffer_size=1024 # Effective mode: on every start # Data type: int task_sink_ring_buffer_size=1024 + +#################### +### Plugin Configuration +#################### + +# The location of plugin jar file +# Effective mode: on every start +# Data type: string +plugin_lib_dir=ext/plugin + +# Installation location of plugin jar file +# Effective mode: on every start +# Data type: string +plugin_install_lib_dir=ext/plugin/install \ No newline at end of file diff --git a/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml b/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml index 25ea5b5..eec0e5f 100644 --- a/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml +++ b/iotdb-collector/collector-openapi/src/main/openapi3/v1/plugin.yaml @@ -41,50 +41,21 @@ paths: "200": $ref: '#/components/responses/SuccessExecutionStatus' - /plugin/v1/alter: - post: - operationId: alterPlugin - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/AlterPluginRequest' - responses: - "200": - $ref: '#/components/responses/SuccessExecutionStatus' - - /plugin/v1/start: - post: - operationId: startPlugin - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/StartPluginRequest' - responses: - "200": - $ref: '#/components/responses/SuccessExecutionStatus' - - /plugin/v1/stop: + /plugin/v1/drop: post: - operationId: stopPlugin + operationId: dropPlugin requestBody: content: application/json: schema: - $ref: '#/components/schemas/StopPluginRequest' + $ref: '#/components/schemas/DropPluginRequest' responses: "200": $ref: '#/components/responses/SuccessExecutionStatus' - /plugin/v1/drop: + /plugin/v1/show: post: - operationId: dropPlugin - requestBody: - content: - application/json: - schema: - $ref: '#/components/schemas/DropPluginRequest' + operationId: showPlugin responses: "200": $ref: '#/components/responses/SuccessExecutionStatus' @@ -93,51 +64,16 @@ components: schemas: CreatePluginRequest: properties: - sourceAttribute: - type: object - additionalProperties: - type: string - processorAttribute: - type: object - additionalProperties: - type: string - sinkAttribute: - type: object - additionalProperties: - type: string - taskId: - type: string - - AlterPluginRequest: - properties: - sourceAttribute: - type: object - additionalProperties: - type: string - processorAttribute: - type: object - additionalProperties: - type: string - sinkAttribute: - type: object - additionalProperties: - type: string - taskId: + pluginName: type: string - - StartPluginRequest: - properties: - taskId: + className: type: string - - StopPluginRequest: - properties: - taskId: + jarName: type: string DropPluginRequest: properties: - taskId: + pluginName: type: string ExecutionStatus: diff --git a/pom.xml b/pom.xml index 4423708..da29b5f 100644 --- a/pom.xml +++ b/pom.xml @@ -170,6 +170,7 @@ 2.7.18 5.3.39 + 3.49.1.0 1.6.14 chmod @@ -861,6 +862,11 @@ disruptor ${disruptor.version} + + org.xerial + sqlite-jdbc + ${sqlite.version} +