1111import io .netty .handler .codec .http .HttpHeaders ;
1212import io .netty .handler .codec .http .HttpResponse ;
1313import io .netty .handler .codec .http .HttpResponseStatus ;
14+ import org .apache .logging .log4j .LogManager ;
15+ import org .apache .logging .log4j .Logger ;
1416import org .logstash .plugins .inputs .http .util .RejectableRunnable ;
1517
1618import java .nio .charset .Charset ;
@@ -23,7 +25,9 @@ public class MessageProcessor implements RejectableRunnable {
2325 private final String remoteAddress ;
2426 private final IMessageHandler messageHandler ;
2527 private final HttpResponseStatus responseStatus ;
26- private static final Charset charset = Charset .forName ("UTF-8" );
28+
29+ private static final Charset UTF8_CHARSET = Charset .forName ("UTF-8" );
30+ private final static Logger LOGGER = LogManager .getLogger (MessageHandler .class );
2731
2832 MessageProcessor (ChannelHandlerContext ctx , FullHttpRequest req , String remoteAddress ,
2933 IMessageHandler messageHandler , HttpResponseStatus responseStatus ) {
@@ -47,12 +51,19 @@ public void onRejection() {
4751 public void run () {
4852 try {
4953 final HttpResponse response ;
50- final String token = req .headers ().get (HttpHeaderNames .AUTHORIZATION );
51- req .headers ().remove (HttpHeaderNames .AUTHORIZATION );
52- if (messageHandler .validatesToken (token )) {
53- response = processMessage ();
54+ if (messageHandler .requiresToken () && !req .headers ().contains (HttpHeaderNames .AUTHORIZATION )) {
55+ LOGGER .debug ("Required authorization not provided; requesting authentication." );
56+ response = generateAuthenticationRequestResponse ();
5457 } else {
55- response = generateFailedResponse (HttpResponseStatus .UNAUTHORIZED );
58+ final String token = req .headers ().get (HttpHeaderNames .AUTHORIZATION );
59+ req .headers ().remove (HttpHeaderNames .AUTHORIZATION );
60+ if (messageHandler .validatesToken (token )) {
61+ LOGGER .debug ("Valid authorization; processing request." );
62+ response = processMessage ();
63+ } else {
64+ LOGGER .debug ("Invalid authorization; rejecting request." );
65+ response = generateFailedResponse (HttpResponseStatus .UNAUTHORIZED );
66+ }
5667 }
5768 ctx .writeAndFlush (response );
5869 } finally {
@@ -62,7 +73,7 @@ public void run() {
6273
6374 private FullHttpResponse processMessage () {
6475 final Map <String , String > formattedHeaders = formatHeaders (req .headers ());
65- final String body = req .content ().toString (charset );
76+ final String body = req .content ().toString (UTF8_CHARSET );
6677 if (messageHandler .onNewMessage (remoteAddress , formattedHeaders , body )) {
6778 return generateResponse (messageHandler .responseHeaders ());
6879 } else {
@@ -76,6 +87,13 @@ private FullHttpResponse generateFailedResponse(HttpResponseStatus status) {
7687 return response ;
7788 }
7889
90+ private FullHttpResponse generateAuthenticationRequestResponse () {
91+ final FullHttpResponse response = new DefaultFullHttpResponse (req .protocolVersion (), HttpResponseStatus .UNAUTHORIZED );
92+ response .headers ().set (HttpHeaderNames .WWW_AUTHENTICATE , "Basic realm=\" Logstash HTTP Input\" " );
93+ response .headers ().set (HttpHeaderNames .CONTENT_LENGTH , 0 );
94+ return response ;
95+ }
96+
7997 private FullHttpResponse generateResponse (Map <String , String > stringHeaders ) {
8098
8199 final FullHttpResponse response = new DefaultFullHttpResponse (
@@ -88,7 +106,7 @@ private FullHttpResponse generateResponse(Map<String, String> stringHeaders) {
88106 response .headers ().set (headers );
89107
90108 if (responseStatus != HttpResponseStatus .NO_CONTENT ) {
91- final ByteBuf payload = Unpooled .wrappedBuffer ("ok" .getBytes (charset ));
109+ final ByteBuf payload = Unpooled .wrappedBuffer ("ok" .getBytes (UTF8_CHARSET ));
92110 response .headers ().set (HttpHeaderNames .CONTENT_LENGTH , payload .readableBytes ());
93111 response .headers ().set (HttpHeaderNames .CONTENT_TYPE , "text/plain" );
94112 response .content ().writeBytes (payload );
0 commit comments