diff --git a/api/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java b/api/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java index 58fdf4b..04dcf50 100644 --- a/api/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java +++ b/api/src/main/java/org/tron/common/logsfilter/IPluginEventListener.java @@ -3,28 +3,29 @@ import org.pf4j.ExtensionPoint; public interface IPluginEventListener extends ExtensionPoint { - void setServerAddress(String address); - void setTopic(int eventType, String topic); + void setServerAddress(String address); - void setDBConfig(String dbConfig); + void setTopic(int eventType, String topic); - // start should be called after setServerAddress, setTopic, setDBConfig - void start(); + void setDBConfig(String dbConfig); - void handleBlockEvent(Object data); + // start should be called after setServerAddress, setTopic, setDBConfig + void start(); - void handleTransactionTrigger(Object data); + void handleBlockEvent(Object data); - void handleContractLogTrigger(Object data); + void handleTransactionTrigger(Object data); - void handleContractEventTrigger(Object data); + void handleContractLogTrigger(Object data); - void handleSolidityTrigger(Object trigger); + void handleContractEventTrigger(Object data); - void handleSolidityLogTrigger(Object trigger); + void handleSolidityTrigger(Object trigger); - void handleSolidityEventTrigger(Object trigger); + void handleSolidityLogTrigger(Object trigger); - int getPendingSize(); + void handleSolidityEventTrigger(Object trigger); + + int getPendingSize(); } diff --git a/api/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java b/api/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java index 1b9cc9c..75d04f4 100644 --- a/api/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java +++ b/api/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java @@ -8,6 +8,7 @@ import lombok.Setter; public class BlockLogTrigger extends Trigger { + @Getter @Setter private long blockNumber; @@ -24,7 +25,6 @@ public class BlockLogTrigger extends Trigger { @Setter private List transactionList = new ArrayList<>(); - /** * address of witness */ @@ -46,23 +46,14 @@ public BlockLogTrigger() { @Override public String toString() { - return new StringBuilder().append("triggerName: ").append(getTriggerName()) - .append("timestamp: ") - .append(timeStamp) - .append(", blockNumber: ") - .append(blockNumber) - .append(", blockhash: ") - .append(blockHash) - .append(", transactionSize: ") - .append(transactionSize) - .append(", transactionList: ") - .append(transactionList) - .append(", witnessAddress: ") - .append(witnessAddress) - .append(", witnessPayPerBlock: ") - .append(witnessPayPerBlock) - .append(", witnessMap: ") - .append(witnessMap) - .toString(); + return "triggerName: " + getTriggerName() + + ", timestamp: " + timeStamp + + ", blockNumber: " + blockNumber + + ", blockhash: " + blockHash + + ", transactionSize: " + transactionSize + + ", transactionList: " + transactionList + + ", witnessAddress: " + witnessAddress + + ", witnessPayPerBlock: " + witnessPayPerBlock + + ", witnessMap: " + witnessMap; } } diff --git a/api/src/main/java/org/tron/common/logsfilter/trigger/ContractEventTrigger.java b/api/src/main/java/org/tron/common/logsfilter/trigger/ContractEventTrigger.java index 3f6355c..488a186 100644 --- a/api/src/main/java/org/tron/common/logsfilter/trigger/ContractEventTrigger.java +++ b/api/src/main/java/org/tron/common/logsfilter/trigger/ContractEventTrigger.java @@ -37,7 +37,6 @@ public class ContractEventTrigger extends ContractTrigger { @Setter private Map dataMap; - public ContractEventTrigger() { super(); setTriggerName(Trigger.CONTRACTEVENT_TRIGGER_NAME); diff --git a/api/src/main/java/org/tron/common/logsfilter/trigger/ContractLogTrigger.java b/api/src/main/java/org/tron/common/logsfilter/trigger/ContractLogTrigger.java index 26f6dc3..5f8830b 100644 --- a/api/src/main/java/org/tron/common/logsfilter/trigger/ContractLogTrigger.java +++ b/api/src/main/java/org/tron/common/logsfilter/trigger/ContractLogTrigger.java @@ -5,6 +5,7 @@ import lombok.Setter; public class ContractLogTrigger extends ContractTrigger { + /** * topic list produced by the smart contract LOG function */ diff --git a/api/src/main/java/org/tron/common/logsfilter/trigger/SolidityTrigger.java b/api/src/main/java/org/tron/common/logsfilter/trigger/SolidityTrigger.java index fa64e28..6c4f3ad 100644 --- a/api/src/main/java/org/tron/common/logsfilter/trigger/SolidityTrigger.java +++ b/api/src/main/java/org/tron/common/logsfilter/trigger/SolidityTrigger.java @@ -4,17 +4,16 @@ import lombok.Setter; public class SolidityTrigger extends Trigger { + @Getter @Setter private long latestSolidifiedBlockNumber; @Override public String toString() { - return new StringBuilder().append("triggerName: ").append(getTriggerName()) - .append("timestamp: ") - .append(timeStamp) - .append(", latestSolidifiedBlockNumber: ") - .append(latestSolidifiedBlockNumber).toString(); + return "triggerName: " + getTriggerName() + + ", timestamp: " + timeStamp + + ", latestSolidifiedBlockNumber: " + latestSolidifiedBlockNumber; } public SolidityTrigger() { diff --git a/api/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java b/api/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java index 2a9624a..355f09e 100644 --- a/api/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java +++ b/api/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java @@ -1,6 +1,5 @@ package org.tron.common.logsfilter.trigger; -import java.util.ArrayList; import java.util.List; import lombok.Getter; import lombok.Setter; diff --git a/api/src/main/java/org/tron/common/logsfilter/trigger/Trigger.java b/api/src/main/java/org/tron/common/logsfilter/trigger/Trigger.java index e58f23a..246fd71 100644 --- a/api/src/main/java/org/tron/common/logsfilter/trigger/Trigger.java +++ b/api/src/main/java/org/tron/common/logsfilter/trigger/Trigger.java @@ -4,6 +4,7 @@ import lombok.Setter; public class Trigger { + @Getter @Setter protected long timeStamp; diff --git a/app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java b/app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java index 19450c3..cefbba2 100644 --- a/app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java +++ b/app/src/main/java/org/tron/eventplugin/app/PluginLauncher.java @@ -13,97 +13,97 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.tron.eventplugin.app; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.util.List; +import java.util.Objects; import org.pf4j.CompoundPluginDescriptorFinder; +import org.pf4j.DefaultPluginManager; import org.pf4j.ManifestPluginDescriptorFinder; +import org.pf4j.PluginManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.pf4j.DefaultPluginManager; -import org.pf4j.PluginManager; import org.tron.common.logsfilter.IPluginEventListener; import org.tron.common.logsfilter.trigger.BlockLogTrigger; import org.tron.common.logsfilter.trigger.Trigger; -import java.io.File; -import java.util.List; -import java.util.Objects; - public class PluginLauncher { - private static final Logger logger = LoggerFactory.getLogger(PluginLauncher.class); - - public static void main(String[] args) { - String path = "/Users/tron/sourcecode/eventplugin/build/plugins/plugin-mongodb-1.0.0.zip"; - - File dir = new File(path); - // create the plugin manager - final PluginManager pluginManager = new DefaultPluginManager(dir.toPath()) { - @Override - protected CompoundPluginDescriptorFinder createPluginDescriptorFinder() { - return new CompoundPluginDescriptorFinder() - .add(new ManifestPluginDescriptorFinder()); - } - }; - - File file = new File(path); - - pluginManager.loadPlugin(file.toPath()); - pluginManager.startPlugins(); - - List eventListeners; - eventListeners = pluginManager.getExtensions(IPluginEventListener.class); - - if (Objects.isNull(eventListeners)) return; - - eventListeners.forEach(listener -> { - listener.setServerAddress("127.0.0.1:27017"); - }); - - eventListeners.forEach(listener -> { - listener.setDBConfig("eventlog|tron|123456"); - }); - - eventListeners.forEach(listener -> { - listener.setTopic(Trigger.BLOCK_TRIGGER, "block"); - listener.setTopic(Trigger.TRANSACTION_TRIGGER, "transaction"); - listener.setTopic(Trigger.CONTRACTEVENT_TRIGGER, "contractevent"); - listener.setTopic(Trigger.CONTRACTLOG_TRIGGER, "contractlog"); - listener.setTopic(Trigger.SOLIDITY_TRIGGER, "solidity"); - listener.setTopic(Trigger.SOLIDITY_EVENT, "solidityevent"); - listener.setTopic(Trigger.SOLIDITY_LOG, "soliditylog"); - - }); - - eventListeners.forEach(listener -> { - listener.start(); - }); - - ObjectMapper objectMapper = new ObjectMapper(); - for (int index = 0; index < 1000; ++index){ - BlockLogTrigger trigger = new BlockLogTrigger(); - trigger.setBlockNumber(index); - trigger.setBlockHash("000000000002f5834df6036318999576bfa23ff1a57e0538fa87d5a90319659e"); - trigger.setTimeStamp(System.currentTimeMillis()); - trigger.setTransactionSize(100); - - eventListeners.forEach(listener -> { - try { - listener.handleBlockEvent(objectMapper.writeValueAsString(trigger)); - } catch (com.fasterxml.jackson.core.JsonProcessingException e) { - e.printStackTrace(); - } - }); - } - while (true){ - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + private static final Logger logger = LoggerFactory.getLogger(PluginLauncher.class); + + public static void main(String[] args) { + String path = "/Users/tron/sourcecode/eventplugin/build/plugins/plugin-mongodb-1.0.0.zip"; + + File dir = new File(path); + // create the plugin manager + final PluginManager pluginManager = new DefaultPluginManager(dir.toPath()) { + @Override + protected CompoundPluginDescriptorFinder createPluginDescriptorFinder() { + return new CompoundPluginDescriptorFinder() + .add(new ManifestPluginDescriptorFinder()); + } + }; + + File file = new File(path); + + pluginManager.loadPlugin(file.toPath()); + pluginManager.startPlugins(); + + List eventListeners; + eventListeners = pluginManager.getExtensions(IPluginEventListener.class); + + if (Objects.isNull(eventListeners)) { + return; + } + + eventListeners.forEach(listener -> { + listener.setServerAddress("127.0.0.1:27017"); + }); + + eventListeners.forEach(listener -> { + listener.setDBConfig("eventlog|tron|123456"); + }); + + eventListeners.forEach(listener -> { + listener.setTopic(Trigger.BLOCK_TRIGGER, "block"); + listener.setTopic(Trigger.TRANSACTION_TRIGGER, "transaction"); + listener.setTopic(Trigger.CONTRACTEVENT_TRIGGER, "contractevent"); + listener.setTopic(Trigger.CONTRACTLOG_TRIGGER, "contractlog"); + listener.setTopic(Trigger.SOLIDITY_TRIGGER, "solidity"); + listener.setTopic(Trigger.SOLIDITY_EVENT, "solidityevent"); + listener.setTopic(Trigger.SOLIDITY_LOG, "soliditylog"); + }); + + eventListeners.forEach(IPluginEventListener::start); + + ObjectMapper objectMapper = new ObjectMapper(); + for (int index = 0; index < 1000; ++index) { + BlockLogTrigger trigger = new BlockLogTrigger(); + trigger.setBlockNumber(index); + trigger.setBlockHash("000000000002f5834df6036318999576bfa23ff1a57e0538fa87d5a90319659e"); + trigger.setTimeStamp(System.currentTimeMillis()); + trigger.setTransactionSize(100); + + eventListeners.forEach(listener -> { + try { + listener.handleBlockEvent(objectMapper.writeValueAsString(trigger)); + } catch (com.fasterxml.jackson.core.JsonProcessingException e) { + e.printStackTrace(); } + }); + } - //pluginManager.stopPlugins(); + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } + + //pluginManager.stopPlugins(); + } } diff --git a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/Constant.java b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/Constant.java index 0ff4ac9..068b767 100644 --- a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/Constant.java +++ b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/Constant.java @@ -1,21 +1,23 @@ package org.tron.eventplugin; public class Constant { - public static final int BLOCK_TRIGGER = 0; - public static final int TRANSACTION_TRIGGER = 1; - public static final int CONTRACTLOG_TRIGGER = 2; - public static final int CONTRACTEVENT_TRIGGER = 3; - public static final int SOLIDITY_TRIGGER = 4; - public static final int SOLIDITY_EVENT = 5; - public static final int SOLIDITY_LOG = 6; - public static final String BLOCK_TRIGGER_NAME = "blockTrigger"; - public static final String TRANSACTION_TRIGGER_NAME = "transactionTrigger"; - public static final String CONTRACTLOG_TRIGGER_NAME = "contractLogTrigger"; - public static final String CONTRACTEVENT_TRIGGER_NAME = "contractEventTrigger"; - public static final String SOLIDITY_TRIGGER_NAME = "solidityTrigger"; - public static final String SOLIDITYLOG_TRIGGER_NAME = "solidityLogTrigger"; - public static final String SOLIDITYEVENT_TRIGGER_NAME = "solidityEventTrigger"; + public static final int BLOCK_TRIGGER = 0; + public static final int TRANSACTION_TRIGGER = 1; + public static final int CONTRACTLOG_TRIGGER = 2; + public static final int CONTRACTEVENT_TRIGGER = 3; + public static final int SOLIDITY_TRIGGER = 4; + public static final int SOLIDITY_EVENT = 5; + public static final int SOLIDITY_LOG = 6; - private Constant(){} + public static final String BLOCK_TRIGGER_NAME = "blockTrigger"; + public static final String TRANSACTION_TRIGGER_NAME = "transactionTrigger"; + public static final String CONTRACTLOG_TRIGGER_NAME = "contractLogTrigger"; + public static final String CONTRACTEVENT_TRIGGER_NAME = "contractEventTrigger"; + public static final String SOLIDITY_TRIGGER_NAME = "solidityTrigger"; + public static final String SOLIDITYLOG_TRIGGER_NAME = "solidityLogTrigger"; + public static final String SOLIDITYEVENT_TRIGGER_NAME = "solidityEventTrigger"; + + private Constant() { + } } diff --git a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaEventListener.java b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaEventListener.java index 65f2ba2..ceb2087 100644 --- a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaEventListener.java +++ b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaEventListener.java @@ -1,108 +1,98 @@ package org.tron.eventplugin; +import java.util.Objects; import org.pf4j.Extension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tron.common.logsfilter.IPluginEventListener; -import java.util.Objects; @Extension public class KafkaEventListener implements IPluginEventListener { - private static final Logger log = LoggerFactory.getLogger(KafkaEventListener.class); - - @Override - public void setServerAddress(String address) { - - if (Objects.isNull(address) || address.length() == 0){ - return; - } - - MessageSenderImpl.getInstance().setServerAddress(address); - } - - @Override - public void setTopic(int eventType, String topic) { - MessageSenderImpl.getInstance().setTopic(eventType, topic); - } - - @Override - public void setDBConfig(String dbConfig) { - // empty implementation - } - - @Override - public void start() { - // MessageSenderImpl should never init until server address is set - MessageSenderImpl.getInstance().init(); - } - - @Override - public void handleBlockEvent(Object data) { - - if (Objects.isNull(data)){ - return; - } + private static final Logger log = LoggerFactory.getLogger(KafkaEventListener.class); - MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + @Override + public void setServerAddress(String address) { + if (Objects.isNull(address) || address.isEmpty()) { + return; } - - @Override - public void handleTransactionTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + MessageSenderImpl.getInstance().setServerAddress(address); + } + + @Override + public void setTopic(int eventType, String topic) { + MessageSenderImpl.getInstance().setTopic(eventType, topic); + } + + @Override + public void setDBConfig(String dbConfig) { + // empty implementation + } + + @Override + public void start() { + // MessageSenderImpl should never init until server address is set + MessageSenderImpl.getInstance().init(); + } + + @Override + public void handleBlockEvent(Object data) { + if (Objects.isNull(data)) { + return; } + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public void handleSolidityTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + @Override + public void handleTransactionTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public void handleSolidityLogTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + @Override + public void handleSolidityTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public void handleSolidityEventTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + @Override + public void handleSolidityLogTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public int getPendingSize() { - return MessageSenderImpl.getInstance().getTriggerQueue().size(); + @Override + public void handleSolidityEventTrigger(Object data) { + if (Objects.isNull(data)) { + return; } - - @Override - public void handleContractLogTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } + + @Override + public int getPendingSize() { + return MessageSenderImpl.getInstance().getTriggerQueue().size(); + } + + @Override + public void handleContractLogTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public void handleContractEventTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + @Override + public void handleContractEventTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MessageSenderImpl.getInstance().getTriggerQueue().offer(data); + } } diff --git a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaLogFilterPlugin.java b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaLogFilterPlugin.java index 58fee3c..842d7df 100644 --- a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaLogFilterPlugin.java +++ b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaLogFilterPlugin.java @@ -5,16 +5,16 @@ public class KafkaLogFilterPlugin extends Plugin { - public KafkaLogFilterPlugin(PluginWrapper wrapper) { - super(wrapper); - } + public KafkaLogFilterPlugin(PluginWrapper wrapper) { + super(wrapper); + } - @Override - public void start() { - } + @Override + public void start() { + } - @Override - public void stop() { - MessageSenderImpl.getInstance().close(); - } + @Override + public void stop() { + MessageSenderImpl.getInstance().close(); + } } diff --git a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/MessageSenderImpl.java b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/MessageSenderImpl.java index 31d758b..02645ea 100644 --- a/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/MessageSenderImpl.java +++ b/plugins/kafkaplugin/src/main/java/org/tron/eventplugin/MessageSenderImpl.java @@ -1,247 +1,239 @@ package org.tron.eventplugin; -import org.apache.kafka.clients.producer.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import lombok.Getter; +import lombok.Setter; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class MessageSenderImpl{ - private static MessageSenderImpl instance = null; - private static final Logger log = LoggerFactory.getLogger(MessageSenderImpl.class); - - private String serverAddress = ""; - private boolean loaded = false; +public class MessageSenderImpl { - private Map producerMap = new HashMap<>(); + private static MessageSenderImpl instance = null; + private static final Logger log = LoggerFactory.getLogger(MessageSenderImpl.class); - private BlockingQueue triggerQueue = new LinkedBlockingQueue(); + @Setter + private String serverAddress = ""; + private boolean loaded = false; - private String blockTopic = ""; - private String transactionTopic = ""; - private String contractEventTopic = ""; - private String contractLogTopic = ""; - private String solidityTopic = ""; - private String solidityLogTopic = ""; - private String solidityEventTopic = ""; + private final Map> producerMap = new HashMap<>(); + @Getter + private BlockingQueue triggerQueue = new LinkedBlockingQueue<>(); - private Thread triggerProcessThread; - private boolean isRunTriggerProcessThread = true; + private String blockTopic = ""; + private String transactionTopic = ""; + private String contractEventTopic = ""; + private String contractLogTopic = ""; + private String solidityTopic = ""; + private String solidityLogTopic = ""; + private String solidityEventTopic = ""; + private Thread triggerProcessThread; + private boolean isRunTriggerProcessThread = true; - public static MessageSenderImpl getInstance(){ + public static MessageSenderImpl getInstance() { + if (Objects.isNull(instance)) { + synchronized (MessageSenderImpl.class) { if (Objects.isNull(instance)) { - synchronized (MessageSenderImpl.class){ - if (Objects.isNull(instance)){ - instance = new MessageSenderImpl(); - } - } - } - - return instance; - } - - public void setServerAddress(String address){ - this.serverAddress = address; - } - - public void init(){ - - if (loaded){ - return; + instance = new MessageSenderImpl(); } - - createProducer(Constant.BLOCK_TRIGGER); - createProducer(Constant.TRANSACTION_TRIGGER); - createProducer(Constant.CONTRACTLOG_TRIGGER); - createProducer(Constant.CONTRACTEVENT_TRIGGER); - createProducer(Constant.SOLIDITY_TRIGGER); - createProducer(Constant.SOLIDITY_EVENT); - createProducer(Constant.SOLIDITY_LOG); - - triggerProcessThread = new Thread(triggerProcessLoop); - triggerProcessThread.start(); - - loaded = true; - } - - public void setTopic(int triggerType, String topic) { - if (triggerType == Constant.BLOCK_TRIGGER) { - blockTopic = topic; - } else if (triggerType == Constant.TRANSACTION_TRIGGER) { - transactionTopic = topic; - } else if (triggerType == Constant.CONTRACTEVENT_TRIGGER) { - contractEventTopic = topic; - } else if (triggerType == Constant.CONTRACTLOG_TRIGGER) { - contractLogTopic = topic; - } else if (triggerType == Constant.SOLIDITY_TRIGGER) { - solidityTopic = topic; - } else if (triggerType == Constant.SOLIDITY_EVENT) { - solidityEventTopic = topic; - } else if (triggerType == Constant.SOLIDITY_LOG) { - solidityLogTopic = topic; + } + } + return instance; + } + + public void init() { + if (loaded) { + return; + } + + createProducer(Constant.BLOCK_TRIGGER); + createProducer(Constant.TRANSACTION_TRIGGER); + createProducer(Constant.CONTRACTLOG_TRIGGER); + createProducer(Constant.CONTRACTEVENT_TRIGGER); + createProducer(Constant.SOLIDITY_TRIGGER); + createProducer(Constant.SOLIDITY_EVENT); + createProducer(Constant.SOLIDITY_LOG); + + triggerProcessThread = new Thread(triggerProcessLoop); + triggerProcessThread.start(); + loaded = true; + } + + public void setTopic(int triggerType, String topic) { + if (triggerType == Constant.BLOCK_TRIGGER) { + blockTopic = topic; + } else if (triggerType == Constant.TRANSACTION_TRIGGER) { + transactionTopic = topic; + } else if (triggerType == Constant.CONTRACTEVENT_TRIGGER) { + contractEventTopic = topic; + } else if (triggerType == Constant.CONTRACTLOG_TRIGGER) { + contractLogTopic = topic; + } else if (triggerType == Constant.SOLIDITY_TRIGGER) { + solidityTopic = topic; + } else if (triggerType == Constant.SOLIDITY_EVENT) { + solidityEventTopic = topic; + } else if (triggerType == Constant.SOLIDITY_LOG) { + solidityLogTopic = topic; + } + } + + private KafkaProducer createProducer(int eventType) { + KafkaProducer producer; + + Thread currentThread = Thread.currentThread(); + ClassLoader savedClassLoader = currentThread.getContextClassLoader(); + + currentThread.setContextClassLoader(null); + + Properties props = new Properties(); + props.put("acks", "all"); + props.put("retries", 0); + props.put("linger.ms", 1); + props.put("bootstrap.servers", this.serverAddress); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + + producer = new KafkaProducer(props); + + producerMap.put(eventType, producer); + + currentThread.setContextClassLoader(savedClassLoader); + + return producer; + } + + private void printTimestamp(String data) { + Date date = new Date(); + SimpleDateFormat ft = new SimpleDateFormat("hh:mm:ss:SSS"); + System.out.println(ft.format(date) + ": " + data); + } + + public void sendKafkaRecord(int eventType, String kafkaTopic, Object data) { + KafkaProducer producer = producerMap.get(eventType); + if (Objects.isNull(producer)) { + return; + } + + ProducerRecord record = new ProducerRecord(kafkaTopic, data); + try { + producer.send(record, new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + log.debug("sendKafkaRecord successfully"); } + }); + } catch (Exception e) { + log.error("sendKafkaRecord failed", e); } - private KafkaProducer createProducer(int eventType){ - - KafkaProducer producer = null; - - Thread currentThread = Thread.currentThread(); - ClassLoader savedClassLoader = currentThread.getContextClassLoader(); - - currentThread.setContextClassLoader(null); - - Properties props = new Properties(); - props.put("acks", "all"); - props.put("retries", 0); - props.put("linger.ms", 1); - props.put("bootstrap.servers", this.serverAddress); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - producer = new KafkaProducer(props); - - producerMap.put(eventType, producer); + printTimestamp((String) data); + } - currentThread.setContextClassLoader(savedClassLoader); - - return producer; + public void close() { + for (Map.Entry> entry : producerMap.entrySet()) { + entry.getValue().close(); } + producerMap.clear(); + } - private void printTimestamp(String data){ - Date date = new Date(); - SimpleDateFormat ft = new SimpleDateFormat("hh:mm:ss:SSS"); - System.out.println(ft.format(date) + ": " + data); + public void handleBlockEvent(Object data) { + if (blockTopic == null || blockTopic.isEmpty()) { + return; } + MessageSenderImpl.getInstance().sendKafkaRecord(Constant.BLOCK_TRIGGER, blockTopic, data); + } - public void sendKafkaRecord(int eventType, String kafkaTopic, Object data){ - KafkaProducer producer = producerMap.get(eventType); - if (Objects.isNull(producer)){ - return; - } - - ProducerRecord record = new ProducerRecord(kafkaTopic, data); - try { - producer.send(record, new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - log.debug("sendKafkaRecord successfully"); - } - }); - } catch (Exception e) { - log.error("sendKafkaRecord {}", e); - } - - printTimestamp((String)data); + public void handleTransactionTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(transactionTopic)) { + return; } + MessageSenderImpl.getInstance() + .sendKafkaRecord(Constant.TRANSACTION_TRIGGER, transactionTopic, data); + } - public void close() { - for (Map.Entry entry: producerMap.entrySet()){ - entry.getValue().close(); - } - - producerMap.clear(); + public void handleContractLogTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(contractLogTopic)) { + return; } + MessageSenderImpl.getInstance() + .sendKafkaRecord(Constant.CONTRACTLOG_TRIGGER, contractLogTopic, data); + } - public BlockingQueue getTriggerQueue(){ - return triggerQueue; + public void handleContractEventTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(contractEventTopic)) { + return; } + MessageSenderImpl.getInstance() + .sendKafkaRecord(Constant.CONTRACTEVENT_TRIGGER, contractEventTopic, data); + } - public void handleBlockEvent(Object data) { - if (blockTopic == null || blockTopic.length() == 0){ - return; - } - - MessageSenderImpl.getInstance().sendKafkaRecord(Constant.BLOCK_TRIGGER, blockTopic, data); + public void handleSolidityTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(solidityTopic)) { + return; } + MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_TRIGGER, solidityTopic, data); + } - public void handleTransactionTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(transactionTopic)){ - return; - } - - MessageSenderImpl.getInstance().sendKafkaRecord(Constant.TRANSACTION_TRIGGER, transactionTopic, data); + public void handleSolidityLogTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(solidityLogTopic)) { + return; } + MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_LOG, solidityLogTopic, data); + } - public void handleContractLogTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(contractLogTopic)){ - return; - } - - MessageSenderImpl.getInstance().sendKafkaRecord(Constant.CONTRACTLOG_TRIGGER, contractLogTopic, data); + public void handleSolidityEventTrigger(Object data) { + if (Objects.isNull(data) || Objects.isNull(solidityEventTopic)) { + return; } + MessageSenderImpl.getInstance() + .sendKafkaRecord(Constant.SOLIDITY_EVENT, solidityEventTopic, data); + } - public void handleContractEventTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(contractEventTopic)){ - return; - } - - MessageSenderImpl.getInstance().sendKafkaRecord(Constant.CONTRACTEVENT_TRIGGER, contractEventTopic, data); - } + private final Runnable triggerProcessLoop = + () -> { + while (isRunTriggerProcessThread) { + try { + String triggerData = (String) triggerQueue.poll(1, TimeUnit.SECONDS); + if (Objects.isNull(triggerData)) { + continue; + } - public void handleSolidityTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(solidityTopic)){ - return; - } - MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_TRIGGER, solidityTopic, data); - } - public void handleSolidityLogTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(solidityLogTopic)){ - return; - } - MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_LOG, solidityLogTopic, data); - } - public void handleSolidityEventTrigger(Object data) { - if (Objects.isNull(data) || Objects.isNull(solidityEventTopic)){ - return; + if (triggerData.contains(Constant.BLOCK_TRIGGER_NAME)) { + handleBlockEvent(triggerData); + } else if (triggerData.contains(Constant.TRANSACTION_TRIGGER_NAME)) { + handleTransactionTrigger(triggerData); + } else if (triggerData.contains(Constant.CONTRACTLOG_TRIGGER_NAME)) { + handleContractLogTrigger(triggerData); + } else if (triggerData.contains(Constant.CONTRACTEVENT_TRIGGER_NAME)) { + handleContractEventTrigger(triggerData); + } else if (triggerData.contains(Constant.SOLIDITY_TRIGGER_NAME)) { + handleSolidityTrigger(triggerData); + } else if (triggerData.contains(Constant.SOLIDITYLOG_TRIGGER_NAME)) { + handleSolidityLogTrigger(triggerData); + } else if (triggerData.contains(Constant.SOLIDITYEVENT_TRIGGER_NAME)) { + handleSolidityEventTrigger(triggerData); + } + } catch (InterruptedException ex) { + log.info(ex.getMessage()); + Thread.currentThread().interrupt(); + } catch (Exception ex) { + log.error("unknown exception happened in process capsule loop", ex); + } catch (Throwable throwable) { + log.error("unknown throwable happened in process capsule loop", throwable); + } } - MessageSenderImpl.getInstance().sendKafkaRecord(Constant.SOLIDITY_EVENT, solidityEventTopic, data); - } - - private Runnable triggerProcessLoop = - () -> { - while (isRunTriggerProcessThread) { - try { - String triggerData = (String)triggerQueue.poll(1, TimeUnit.SECONDS); - - if (Objects.isNull(triggerData)){ - continue; - } - - if (triggerData.contains(Constant.BLOCK_TRIGGER_NAME)){ - handleBlockEvent(triggerData); - } - else if (triggerData.contains(Constant.TRANSACTION_TRIGGER_NAME)){ - handleTransactionTrigger(triggerData); - } - else if (triggerData.contains(Constant.CONTRACTLOG_TRIGGER_NAME)){ - handleContractLogTrigger(triggerData); - } - else if (triggerData.contains(Constant.CONTRACTEVENT_TRIGGER_NAME)){ - handleContractEventTrigger(triggerData); - } - else if (triggerData.contains(Constant.SOLIDITY_TRIGGER_NAME)) { - handleSolidityTrigger(triggerData); - } - else if (triggerData.contains(Constant.SOLIDITYLOG_TRIGGER_NAME)) { - handleSolidityLogTrigger(triggerData); - } - else if (triggerData.contains(Constant.SOLIDITYEVENT_TRIGGER_NAME)) { - handleSolidityEventTrigger(triggerData); - } - } catch (InterruptedException ex) { - log.info(ex.getMessage()); - Thread.currentThread().interrupt(); - } catch (Exception ex) { - log.error("unknown exception happened in process capsule loop", ex); - } catch (Throwable throwable) { - log.error("unknown throwable happened in process capsule loop", throwable); - } - } - }; + }; } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/Constant.java b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/Constant.java index d20528a..283fe67 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/Constant.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/Constant.java @@ -1,21 +1,23 @@ package org.tron.eventplugin; public class Constant { - public static final int BLOCK_TRIGGER = 0; - public static final int TRANSACTION_TRIGGER = 1; - public static final int CONTRACTLOG_TRIGGER = 2; - public static final int CONTRACTEVENT_TRIGGER = 3; - public static final int SOLIDITY_TRIGGER = 4; - public static final int SOLIDITY_EVENT_TRIGGER = 5; - public static final int SOLIDITY_LOG_TRIGGER = 6; - public static final String BLOCK_TRIGGER_NAME = "blockTrigger"; - public static final String TRANSACTION_TRIGGER_NAME = "transactionTrigger"; - public static final String CONTRACTLOG_TRIGGER_NAME = "contractLogTrigger"; - public static final String CONTRACTEVENT_TRIGGER_NAME = "contractEventTrigger"; - public static final String SOLIDITY_TRIGGER_NAME = "solidityTrigger"; - public static final String SOLIDITYLOG_TRIGGER_NAME = "solidityLogTrigger"; - public static final String SOLIDITYEVENT_TRIGGER_NAME = "solidityEventTrigger"; + public static final int BLOCK_TRIGGER = 0; + public static final int TRANSACTION_TRIGGER = 1; + public static final int CONTRACTLOG_TRIGGER = 2; + public static final int CONTRACTEVENT_TRIGGER = 3; + public static final int SOLIDITY_TRIGGER = 4; + public static final int SOLIDITY_EVENT_TRIGGER = 5; + public static final int SOLIDITY_LOG_TRIGGER = 6; - private Constant(){} + public static final String BLOCK_TRIGGER_NAME = "blockTrigger"; + public static final String TRANSACTION_TRIGGER_NAME = "transactionTrigger"; + public static final String CONTRACTLOG_TRIGGER_NAME = "contractLogTrigger"; + public static final String CONTRACTEVENT_TRIGGER_NAME = "contractEventTrigger"; + public static final String SOLIDITY_TRIGGER_NAME = "solidityTrigger"; + public static final String SOLIDITYLOG_TRIGGER_NAME = "solidityLogTrigger"; + public static final String SOLIDITYEVENT_TRIGGER_NAME = "solidityEventTrigger"; + + private Constant() { + } } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java index 444cc84..766c9d6 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbEventListener.java @@ -1,109 +1,96 @@ package org.tron.eventplugin; +import java.util.Objects; import org.pf4j.Extension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.tron.common.logsfilter.IPluginEventListener; -import java.util.Objects; @Extension public class MongodbEventListener implements IPluginEventListener { - private static final Logger log = LoggerFactory.getLogger(MongodbEventListener.class); - - @Override - public void setServerAddress(String address) { - - if (Objects.isNull(address) || address.length() == 0){ - return; - } - - MongodbSenderImpl.getInstance().setServerAddress(address); + @Override + public void setServerAddress(String address) { + if (Objects.isNull(address) || address.isEmpty()) { + return; } - - @Override - public void setTopic(int eventType, String topic) { - MongodbSenderImpl.getInstance().setTopic(eventType, topic); + MongodbSenderImpl.getInstance().setServerAddress(address); + } + + @Override + public void setTopic(int eventType, String topic) { + MongodbSenderImpl.getInstance().setTopic(eventType, topic); + } + + @Override + public void setDBConfig(String dbConfig) { + MongodbSenderImpl.getInstance().setDBConfig(dbConfig); + } + + @Override + public void start() { + // MessageSenderImpl should never init until server address is set + MongodbSenderImpl.getInstance().init(); + } + + @Override + public void handleBlockEvent(Object data) { + if (Objects.isNull(data)) { + return; } + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public void setDBConfig(String dbConfig) { - MongodbSenderImpl.getInstance().setDBConfig(dbConfig); + @Override + public void handleTransactionTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public void start() { - // MessageSenderImpl should never init until server address is set - MongodbSenderImpl.getInstance().init(); + @Override + public void handleContractLogTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public void handleBlockEvent(Object data) { - - if (Objects.isNull(data)){ - return; - } - - MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); - } - - @Override - public void handleTransactionTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + @Override + public void handleContractEventTrigger(Object data) { + if (Objects.isNull(data)) { + return; } - @Override - public void handleContractLogTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } - MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + @Override + public void handleSolidityTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public void handleContractEventTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); - } - - @Override - public void handleSolidityTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); - } - - @Override - public void handleSolidityLogTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); - } - - @Override - public void handleSolidityEventTrigger(Object data) { - if (Objects.isNull(data)){ - return; - } - - MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + @Override + public void handleSolidityLogTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } - @Override - public int getPendingSize() { - return MongodbSenderImpl.getInstance().getTriggerQueue().size() - + MongodbSenderImpl.getInstance().getQueue().size(); + @Override + public void handleSolidityEventTrigger(Object data) { + if (Objects.isNull(data)) { + return; } + MongodbSenderImpl.getInstance().getTriggerQueue().offer(data); + } + + @Override + public int getPendingSize() { + return MongodbSenderImpl.getInstance().getTriggerQueue().size() + + MongodbSenderImpl.getInstance().getQueue().size(); + } } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbLogFilterPlugin.java b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbLogFilterPlugin.java index a9c748e..2e21fa6 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbLogFilterPlugin.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbLogFilterPlugin.java @@ -5,16 +5,16 @@ public class MongodbLogFilterPlugin extends Plugin { - public MongodbLogFilterPlugin(PluginWrapper wrapper) { - super(wrapper); - } + public MongodbLogFilterPlugin(PluginWrapper wrapper) { + super(wrapper); + } - @Override - public void start() { - } + @Override + public void start() { + } - @Override - public void stop() { - MongodbSenderImpl.getInstance().close(); - } + @Override + public void stop() { + MongodbSenderImpl.getInstance().close(); + } } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java index 3db8b0a..7101c1b 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java @@ -2,38 +2,34 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Properties; -import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.pf4j.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.tron.mongodb.MongoConfig; import org.tron.mongodb.MongoManager; import org.tron.mongodb.MongoTemplate; +@Slf4j public class MongodbSenderImpl { private static MongodbSenderImpl instance = null; - private static final Logger log = LoggerFactory.getLogger(MongodbSenderImpl.class); @Getter BlockingQueue queue = new LinkedBlockingQueue(); - private ExecutorService service = new ThreadPoolExecutor(8, 8, - 0L, TimeUnit.MILLISECONDS, queue); + private final ExecutorService service = new ThreadPoolExecutor(8, 8, + 0L, TimeUnit.MILLISECONDS, queue); private boolean loaded = false; + @Getter private BlockingQueue triggerQueue = new LinkedBlockingQueue<>(); private String blockTopic = ""; @@ -65,7 +61,6 @@ public static MongodbSenderImpl getInstance() { } } } - return instance; } @@ -85,7 +80,7 @@ public void setDBConfig(String dbConfig) { version = 1; if (params.length == 4) { - version = Integer.valueOf(params[3]); + version = Integer.parseInt(params[3]); } loadMongoConfig(); @@ -102,13 +97,13 @@ public void setServerAddress(final String serverAddress) { } String mongoHostName = ""; - int mongoPort = -1; + int mongoPort; try { mongoHostName = params[0]; - mongoPort = Integer.valueOf(params[1]); + mongoPort = Integer.parseInt(params[1]); } catch (Exception e) { - e.printStackTrace(); + log.error("SetServerAddress failed", e); return; } @@ -121,7 +116,6 @@ public void setServerAddress(final String serverAddress) { } public void init() { - if (loaded) { return; } @@ -136,7 +130,6 @@ public void init() { triggerProcessThread = new Thread(triggerProcessLoop); triggerProcessThread.start(); - loaded = true; } @@ -212,10 +205,8 @@ private void loadMongoConfig() { mongoConfig.setConnectionsPerHost(connectionsPerHost); mongoConfig.setThreadsAllowedToBlockForConnectionMultiplier( threadsAllowedToBlockForConnectionMultiplie); - } catch (IOException e) { - e.printStackTrace(); } catch (Exception e) { - e.printStackTrace(); + log.error("LoadMongoConfig failed", e); } } @@ -267,10 +258,6 @@ public void setTopic(int triggerType, String topic) { public void close() { } - public BlockingQueue getTriggerQueue() { - return triggerQueue; - } - public void upsertEntityLong(MongoTemplate template, Object data, String indexKey) { String dataStr = (String) data; try { @@ -302,20 +289,17 @@ public void upsertEntityString(MongoTemplate template, Object data, String index } public void handleBlockEvent(Object data) { - if (blockTopic == null || blockTopic.length() == 0) { + if (blockTopic == null || blockTopic.isEmpty()) { return; } MongoTemplate template = mongoTemplateMap.get(blockTopic); if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - if (mongoConfig.enabledIndexes()) { - upsertEntityLong(template, data, "blockNumber"); - } else { - template.addEntity((String) data); - } + service.execute(() -> { + if (mongoConfig.enabledIndexes()) { + upsertEntityLong(template, data, "blockNumber"); + } else { + template.addEntity((String) data); } }); } @@ -328,14 +312,11 @@ public void handleTransactionTrigger(Object data) { MongoTemplate template = mongoTemplateMap.get(transactionTopic); if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - if (mongoConfig.enabledIndexes()) { - upsertEntityString(template, data, "transactionId"); - } else { - template.addEntity((String) data); - } + service.execute(() -> { + if (mongoConfig.enabledIndexes()) { + upsertEntityString(template, data, "transactionId"); + } else { + template.addEntity((String) data); } }); } @@ -348,14 +329,11 @@ public void handleSolidityTrigger(Object data) { MongoTemplate template = mongoTemplateMap.get(solidityTopic); if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - if (mongoConfig.enabledIndexes()) { - upsertEntityLong(template, data, "latestSolidifiedBlockNumber"); - } else { - template.addEntity((String) data); - } + service.execute(() -> { + if (mongoConfig.enabledIndexes()) { + upsertEntityLong(template, data, "latestSolidifiedBlockNumber"); + } else { + template.addEntity((String) data); } }); } @@ -377,12 +355,7 @@ public void handleContractLogTrigger(Object data) { MongoTemplate template = mongoTemplateMap.get(contractLogTopic); if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - handleInsertContractTrigger(template, data, "uniqueId"); - } - }); + service.execute(() -> handleInsertContractTrigger(template, data, "uniqueId")); } } @@ -393,23 +366,20 @@ public void handleContractEventTrigger(Object data) { MongoTemplate template = mongoTemplateMap.get(contractEventTopic); if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - String dataStr = (String) data; - if (dataStr.contains("\"removed\":true")) { - try { - JSONObject jsStr = JSON.parseObject(dataStr); - String uniqueId = jsStr.getString("uniqueId"); - if (uniqueId != null) { - template.delete("uniqueId", uniqueId); - } - } catch (Exception ex) { - log.error("unknown exception happened in parse object ", ex); + service.execute(() -> { + String dataStr = (String) data; + if (dataStr.contains("\"removed\":true")) { + try { + JSONObject jsStr = JSON.parseObject(dataStr); + String uniqueId = jsStr.getString("uniqueId"); + if (uniqueId != null) { + template.delete("uniqueId", uniqueId); } - } else { - handleInsertContractTrigger(template, data, "uniqueId"); + } catch (Exception ex) { + log.error("unknown exception happened in parse object ", ex); } + } else { + handleInsertContractTrigger(template, data, "uniqueId"); } }); } @@ -422,12 +392,7 @@ public void handleSolidityLogTrigger(Object data) { MongoTemplate template = mongoTemplateMap.get(solidityLogTopic); if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - handleInsertContractTrigger(template, data, "uniqueId"); - } - }); + service.execute(() -> handleInsertContractTrigger(template, data, "uniqueId")); } } @@ -438,16 +403,11 @@ public void handleSolidityEventTrigger(Object data) { MongoTemplate template = mongoTemplateMap.get(solidityEventTopic); if (Objects.nonNull(template)) { - service.execute(new Runnable() { - @Override - public void run() { - handleInsertContractTrigger(template, data, "uniqueId"); - } - }); + service.execute(() -> handleInsertContractTrigger(template, data, "uniqueId")); } } - private Runnable triggerProcessLoop = + private final Runnable triggerProcessLoop = () -> { while (isRunTriggerProcessThread) { try { @@ -471,7 +431,11 @@ public void run() { handleSolidityLogTrigger(triggerData); } else if (triggerData.contains(Constant.SOLIDITYEVENT_TRIGGER_NAME)) { handleSolidityEventTrigger(triggerData); + } else { + log.error("not matched triggerData: {}", triggerData); + continue; } + log.debug("handle triggerData: {}", triggerData); } catch (InterruptedException ex) { log.info(ex.getMessage()); Thread.currentThread().interrupt(); diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/BaseEntity.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/BaseEntity.java index 53e92f7..a5e134a 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/BaseEntity.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/BaseEntity.java @@ -1,17 +1,12 @@ package org.tron.mongodb; import java.io.Serializable; +import lombok.Getter; +import lombok.Setter; public class BaseEntity implements Serializable { - private Object _id; - - public Object get_id() { - return _id; - } - - public void set_id(Object _id) { - this._id = _id; - } - + @Setter + @Getter + private Object _id; } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoConfig.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoConfig.java index 78d9e98..bb3f1f5 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoConfig.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoConfig.java @@ -1,5 +1,10 @@ package org.tron.mongodb; +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter public class MongoConfig { private String host; @@ -11,74 +16,7 @@ public class MongoConfig { private int connectionsPerHost = 10; private int threadsAllowedToBlockForConnectionMultiplier = 10; - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getDbName() { - return dbName; - } - - public void setDbName(String dbName) { - this.dbName = dbName; - } - - public String getUsername() { - return username; - } - - public void setUsername(String username) { - this.username = username; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public int getVersion() { - return version; - } - - public void setVersion(int version) { - this.version = version; - } - - public int getConnectionsPerHost() { - return connectionsPerHost; - } - - public void setConnectionsPerHost(int connectionsPerHost) { - this.connectionsPerHost = connectionsPerHost; - } - - public int getThreadsAllowedToBlockForConnectionMultiplier() { - return threadsAllowedToBlockForConnectionMultiplier; - } - - public void setThreadsAllowedToBlockForConnectionMultiplier( - int threadsAllowedToBlockForConnectionMultiplier) { - this.threadsAllowedToBlockForConnectionMultiplier = - threadsAllowedToBlockForConnectionMultiplier; - } - public boolean enabledIndexes() { return version == 2; } - } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoManager.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoManager.java index 2057a0d..9843358 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoManager.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoManager.java @@ -11,13 +11,19 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.pf4j.util.StringUtils; @Slf4j public class MongoManager { + @Setter + @Getter private MongoClient mongo; + @Setter + @Getter private MongoDatabase db; public void initConfig(MongoConfig config) { @@ -31,7 +37,7 @@ public void initConfig(MongoConfig config) { String host = config.getHost(); int port = config.getPort(); ServerAddress serverAddress = new ServerAddress(host, port); - List addrs = new ArrayList(); + List addrs = new ArrayList<>(); addrs.add(serverAddress); String username = config.getUsername(); @@ -44,8 +50,8 @@ public void initConfig(MongoConfig config) { MongoCredential credential = MongoCredential.createScramSha1Credential(username, databaseName, password.toCharArray()); - List credentials = new ArrayList(); - credentials.add(credential); + //List credentials = new ArrayList<>(); + //credentials.add(credential); mongo = new MongoClient(addrs, credential, options); db = mongo.getDatabase(databaseName); @@ -53,7 +59,7 @@ public void initConfig(MongoConfig config) { public void createCollection(String collectionName) { if (db != null && StringUtils.isNotNullOrEmpty(collectionName)) { - if (Objects.isNull(db.getCollection(collectionName))){ + if (Objects.isNull(db.getCollection(collectionName))) { db.createCollection(collectionName); } } @@ -83,21 +89,4 @@ public void createCollection(String collectionName, Map indexOp } } } - - public MongoClient getMongo() { - return mongo; - } - - public void setMongo(MongoClient mongo) { - this.mongo = mongo; - } - - public MongoDatabase getDb() { - return db; - } - - public void setDb(MongoDatabase db) { - this.db = db; - } - } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoTemplate.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoTemplate.java index 4666efc..0722359 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoTemplate.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/MongoTemplate.java @@ -1,24 +1,29 @@ package org.tron.mongodb; -import com.mongodb.client.model.ReplaceOptions; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import org.bson.Document; -import org.bson.conversions.Bson; import com.mongodb.BasicDBObject; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Filters; +import com.mongodb.client.model.ReplaceOptions; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.UpdateResult; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.bson.Document; +import org.bson.conversions.Bson; import org.tron.mongodb.util.Converter; import org.tron.mongodb.util.Pager; +@Slf4j public abstract class MongoTemplate { + @Setter + @Getter private MongoManager manager; private MongoCollection collection = null; @@ -152,18 +157,10 @@ public Pager queryPagerList(Bson filter, int pageIndex, int pageSize) { return pager; } - public MongoManager getManager() { - return manager; - } - - public void setManager(MongoManager manager) { - this.manager = manager; - } - private List getEntityList(FindIterable findIterable) { MongoCursor mongoCursor = findIterable.iterator(); List list = new ArrayList(); - Document document = null; + Document document; while (mongoCursor.hasNext()) { document = mongoCursor.next(); T object; @@ -171,7 +168,7 @@ private List getEntityList(FindIterable findIterable) { object = Converter.jsonStringToObject(document.toJson(), getReferencedClass()); list.add(object); } catch (Exception e) { - e.printStackTrace(); + log.error("getEntityList failed", e); } } return list; @@ -181,12 +178,9 @@ private MongoCollection getCollection() { if (Objects.isNull(manager) || Objects.isNull(manager.getDb())) { return null; } - if (Objects.isNull(collection)) { collection = manager.getDb().getCollection(collectionName()); } - return collection; } - } diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Converter.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Converter.java index 2f62c29..669fc48 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Converter.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Converter.java @@ -1,23 +1,21 @@ -package org.tron.mongodb.util; - -import java.io.Serializable; - -import org.bson.Document; - -import com.alibaba.fastjson.JSON; - -public class Converter { - - public static Document jsonStringToDocument(String jsonString) { - return Document.parse(jsonString); - } - - public static String objectToJsonString(Serializable entity) { - return JSON.toJSONString(entity); - } - - public static T jsonStringToObject(String jsonString, Class clazz) { - return JSON.parseObject(jsonString, clazz); - } - -} +package org.tron.mongodb.util; + +import com.alibaba.fastjson.JSON; +import java.io.Serializable; +import org.bson.Document; + +public class Converter { + + public static Document jsonStringToDocument(String jsonString) { + return Document.parse(jsonString); + } + + public static String objectToJsonString(Serializable entity) { + return JSON.toJSONString(entity); + } + + public static T jsonStringToObject(String jsonString, Class clazz) { + return JSON.parseObject(jsonString, clazz); + } + +} diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Pager.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Pager.java index c2172b1..c63c02b 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Pager.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Pager.java @@ -1,209 +1,151 @@ -package org.tron.mongodb.util; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -public class Pager implements Serializable { - /** - * - */ - private static final long serialVersionUID = -4059909958051796316L; - public static final String CURRENT_PAGE = "curPage"; - public static final String PAGE_SIZE = "pageSize"; - - private List list = null; - - private int nextPage = 0; - - private int previousPage = 0; - - private boolean hasNext = false; - - private boolean hasPrevious = false; - - private long totalPage = 0; - - private int currentPage = 0; - - private int pageSize = 10; - - private long totalRecords = 0; - - public static void handle(HashMap hs, int currentPage, int pageSize) { - if (currentPage < 1) { - currentPage = 1; - } - int start = (currentPage - 1) * pageSize; - int offset = pageSize; - - hs.put("start", new Integer(start)); - hs.put("offset", new Integer(offset)); - } - - public Pager() { - list = new ArrayList(); - } - - public Pager(T t) { - list = new ArrayList(); - list.add(t); - this.currentPage = 1; - this.totalRecords = list.size(); - this.pageSize = 10; - if (currentPage < 1) - currentPage = 1; - if ((totalRecords / pageSize) * pageSize < totalRecords) { - setTotalPage(totalRecords / pageSize + 1); - } else { - setTotalPage(totalRecords / pageSize); - } - if (currentPage > this.getTotalPage()) { - this.setCurrentPage(1); - } - - if (currentPage == 1) { - setHasPrevious(false); - } else { - setHasPrevious(true); - setPreviousPage(currentPage - 1); - } - - if (currentPage * pageSize < totalRecords) { - setHasNext(true); - setNextPage(currentPage + 1); - } else { - setHasNext(false); - } - } - - public Pager(List list) { - this(list, list.size(), 1, 10); - } - - public Pager(List list, long totalRecords, int currentPage) { - this(list, totalRecords, currentPage, 10); - } - - public Pager(List list, long totalRecords, int currentPage, int pageSize) { - this.list = list; - this.currentPage = currentPage; - this.totalRecords = totalRecords; - this.pageSize = pageSize; - if (currentPage < 1) - currentPage = 1; - if ((totalRecords / pageSize) * pageSize < totalRecords) { - setTotalPage(totalRecords / pageSize + 1); - } else { - setTotalPage(totalRecords / pageSize); - } - if (currentPage > this.getTotalPage()) { - this.setCurrentPage(1); - } - - if (currentPage == 1) { - setHasPrevious(false); - } else { - setHasPrevious(true); - setPreviousPage(currentPage - 1); - } - - if (currentPage * pageSize < totalRecords) { - setHasNext(true); - setNextPage(currentPage + 1); - } else { - setHasNext(false); - } - } - - public int getCurrentPage() { - return currentPage; - } - - public void setCurrentPage(int currentPage) { - this.currentPage = currentPage; - } - - public boolean isHasNext() { - return hasNext; - } - - public void setHasNext(boolean hasNext) { - this.hasNext = hasNext; - } - - public boolean isHasPrevious() { - return hasPrevious; - } - - public void setHasPrevious(boolean hasPrevious) { - this.hasPrevious = hasPrevious; - } - - public List getList() { - return list; - } - - public void setList(List list) { - this.list = list; - } - - public int getNextPage() { - return nextPage; - } - - public void setNextPage(int nextPage) { - this.nextPage = nextPage; - } - - public int getPageSize() { - return pageSize; - } - - public void setPageSize(int pageSize) { - this.pageSize = pageSize; - } - - public int getPreviousPage() { - return previousPage; - } - - public void setPreviousPage(int previousPage) { - this.previousPage = previousPage; - } - - public long getTotalPage() { - return totalPage; - } - - public void setTotalPage(long totalPage) { - this.totalPage = totalPage; - } - - public long getTotalRecords() { - return totalRecords; - } - - public void setTotalRecords(long totalRecords) { - if ((totalRecords / pageSize) * pageSize < totalRecords) { - setTotalPage(totalRecords / pageSize + 1); - } else { - setTotalPage(totalRecords / pageSize); - } - this.totalRecords = totalRecords; - } - - public Pagination getPagination() { - Pagination pagination = new Pagination(); - pagination.setTotalRecords(String.valueOf(this.getTotalRecords())); - pagination.setCurPage(String.valueOf(this.getCurrentPage())); - pagination.setTotalPage(String.valueOf(this.getTotalPage())); - pagination.setPrePage(String.valueOf(this.getPreviousPage())); - pagination.setNextPage(String.valueOf(this.getNextPage())); - pagination.setNextPage(String.valueOf(this.getNextPage())); - pagination.setLastPage(String.valueOf(this.getTotalPage())); - return pagination; - } - -} +package org.tron.mongodb.util; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import lombok.Getter; +import lombok.Setter; + +@Getter +public class Pager implements Serializable { + + private static final long serialVersionUID = -4059909958051796316L; + public static final String CURRENT_PAGE = "curPage"; + public static final String PAGE_SIZE = "pageSize"; + + @Setter + private List list; + + @Setter + private int nextPage = 0; + + @Setter + private int previousPage = 0; + + @Setter + private boolean hasNext = false; + + @Setter + private boolean hasPrevious = false; + + @Setter + private long totalPage = 0; + + @Setter + private int currentPage = 0; + + @Setter + private int pageSize = 10; + + private long totalRecords = 0; + + public static void handle(HashMap hs, int currentPage, int pageSize) { + if (currentPage < 1) { + currentPage = 1; + } + int start = (currentPage - 1) * pageSize; + + hs.put("start", start); + hs.put("offset", pageSize); + } + + public Pager() { + list = new ArrayList(); + } + + public Pager(T t) { + list = new ArrayList(); + list.add(t); + this.currentPage = 1; + this.totalRecords = list.size(); + this.pageSize = 10; + if (currentPage < 1) { + currentPage = 1; + } + if ((totalRecords / pageSize) * pageSize < totalRecords) { + setTotalPage(totalRecords / pageSize + 1); + } else { + setTotalPage(totalRecords / pageSize); + } + if (currentPage > this.getTotalPage()) { + this.setCurrentPage(1); + } + + if (currentPage == 1) { + setHasPrevious(false); + } else { + setHasPrevious(true); + setPreviousPage(currentPage - 1); + } + + if (currentPage * pageSize < totalRecords) { + setHasNext(true); + setNextPage(currentPage + 1); + } else { + setHasNext(false); + } + } + + public Pager(List list) { + this(list, list.size(), 1, 10); + } + + public Pager(List list, long totalRecords, int currentPage) { + this(list, totalRecords, currentPage, 10); + } + + public Pager(List list, long totalRecords, int currentPage, int pageSize) { + this.list = list; + this.currentPage = currentPage; + this.totalRecords = totalRecords; + this.pageSize = pageSize; + if (currentPage < 1) { + currentPage = 1; + } + if ((totalRecords / pageSize) * pageSize < totalRecords) { + setTotalPage(totalRecords / pageSize + 1); + } else { + setTotalPage(totalRecords / pageSize); + } + if (currentPage > this.getTotalPage()) { + this.setCurrentPage(1); + } + + if (currentPage == 1) { + setHasPrevious(false); + } else { + setHasPrevious(true); + setPreviousPage(currentPage - 1); + } + + if (currentPage * pageSize < totalRecords) { + setHasNext(true); + setNextPage(currentPage + 1); + } else { + setHasNext(false); + } + } + + public void setTotalRecords(long totalRecords) { + if ((totalRecords / pageSize) * pageSize < totalRecords) { + setTotalPage(totalRecords / pageSize + 1); + } else { + setTotalPage(totalRecords / pageSize); + } + this.totalRecords = totalRecords; + } + + public Pagination getPagination() { + Pagination pagination = new Pagination(); + pagination.setTotalRecords(String.valueOf(this.getTotalRecords())); + pagination.setCurPage(String.valueOf(this.getCurrentPage())); + pagination.setTotalPage(String.valueOf(this.getTotalPage())); + pagination.setPrePage(String.valueOf(this.getPreviousPage())); + pagination.setNextPage(String.valueOf(this.getNextPage())); + pagination.setNextPage(String.valueOf(this.getNextPage())); + pagination.setLastPage(String.valueOf(this.getTotalPage())); + return pagination; + } + +} diff --git a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Pagination.java b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Pagination.java index 2ab7ced..604a0aa 100644 --- a/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Pagination.java +++ b/plugins/mongodbplugin/src/main/java/org/tron/mongodb/util/Pagination.java @@ -1,68 +1,17 @@ -package org.tron.mongodb.util; - -public class Pagination { - private String curPage; - private String nextPage; - private String prePage; - private String lastPage; - private String firstPage = "1"; - private String totalPage; - private String totalRecords; - - public String getCurPage() { - return curPage; - } - - public void setCurPage(String curPage) { - this.curPage = curPage; - } - - public String getNextPage() { - return nextPage; - } - - public void setNextPage(String nextPage) { - this.nextPage = nextPage; - } - - public String getPrePage() { - return prePage; - } - - public void setPrePage(String prePage) { - this.prePage = prePage; - } - - public String getLastPage() { - return lastPage; - } - - public void setLastPage(String lastPage) { - this.lastPage = lastPage; - } - - public String getFirstPage() { - return firstPage; - } - - public void setFirstPage(String firstPage) { - this.firstPage = firstPage; - } - - public String getTotalPage() { - return totalPage; - } - - public void setTotalPage(String totalPage) { - this.totalPage = totalPage; - } - - public String getTotalRecords() { - return totalRecords; - } - - public void setTotalRecords(String totalRecords) { - this.totalRecords = totalRecords; - } - -} +package org.tron.mongodb.util; + +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +public class Pagination { + + private String curPage; + private String nextPage; + private String prePage; + private String lastPage; + private String firstPage = "1"; + private String totalPage; + private String totalRecords; +}