From a991dcf34e4d256261ff100229e9c9dda45636b2 Mon Sep 17 00:00:00 2001 From: DALOSA01 Date: Fri, 17 Feb 2017 15:28:58 +0100 Subject: [PATCH 1/3] Removed ssl stuff. --- .../protocol/LumberjackClient.java | 465 +++++++++--------- 1 file changed, 223 insertions(+), 242 deletions(-) diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java index b07a27b..b48e1dd 100644 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -1,242 +1,223 @@ -package info.fetter.logstashforwarder.protocol; - -/* - * Copyright 2015 Didier Fetter - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -import info.fetter.logstashforwarder.Event; -import info.fetter.logstashforwarder.ProtocolAdapter; -import info.fetter.logstashforwarder.util.AdapterException; - -import java.io.BufferedOutputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ProtocolException; -import java.net.Socket; -import java.security.KeyStore; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.zip.Deflater; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManagerFactory; - -import org.apache.commons.io.HexDump; -import org.apache.log4j.Logger; - -public class LumberjackClient implements ProtocolAdapter { - private final static Logger logger = Logger.getLogger(LumberjackClient.class); - private final static byte PROTOCOL_VERSION = 0x31; - private final static byte FRAME_ACK = 0x41; - private final static byte FRAME_WINDOW_SIZE = 0x57; - private final static byte FRAME_DATA = 0x44; - private final static byte FRAME_COMPRESSED = 0x43; - - private Socket socket; - private SSLSocket sslSocket; - private KeyStore keyStore; - private String server; - private int port; - private DataOutputStream output; - private DataInputStream input; - private int sequence = 1; - - public LumberjackClient(String keyStorePath, String server, int port, int timeout) throws IOException { - this.server = server; - this.port = port; - - try { - if(keyStorePath == null) { - throw new IOException("Key store not configured"); - } - if(server == null) { - throw new IOException("Server address not configured"); - } - - keyStore = KeyStore.getInstance("JKS"); - keyStore.load(new FileInputStream(keyStorePath), null); - - TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX"); - tmf.init(keyStore); - - SSLContext context = SSLContext.getInstance("TLS"); - context.init(null, tmf.getTrustManagers(), null); - - SSLSocketFactory socketFactory = context.getSocketFactory(); - socket = new Socket(); - socket.connect(new InetSocketAddress(InetAddress.getByName(server), port), timeout); - socket.setSoTimeout(timeout); - sslSocket = (SSLSocket)socketFactory.createSocket(socket, server, port, true); - sslSocket.setUseClientMode(true); - sslSocket.startHandshake(); - - output = new DataOutputStream(new BufferedOutputStream(sslSocket.getOutputStream())); - input = new DataInputStream(sslSocket.getInputStream()); - - logger.info("Connected to " + server + ":" + port); - } catch(IOException e) { - throw e; - } catch(Exception e) { - throw new RuntimeException(e); - } - } - - public int sendWindowSizeFrame(int size) throws IOException { - output.writeByte(PROTOCOL_VERSION); - output.writeByte(FRAME_WINDOW_SIZE); - output.writeInt(size); - output.flush(); - if(logger.isDebugEnabled()) { - logger.debug("Sending window size frame : " + size + " frames"); - } - return 6; - } - - private int sendDataFrame(DataOutputStream output, Map keyValues) throws IOException { - output.writeByte(PROTOCOL_VERSION); - output.writeByte(FRAME_DATA); - output.writeInt(sequence++); - output.writeInt(keyValues.size()); - int bytesSent = 10; - for(String key : keyValues.keySet()) { - int keyLength = key.length(); - output.writeInt(keyLength); - bytesSent += 4; - output.write(key.getBytes()); - bytesSent += keyLength; - byte[] value = keyValues.get(key); - output.writeInt(value.length); - bytesSent += 4; - output.write(value); - bytesSent += value.length; - } - output.flush(); - return bytesSent; - } - - public int sendDataFrameInSocket(Map keyValues) throws IOException { - return sendDataFrame(output, keyValues); - } - - public int sendCompressedFrame(List> keyValuesList) throws IOException { - output.writeByte(PROTOCOL_VERSION); - output.writeByte(FRAME_COMPRESSED); - - ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream(); - DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes); - for(Map keyValues : keyValuesList) { - logger.trace("Adding data frame"); - sendDataFrame(uncompressedOutput, keyValues); - } - uncompressedOutput.close(); - Deflater compressor = new Deflater(); - byte[] uncompressedData = uncompressedBytes.toByteArray(); - if(logger.isDebugEnabled()) { - logger.debug("Deflating data : " + uncompressedData.length + " bytes"); - } - if(logger.isTraceEnabled()) { - HexDump.dump(uncompressedData, 0, System.out, 0); - } - compressor.setInput(uncompressedData); - compressor.finish(); - - ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream(); - byte[] buffer = new byte[1024]; - while(!compressor.finished()) { - int count = compressor.deflate(buffer); - compressedBytes.write(buffer, 0, count); - } - compressedBytes.close(); - byte[] compressedData = compressedBytes.toByteArray(); - if(logger.isDebugEnabled()) { - logger.debug("Deflated data : " + compressor.getTotalOut() + " bytes"); - } - if(logger.isTraceEnabled()) { - HexDump.dump(compressedData, 0, System.out, 0); - } - - output.writeInt(compressor.getTotalOut()); - output.write(compressedData); - output.flush(); - - if(logger.isDebugEnabled()) { - logger.debug("Sending compressed frame : " + keyValuesList.size() + " frames"); - } - return 6 + compressor.getTotalOut(); - } - - public int readAckFrame() throws ProtocolException, IOException { - byte protocolVersion = input.readByte(); - if(protocolVersion != PROTOCOL_VERSION) { - throw new ProtocolException("Protocol version should be 1, received " + protocolVersion); - } - byte frameType = input.readByte(); - if(frameType != FRAME_ACK) { - throw new ProtocolException("Frame type should be Ack, received " + frameType); - } - int sequenceNumber = input.readInt(); - if(logger.isDebugEnabled()) { - logger.debug("Received ack sequence : " + sequenceNumber); - } - return sequenceNumber; - } - - public int sendEvents(List eventList) throws AdapterException { - try { - int beginSequence = sequence; - int numberOfEvents = eventList.size(); - if(logger.isInfoEnabled()) { - logger.info("Sending " + numberOfEvents + " events"); - } - sendWindowSizeFrame(numberOfEvents); - List> keyValuesList = new ArrayList>(numberOfEvents); - for(Event event : eventList) { - keyValuesList.add(event.getKeyValues()); - } - sendCompressedFrame(keyValuesList); - while(readAckFrame() < (sequence - 1) ) {} - return sequence - beginSequence; - } catch(Exception e) { - throw new AdapterException(e); - } - } - - public void close() throws AdapterException { - try { - sslSocket.close(); - } catch(Exception e) { - throw new AdapterException(e); - } - logger.info("Connection to " + server + ":" + port + " closed"); - } - - public String getServer() { - return server; - } - - public int getPort() { - return port; - } - -} +package info.fetter.logstashforwarder.protocol; + +/* + * Copyright 2015 Didier Fetter + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import info.fetter.logstashforwarder.Event; +import info.fetter.logstashforwarder.ProtocolAdapter; +import info.fetter.logstashforwarder.util.AdapterException; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ProtocolException; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.zip.Deflater; + +import org.apache.commons.io.HexDump; +import org.apache.log4j.Logger; + +public class LumberjackClient implements ProtocolAdapter { + private final static Logger logger = Logger.getLogger(LumberjackClient.class); + private final static byte PROTOCOL_VERSION = 0x31; + private final static byte FRAME_ACK = 0x41; + private final static byte FRAME_WINDOW_SIZE = 0x57; + private final static byte FRAME_DATA = 0x44; + private final static byte FRAME_COMPRESSED = 0x43; + + private Socket socket; + + + private String server; + private int port; + private DataOutputStream output; + private DataInputStream input; + private int sequence = 1; + + public LumberjackClient(String keyStorePath, String server, int port, int timeout) throws IOException { + this.server = server; + this.port = port; + + try { + + if(server == null) { + throw new IOException("Server address not configured"); + } + + + + socket = new Socket(); + socket.connect(new InetSocketAddress(InetAddress.getByName(server), port), timeout); + socket.setSoTimeout(timeout); + + output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); + input = new DataInputStream(socket.getInputStream()); + + logger.info("Connected to " + server + ":" + port); + + } catch(IOException e) { + throw e; + } catch(Exception e) { + throw new RuntimeException(e); + } + } + + public int sendWindowSizeFrame(int size) throws IOException { + output.writeByte(PROTOCOL_VERSION); + output.writeByte(FRAME_WINDOW_SIZE); + output.writeInt(size); + output.flush(); + if(logger.isDebugEnabled()) { + logger.debug("Sending window size frame : " + size + " frames"); + } + return 6; + } + + private int sendDataFrame(DataOutputStream output, Map keyValues) throws IOException { + output.writeByte(PROTOCOL_VERSION); + output.writeByte(FRAME_DATA); + output.writeInt(sequence++); + output.writeInt(keyValues.size()); + int bytesSent = 10; + for(String key : keyValues.keySet()) { + int keyLength = key.length(); + output.writeInt(keyLength); + bytesSent += 4; + output.write(key.getBytes()); + bytesSent += keyLength; + byte[] value = keyValues.get(key); + output.writeInt(value.length); + bytesSent += 4; + output.write(value); + bytesSent += value.length; + } + output.flush(); + return bytesSent; + } + + public int sendDataFrameInSocket(Map keyValues) throws IOException { + return sendDataFrame(output, keyValues); + } + + public int sendCompressedFrame(List> keyValuesList) throws IOException { + output.writeByte(PROTOCOL_VERSION); + output.writeByte(FRAME_COMPRESSED); + + ByteArrayOutputStream uncompressedBytes = new ByteArrayOutputStream(); + DataOutputStream uncompressedOutput = new DataOutputStream(uncompressedBytes); + for(Map keyValues : keyValuesList) { + logger.trace("Adding data frame"); + sendDataFrame(uncompressedOutput, keyValues); + } + uncompressedOutput.close(); + Deflater compressor = new Deflater(); + byte[] uncompressedData = uncompressedBytes.toByteArray(); + if(logger.isDebugEnabled()) { + logger.debug("Deflating data : " + uncompressedData.length + " bytes"); + } + if(logger.isTraceEnabled()) { + HexDump.dump(uncompressedData, 0, System.out, 0); + } + compressor.setInput(uncompressedData); + compressor.finish(); + + ByteArrayOutputStream compressedBytes = new ByteArrayOutputStream(); + byte[] buffer = new byte[1024]; + while(!compressor.finished()) { + int count = compressor.deflate(buffer); + compressedBytes.write(buffer, 0, count); + } + compressedBytes.close(); + byte[] compressedData = compressedBytes.toByteArray(); + if(logger.isDebugEnabled()) { + logger.debug("Deflated data : " + compressor.getTotalOut() + " bytes"); + } + if(logger.isTraceEnabled()) { + HexDump.dump(compressedData, 0, System.out, 0); + } + + output.writeInt(compressor.getTotalOut()); + output.write(compressedData); + output.flush(); + + if(logger.isDebugEnabled()) { + logger.debug("Sending compressed frame : " + keyValuesList.size() + " frames"); + } + return 6 + compressor.getTotalOut(); + } + + public int readAckFrame() throws ProtocolException, IOException { + byte protocolVersion = input.readByte(); + if(protocolVersion != PROTOCOL_VERSION) { + throw new ProtocolException("Protocol version should be 1, received " + protocolVersion); + } + byte frameType = input.readByte(); + if(frameType != FRAME_ACK) { + throw new ProtocolException("Frame type should be Ack, received " + frameType); + } + int sequenceNumber = input.readInt(); + if(logger.isDebugEnabled()) { + logger.debug("Received ack sequence : " + sequenceNumber); + } + return sequenceNumber; + } + + public int sendEvents(List eventList) throws AdapterException { + try { + int beginSequence = sequence; + int numberOfEvents = eventList.size(); + if(logger.isInfoEnabled()) { + logger.info("Sending " + numberOfEvents + " events"); + } + sendWindowSizeFrame(numberOfEvents); + List> keyValuesList = new ArrayList>(numberOfEvents); + for(Event event : eventList) { + keyValuesList.add(event.getKeyValues()); + } + sendCompressedFrame(keyValuesList); + while(readAckFrame() < (sequence - 1) ) {} + return sequence - beginSequence; + } catch(Exception e) { + throw new AdapterException(e); + } + } + + public void close() throws AdapterException { + try { + socket.close(); + } catch(Exception e) { + throw new AdapterException(e); + } + logger.info("Connection to " + server + ":" + port + " closed"); + } + + public String getServer() { + return server; + } + + public int getPort() { + return port; + } + +} From 3383ff76e9981cc80142e3fea3b6d215c54f5207 Mon Sep 17 00:00:00 2001 From: DALOSA01 Date: Fri, 17 Feb 2017 15:40:53 +0100 Subject: [PATCH 2/3] Removed keystorepath check, in order to run logstash-forwarder client in ssl mode or not Here in my company, we work with some solaris machines, and it's impossible to make filebeat works and even worse, to do it with ssl stuff. So... I have added de possibility to work in both modes :) --- pom.xml | 2 +- .../protocol/LumberjackClient.java | 73 ++++++++++++++----- 2 files changed, 57 insertions(+), 18 deletions(-) diff --git a/pom.xml b/pom.xml index 7809709..cc2bbe9 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 logstash-forwarder-java logstash-forwarder-java - 0.2.4 + 0.2.5-SNAPSHOT logstash-forwarder-java Java version of logstash forwarder https://github.com/didfet/logstash-forwarder-java diff --git a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java index b48e1dd..5bac0fa 100644 --- a/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java +++ b/src/main/java/info/fetter/logstashforwarder/protocol/LumberjackClient.java @@ -25,17 +25,32 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ProtocolException; import java.net.Socket; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.zip.Deflater; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManagerFactory; + import org.apache.commons.io.HexDump; +import org.apache.commons.io.IOUtils; import org.apache.log4j.Logger; public class LumberjackClient implements ProtocolAdapter { @@ -47,8 +62,8 @@ public class LumberjackClient implements ProtocolAdapter { private final static byte FRAME_COMPRESSED = 0x43; private Socket socket; - - + private SSLSocket sslSocket; + private KeyStore keyStore; private String server; private int port; private DataOutputStream output; @@ -60,19 +75,21 @@ public LumberjackClient(String keyStorePath, String server, int port, int timeou this.port = port; try { - + //if keystorepath is null, behaviour is modified, in order to run in mode no_ssl if(server == null) { throw new IOException("Server address not configured"); + } + generatePlainSocket(server, port, timeout); + if (keyStorePath!=null){ + generateSSLSocket(keyStorePath, server, port); + output = new DataOutputStream(new BufferedOutputStream(sslSocket.getOutputStream())); + input = new DataInputStream(sslSocket.getInputStream()); + } - - - - socket = new Socket(); - socket.connect(new InetSocketAddress(InetAddress.getByName(server), port), timeout); - socket.setSoTimeout(timeout); - - output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); - input = new DataInputStream(socket.getInputStream()); + else{ + output = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); + input = new DataInputStream(socket.getInputStream()); + } logger.info("Connected to " + server + ":" + port); @@ -83,6 +100,31 @@ public LumberjackClient(String keyStorePath, String server, int port, int timeou } } + private void generateSSLSocket(String keyStorePath, String server, int port) + throws KeyStoreException, IOException, NoSuchAlgorithmException, + CertificateException, FileNotFoundException, KeyManagementException { + keyStore = KeyStore.getInstance("JKS"); + keyStore.load(new FileInputStream(keyStorePath), null); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX"); + tmf.init(keyStore); + + SSLContext context = SSLContext.getInstance("TLS"); + context.init(null, tmf.getTrustManagers(), null); + + SSLSocketFactory socketFactory = context.getSocketFactory(); + sslSocket = (SSLSocket)socketFactory.createSocket(socket, server, port, true); + sslSocket.setUseClientMode(true); + sslSocket.startHandshake(); + } + + private void generatePlainSocket(String server, int port, int timeout) + throws IOException, UnknownHostException, SocketException { + socket = new Socket(); + socket.connect(new InetSocketAddress(InetAddress.getByName(server), port), timeout); + socket.setSoTimeout(timeout); + } + public int sendWindowSizeFrame(int size) throws IOException { output.writeByte(PROTOCOL_VERSION); output.writeByte(FRAME_WINDOW_SIZE); @@ -204,11 +246,8 @@ public int sendEvents(List eventList) throws AdapterException { } public void close() throws AdapterException { - try { - socket.close(); - } catch(Exception e) { - throw new AdapterException(e); - } + IOUtils.closeQuietly(socket); + IOUtils.closeQuietly(sslSocket); logger.info("Connection to " + server + ":" + port + " closed"); } From 85640d62b626d833b712a562003b8cdbfa774fbd Mon Sep 17 00:00:00 2001 From: DALOSA01 Date: Fri, 17 Feb 2017 15:42:53 +0100 Subject: [PATCH 3/3] Readme updated --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 1b4ef8e..9d62fd8 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,8 @@ The configuration file is the same (json format), but there are a few difference - the ssl ca parameter points to a java [keystore](https://github.com/didfet/logstash-forwarder-java/blob/master/HOWTO-KEYSTORE.md) containing the root certificate of the server, not a PEM file - comments are C-style comments +It's also possible to work in ssl disabled mode + ### Command-line options Some options are the same :