On a project we were working on recently it appeared that we had data coming into our Extract, Transform, Load (ETL) processes which should have been filtered out. In this particular case the files which we imported only would exist at max up to 7 days and on any given day we’d have tens of thousands of files that would be created and imported. This presented a difficult problem to trace down if something inside our ETL had gone awry or if we were being fed bad data. Furthermore as the files always would be deleted after importing we didn’t keep where a data point was created from.

Instead of updating our ETL process to track where a specific piece of data originated from we wanted to basically ‘grep’ the files in S3. After looking around it doesn’t look like anyone has built a “Grep for S3”, so we built one. The reason we didn’t simply download the files locally and then process them one at a time is it’d take forever to transfer, then grep each one individual sequentially. Instead we wanted to do the search in parallel and not hold the entire files on the local disk.

With this we came up with our simple S3Grep java app (a pre-built jar is located in the releases) which will search all files in a specific bucket for a specific string. It currently supports both regex or non-regex search strings. You can specify how many threads you want it to use to process the files or it by default will try to use the same number of CPU’s on your machine. It utilizes the S3 Java adapter to read the files as a stream rather than a single transfer, than read from disk. Using the tool is very simple:

A the file is a config file where you setup what you are searching for. An example:

For the most part this is self explanatory. The log level will default to INFO, however if you specify DEBUG it will output some more information such as what file’s it is currently checking. The logger_pattern parameter defaults to “%d{dd MMM yyyy HH:mm:ss} [%p] %m%n” and can be any pattern you want. For more information on the formatting visit the PatternLayout Documentation.

The default output format would look something like this:

If you want a little less verbose and more of just log lines you can update the logger_pattern to be just %m%n and end up with something similar to:

The format of the output is FILE:LINE_NUMBER:matching_string.

Anyways hope this helps you if you are trying to hunt down what file contains a text string in your S3 buckets. Let us know if you have any questions or if we can help!

Following up on our previous post, after evaluating Flume we decided it was a good fit and chose to move forward with it. In our specific use case the data we are gathering is ephemeral so we didn’t need to enforce any deliverability or durability guarantees. For us, missing messages or double delivery is fine as long as the business logic throughput on the application side wasn’t affected. Concretely, our application is high volume, low latency HTTP message broker and we’re looking to record the request URLs via Flume into S3.

One of the compelling aspects of Flume is that it ships with several ways to ingest and syndicate your data via sources and sinks. Since we’re targeting S3 we’d settled on using the default HDFS sink but we have some options on the source. For a general case with complex events the Avro source would be the natural choice but since we’re just logging lines of text the NetCat source looked like a better fit. One of the issues we had with the NetCat source is that it’s TCP based so on the application side we’d need to implement timeouts and connection management on the application side. In addition to that, looking at the code of the NetCat source you’ll notice it’s implemented using traditional Java NIO sockets but if you check out the Avro source it’s built using Netty NIO which can leverage libevent on Linux.

Given those issues and our relaxed durability requirements we started looking at the available UDP sources. The Syslog UDP source looked the promising but it actually validates the format of the inbound messages so we wouldn’t be able to send messages with just the URLs. The code for the Syslog UDP source looked pretty straightforward so at this point we decided to build a custom source based on the existing Syslog UDP source. Our final code ended up looking like:

The big changes were in the implementation of messageReceived and the creation of the new extractEvent method. Including your new source in Flume is straightforward, you just need to build a JAR and drop that into Flume’s “lib/” folder. The easiest way to do this is with javac and jar to package it up. You’ll just need a binary copy of Flume so that you can reference its JARs. Build it with:

And then, you can test this out by creating a file named “agent1.conf” in your Flume directory containing:

Finally, you need to launch Flume by running:

ashish@ashish:~/Downloads/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file agent1.conf --name a1 -Dflume.root.logger=INFO,console

And then to test it you can use “netcat” to fire off some UDP packets with:

ashish@ashish:~/Downloads$ echo "hi flume" | nc -4u -w1 localhost 44444

Which you should see come across your console that’s running Flume. Be aware, the Flume logger truncates messages so if you send a longer string you won’t see it in the logger.

And that’s it. Non-durable, UDP source built and deployed. Anyway, we’re still pretty new to Flume so any feedback or comments would be appreciated!

