Following up on our previous post, after evaluating Flume we decided it was a good fit and chose to move forward with it. In our specific use case the data we're gathering is ephemeral, so we didn't need to enforce any delivery or durability guarantees. For us, missing messages or double delivery is fine as long as throughput on the application side isn't affected. Concretely, our application is a high-volume, low-latency HTTP message broker, and we're looking to record the request URLs via Flume into S3.
One of the compelling aspects of Flume is that it ships with several ways to ingest and syndicate your data via sources and sinks. Since we're targeting S3 we settled on the default HDFS sink, but we had some options on the source. For a general case with complex events the Avro source would be the natural choice, but since we're just logging lines of text the NetCat source looked like a better fit. One issue with the NetCat source is that it's TCP based, so on the application side we'd need to implement timeouts and connection management. In addition, if you look at the code you'll notice the NetCat source is implemented with traditional Java NIO sockets, while the Avro source is built on Netty NIO, which can leverage epoll on Linux.
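To illustrate the appeal of UDP on the application side, here's a minimal fire-and-forget sender sketch. The class name, host, and port are hypothetical placeholders, not part of our production code; the point is that, unlike a TCP client, there's no connection to open, time out, or tear down:

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

public class UdpLogSender {
    // Fire-and-forget: no connection state, timeouts, or reconnect logic to manage.
    // If the collector is down the datagram is simply lost, which matches our
    // relaxed durability requirements.
    public static void send(String line, String host, int port) throws Exception {
        // Trailing newline marks the end of the event for the receiving source
        byte[] payload = (line + "\n").getBytes(StandardCharsets.UTF_8);
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length,
                    InetAddress.getByName(host), port));
        }
    }

    public static void main(String[] args) throws Exception {
        send("http://example.com/some/request/url", "localhost", 44444);
    }
}
```

Since `send` returns as soon as the datagram is handed to the kernel, logging stays off the request hot path.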
Given those issues and our relaxed durability requirements, we started looking at the available UDP sources. The Syslog UDP source looked the most promising, but it validates the format of inbound messages, so we wouldn't be able to send bare URLs. Since the code for the Syslog UDP source is pretty straightforward, we decided to build a custom source based on it. Our final code ended up looking like:
package com.setfive.flume;

import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.SyslogSourceConfigurationConstants;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UDPTextSource extends AbstractSource implements EventDrivenSource, Configurable {

    private int port;
    private int maxsize = 1 << 16; // 64k is the max allowed by RFC 5426
    private String host = null;
    private Channel nettyChannel;
    private SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public static final int DEFAULT_MIN_SIZE = 4096;
    public static final int DEFAULT_INITIAL_SIZE = DEFAULT_MIN_SIZE;

    private static final Logger logger = LoggerFactory.getLogger(UDPTextSource.class);
    private CounterGroup counterGroup = new CounterGroup();

    public class syslogHandler extends SimpleChannelHandler {
        int maxsize = 1 << 16;
        boolean doneReading = false;
        ByteArrayOutputStream baos = new ByteArrayOutputStream(maxsize);

        private void resetReader() {
            doneReading = false;
            baos.reset();
        }

        private Event extractEvent(ChannelBuffer in) {
            byte b = 0;
            Event e = null;

            try {
                // Buffer bytes up to the first newline, which terminates the event
                while (!doneReading && in.readable()) {
                    b = in.readByte();
                    if (b == '\n') {
                        doneReading = true;
                        continue;
                    }
                    baos.write(b);
                }
            } catch (Exception ex) {
                resetReader();
            }

            if (doneReading) {
                // Append a receive timestamp to the message body
                String str = String.join(",", baos.toString(), dateFormatter.format(new Date()));
                e = EventBuilder.withBody(str, Charset.defaultCharset());
                resetReader();
            }

            return e;
        }

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
            try {
                ChannelBuffer cb = (ChannelBuffer) mEvent.getMessage();
                Event e = extractEvent(cb);
                if (e == null) {
                    return;
                }
                getChannelProcessor().processEvent(e);
            } catch (ChannelException ex) {
                counterGroup.incrementAndGet("events.dropped");
                logger.error("Error writing to channel", ex);
            }
        }
    }

    @Override
    public void start() {
        // set up the Netty server
        ConnectionlessBootstrap serverBootstrap = new ConnectionlessBootstrap(
                new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
        final syslogHandler handler = new syslogHandler();
        serverBootstrap.setOption("receiveBufferSizePredictorFactory",
                new AdaptiveReceiveBufferSizePredictorFactory(DEFAULT_MIN_SIZE, DEFAULT_INITIAL_SIZE, maxsize));
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(handler);
            }
        });

        if (host == null) {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
        } else {
            nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
        }

        super.start();
    }

    @Override
    public void stop() {
        logger.info("UDP text source stopping...");
        logger.info("Metrics:{}", counterGroup);

        if (nettyChannel != null) {
            nettyChannel.close();
            try {
                nettyChannel.getCloseFuture().await(60, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.warn("netty server stop interrupted", e);
            } finally {
                nettyChannel = null;
            }
        }

        super.stop();
    }

    @Override
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(context, SyslogSourceConfigurationConstants.CONFIG_PORT);
        port = context.getInteger(SyslogSourceConfigurationConstants.CONFIG_PORT);
        host = context.getString(SyslogSourceConfigurationConstants.CONFIG_HOST);
    }

    @VisibleForTesting
    public int getSourcePort() {
        SocketAddress localAddress = nettyChannel.getLocalAddress();
        if (localAddress instanceof InetSocketAddress) {
            InetSocketAddress addr = (InetSocketAddress) localAddress;
            return addr.getPort();
        }
        return 0;
    }
}
The big changes were the implementation of messageReceived and the new extractEvent method. Including your new source in Flume is straightforward: build a JAR and drop it into Flume's "lib/" folder. The easiest way is to package it up with javac and jar. You'll just need a binary copy of Flume so that you can reference its JARs. Build it with:
ashish@ashish:~/Downloads$ wget http://mirrors.gigenet.com/apache/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
ashish@ashish:~/Downloads$ tar -zxvf apache-flume-1.6.0-bin.tar.gz
ashish@ashish:~/Downloads/apache-flume-1.6.0-bin$ wget https://gist.githubusercontent.com/adatta02/3168444ea76d2f734859978a4b6303b2/raw/a98d78f22a1822cd05ad40c2f5e5d589641f0073/UDPTextSource.java
ashish@ashish:~/Downloads/apache-flume-1.6.0-bin$ mkdir udpsource
ashish@ashish:~/Downloads/apache-flume-1.6.0-bin$ javac -d udpsource/ -cp "lib/*" UDPTextSource.java
ashish@ashish:~/Downloads/apache-flume-1.6.0-bin$ cd udpsource/
ashish@ashish:~/Downloads/apache-flume-1.6.0-bin/udpsource$ jar cvf UdpTextSource.jar .
ashish@ashish:~/Downloads/apache-flume-1.6.0-bin/udpsource$ cd ../
ashish@ashish:~/Downloads/apache-flume-1.6.0-bin$ mv udpsource/UdpTextSource.jar lib/
And then, you can test this out by creating a file named “agent1.conf” in your Flume directory containing:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.setfive.flume.UDPTextSource
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Next, launch Flume by running:
ashish@ashish:~/Downloads/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file agent1.conf --name a1 -Dflume.root.logger=INFO,console
And then to test it you can use “netcat” to fire off some UDP packets with:
ashish@ashish:~/Downloads$ echo "hi flume" | nc -4u -w1 localhost 44444
You should see the message come across the console running Flume. Be aware that the Flume logger truncates messages, so if you send a longer string you won't see all of it in the logger output.
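If you want to inspect full messages without the logger's truncation, one option is to swap the logger sink in agent1.conf for Flume's file_roll sink, which writes events to local files. This is a sketch assuming the same agent1.conf names; the output directory is an arbitrary choice and must exist before you start the agent:

```
# Replace the logger sink with a rolling file sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /tmp/flume-out
a1.sinks.k1.sink.rollInterval = 30
a1.sinks.k1.channel = c1
```

With this in place, each event lands untruncated in a file under /tmp/flume-out, rolled every 30 seconds.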
And that’s it. Non-durable, UDP source built and deployed. Anyway, we’re still pretty new to Flume so any feedback or comments would be appreciated!