S3Grep – Searching S3 Files and Buckets

On a project we were working on recently it appeared that we had data coming into our Extract, Transform, Load (ETL) processes which should have been filtered out. In this particular case the files we imported only existed for at most 7 days, and on any given day tens of thousands of files would be created and imported. That made it difficult to trace down whether something inside our ETL had gone awry or whether we were being fed bad data. Furthermore, since the files were always deleted after importing, we didn’t keep a record of which file a given data point came from.

Instead of updating our ETL process to track where a specific piece of data originated, we wanted to basically ‘grep’ the files in S3. After looking around it doesn’t appear that anyone has built a “grep for S3”, so we built one. The reason we didn’t simply download the files locally and then process them one at a time is that it would take forever to transfer them and then grep each one individually and sequentially. Instead we wanted to do the search in parallel and avoid holding entire files on local disk.

With this we came up with our simple S3Grep Java app (a pre-built jar is located in the releases) which will search all files in a specific bucket for a specific string. It currently supports both regex and non-regex search strings. You can specify how many threads you want it to use to process the files, or by default it will use as many threads as your machine has CPUs. It uses the S3 Java client to read each file as a stream rather than transferring the whole file and then reading it from disk. Using the tool is very simple:

The s3grep.properties file is a config file where you set up what you are searching for. An example:
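The config is a plain Java properties file. The exact key names below are illustrative rather than definitive (check the project itself for the real ones), but it looks roughly like:

access_key=YOUR_AWS_ACCESS_KEY
secret_key=YOUR_AWS_SECRET_KEY
bucket=my-bucket-name
search_string=some-id-to-hunt-for
regex=false
threads=8
log_level=INFO
logger_pattern=%d{dd MMM yyyy HH:mm:ss} [%p] %m%n

With the properties file filled in, you just run the jar (something like java -jar s3grep.jar – the exact jar name depends on the release you grab) from the same directory.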

For the most part this is self explanatory. The log level defaults to INFO, but if you specify DEBUG it will output some more information, such as which file it is currently checking. The logger_pattern parameter defaults to “%d{dd MMM yyyy HH:mm:ss} [%p] %m%n” and can be any pattern you want. For more information on the formatting visit the PatternLayout documentation.

The default output format looks something like this:
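With the default pattern each match is wrapped in the timestamp and log level, so a hit looks roughly like this (the bucket and file names here are just placeholders):

07 Apr 2015 14:21:33 [INFO] s3://my-bucket/imports/data-00123.csv:87:the line that matched your search string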

If you want something a little less verbose and more of just the log lines, you can update the logger_pattern to just %m%n and end up with something similar to:
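i.e. just the match lines themselves, again with placeholder names:

s3://my-bucket/imports/data-00123.csv:87:the line that matched your search string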

The format of the output is FILE:LINE_NUMBER:matching_string.

Anyways, hope this helps if you are trying to hunt down which file in your S3 buckets contains a given text string. Let us know if you have any questions or if we can help!

Apache Flume: Building a custom UDP source

Following up on our previous post, after evaluating Flume we decided it was a good fit and chose to move forward with it. In our specific use case the data we are gathering is ephemeral, so we didn’t need to enforce any deliverability or durability guarantees. For us, missing messages or double delivery is fine as long as throughput on the application side isn’t affected. Concretely, our application is a high volume, low latency HTTP message broker and we’re looking to record the request URLs via Flume into S3.

One of the compelling aspects of Flume is that it ships with several ways to ingest and syndicate your data via sources and sinks. Since we’re targeting S3 we settled on using the default HDFS sink, but we had some options on the source. For a general case with complex events the Avro source would be the natural choice, but since we’re just logging lines of text the NetCat source looked like a better fit. One issue with the NetCat source is that it’s TCP based, so we’d need to implement timeouts and connection management on the application side. In addition, looking at the code of the NetCat source you’ll notice it’s implemented using traditional Java NIO sockets, whereas the Avro source is built using Netty NIO, which can leverage libevent on Linux.

Given those issues and our relaxed durability requirements we started looking at the available UDP sources. The Syslog UDP source looked the most promising, but it actually validates the format of the inbound messages, so we wouldn’t be able to send messages containing just the URLs. The code for the Syslog UDP source looked pretty straightforward, so at this point we decided to build a custom source based on it. Our final code ended up looking like:
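In outline it’s something like the sketch below, built against the same Flume 1.6 and Netty 3 classes that SyslogUDPSource uses. The package and class names are placeholders and the error handling is trimmed down, but the shape is the same: a Netty datagram server whose handler turns each raw packet into a Flume event with no parsing or validation.

package com.example.flume;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.oio.OioDatagramChannelFactory;

public class UdpSource extends AbstractSource
    implements EventDrivenSource, Configurable {

  private int port;
  private String host;
  private Channel nettyChannel;

  @Override
  public void configure(Context context) {
    // Read the bind address and port from the agent config.
    port = context.getInteger("port", 44444);
    host = context.getString("host", "0.0.0.0");
  }

  @Override
  public void start() {
    // Netty 3 datagram server, the same setup SyslogUDPSource uses.
    ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(
        new OioDatagramChannelFactory(Executors.newCachedThreadPool()));
    final UdpHandler handler = new UdpHandler();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
      @Override
      public ChannelPipeline getPipeline() {
        return Channels.pipeline(handler);
      }
    });
    nettyChannel = bootstrap.bind(new InetSocketAddress(host, port));
    super.start();
  }

  @Override
  public void stop() {
    if (nettyChannel != null) {
      nettyChannel.close();
    }
    super.stop();
  }

  public class UdpHandler extends SimpleChannelHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent mEvent) {
      try {
        // Unlike the syslog source, we don't validate the payload at all.
        Event event = extractEvent((ChannelBuffer) mEvent.getMessage());
        if (event != null) {
          getChannelProcessor().processEvent(event);
        }
      } catch (ChannelException ex) {
        // Drop the message; we explicitly don't care about durability.
      }
    }

    // Turn the raw datagram bytes into a Flume event with no parsing.
    private Event extractEvent(ChannelBuffer buffer) {
      byte[] body = new byte[buffer.readableBytes()];
      buffer.readBytes(body);
      return EventBuilder.withBody(body);
    }
  }
}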

The big changes were in the implementation of messageReceived and the creation of the new extractEvent method. Including your new source in Flume is straightforward: you just need to build a JAR and drop it into Flume’s “lib/” folder. The easiest way to do this is with javac and jar to package it up. You’ll just need a binary copy of Flume so that you can reference its JARs. Build it with:
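Something along these lines works, assuming the class above is saved under com/example/flume/UdpSource.java and the Flume binary distribution is unpacked alongside it (adjust the paths for your layout):

mkdir -p classes
javac -cp "apache-flume-1.6.0-bin/lib/*" -d classes com/example/flume/UdpSource.java
jar cf custom-udp-source.jar -C classes .
cp custom-udp-source.jar apache-flume-1.6.0-bin/lib/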

And then, you can test this out by creating a file named “agent1.conf” in your Flume directory containing:
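Something like the following. The source type is the fully qualified name of whatever you called your class (here the placeholder com.example.flume.UdpSource from the sketch above), the port matches the netcat test below, and the logger sink just echoes events to the console:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = com.example.flume.UdpSource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1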

Finally, you need to launch Flume by running:

ashish@ashish:~/Downloads/apache-flume-1.6.0-bin$ bin/flume-ng agent --conf conf --conf-file agent1.conf --name a1 -Dflume.root.logger=INFO,console

And then to test it you can use “netcat” to fire off some UDP packets with:

ashish@ashish:~/Downloads$ echo "hi flume" | nc -4u -w1 localhost 44444

You should see the message come across the console that’s running Flume. Be aware that the Flume logger truncates messages, so if you send a longer string you won’t see all of it in the logger.

And that’s it. Non-durable, UDP source built and deployed. Anyway, we’re still pretty new to Flume so any feedback or comments would be appreciated!

Apache Flume: Setting up Flume for an S3 sink

We’ve been evaluating Apache Flume over the last few weeks as part of a client project we’re working on. At a high level, our goal was to get plain text data generated by one of our applications running in a non-AWS datacenter back into Amazon S3 so that we could load it into Redshift. The “Is Flume a good fit?” section of their docs describes this use case perfectly:

If you need to ingest textual log data into Hadoop/HDFS then Flume is the right fit for your problem, full stop

OK great, but what about writing into S3? It turns out you can use the HDFS sink to write into S3 if you use a “path” configuration formatted like ‘s3n://<AWS.ACCESS.KEY>:<AWS.SECRET.KEY>@<bucket.name>’.
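In other words, the sink block of your agent config ends up looking something like this (the bucket and path are placeholders; DataStream keeps the output as plain text rather than SequenceFiles):

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = s3n://<AWS.ACCESS.KEY>:<AWS.SECRET.KEY>@<bucket.name>/flume/events
a1.sinks.k1.hdfs.fileType = DataStream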

But wait! Unfortunately Flume doesn’t ship with “batteries included” for writing to HDFS and S3, so you’ll need to grab a couple more dependencies before you can get this working. Frustratingly, you need version-compatible JARs of the Amazon S3 client, HDFS, and Hadoop with S3 compatibility. After flailing around downloading packages, hitting an error, downloading more JARs, and finally getting Flume working, I realized there had to be a better way to replicate the process.

Enter Maven! Since we’re just grabbing JARs, it’s actually possible to use a pom.xml to describe the dependencies we need, let Maven fetch the JARs, and then copy them into a local folder. Here’s a working pom.xml file against Flume 1.6:
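The exact Hadoop version and artifact list may need adjusting for your environment, but the overall shape is a pom that pulls in hadoop-common, hadoop-hdfs, hadoop-aws (which brings the AWS S3 client along), and jets3t for the s3n:// filesystem, and then uses the maven-dependency-plugin to copy everything into lib/:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>flume-s3-deps</artifactId>
  <version>1.0</version>
  <packaging>pom</packaging>

  <dependencies>
    <!-- HDFS client classes used by Flume's HDFS sink -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.1</version>
    </dependency>
    <!-- S3 filesystem support plus the AWS SDK it depends on -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aws</artifactId>
      <version>2.7.1</version>
    </dependency>
    <!-- jets3t backs the s3n:// scheme; listed explicitly in case it
         isn't pulled in transitively -->
    <dependency>
      <groupId>net.java.dev.jets3t</groupId>
      <artifactId>jets3t</artifactId>
      <version>0.9.0</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Copy the JARs above (and their transitive dependencies)
           into ./lib when "mvn process-sources" runs -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <phase>process-sources</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>lib</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>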

To use it, just run “mvn process-sources” and you’ll end up with all the JARs conveniently in a “lib/” folder in the current directory. Copy those JARs into the “lib/” folder of your Flume download and you should be off to the races. Note: these are very possibly more JARs than you strictly need to get Flume running, but as a set of Maven dependencies this is the simplest I could come up with.

Fleshing out the steps to getting a working S3 sink, you should be able to do the following:
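Roughly the following, with the versions and the config below as illustrative placeholders – the important part is the order of operations:

# 1. Grab a Flume 1.6 binary release from the Apache site and unpack it
tar -xzf apache-flume-1.6.0-bin.tar.gz
cd apache-flume-1.6.0-bin

# 2. Drop in the Hadoop/S3 JARs collected by the pom.xml above
cp /path/to/flume-s3-deps/lib/*.jar lib/

# 3. Create agent1.conf with a simple netcat source, a memory channel,
#    and an HDFS sink pointed at your bucket via s3n://
cat > agent1.conf <<'EOF'
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = s3n://<AWS.ACCESS.KEY>:<AWS.SECRET.KEY>@<bucket.name>/flume/events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.channel = c1
EOF

# 4. Launch the agent
bin/flume-ng agent --conf conf --conf-file agent1.conf --name a1 -Dflume.root.logger=DEBUG,console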

Before you run the last command to launch Flume you’ll need to edit “agent1.conf” to enter your Amazon token, secret key, and S3 bucket location. You’ll also need to create the S3 bucket before trying to write to it with Flume. And then finally, to test that everything is working you can use netcat with the following:
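For example, assuming the netcat source from the config above is listening on localhost:44444:

echo "hello s3 sink" | nc localhost 44444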

Back on the terminal with Flume you should see debug data about receiving the message and a notification about an S3 upload. So what’s next? Not much; you’ll need to pick an appropriate source and then tune your HDFS and channel parameters for the amount of throughput you need.

As always, questions and comments welcome!

Getting started with Hadoop, Hive, and Sqoop

I apologize for the buzzword heavy title but it was the best I could do. I couldn’t find a good quick start explaining how to get started with Hive so I thought I’d share my experiences.

Anyway, a client of ours came to us needing to analyze a dataset that was about 200 million rows over 6 months and is currently growing at about 10 million rows a week, with that rate itself increasing. From a reporting standpoint, they were looking to run aggregate counts and group bys over the data and then display the results on charts. Additionally, they were also looking to select subsets of the data and use them later – basically SELECT * FROM table WHERE x AND y AND z.

Obviously, doing the calculations in real time was out of the question so we knew we were looking for a solution that would be easy to use, support the necessary requirements and that would predictably scale with the increasing generation rate of data.

On the surface, MySQL looks like a decent approach but it presents a couple of issues pretty quickly:

  • In order for the SUM, GROUP BY, and COUNT queries to be at all useful the MySQL tables would have to be heavily indexed. Unfortunately, due to the write heavy workload of the app this would mean having to copy data into an indexed MySQL database before running any reports.
  • Even with indexes, MySQL was pretty awful at selecting subsets of the data from a performance perspective.
  • And probably the biggest issue with MySQL is that it doesn’t scale linearly in the sense that if the data is growing at 500 million rows a week you can’t simply “throw more hardware” at it and be done with it.

With requirements in hand we hit the Internet and finally arrived at Hive running on top of Hadoop. Per Wikipedia,

“Apache Hadoop is a software framework that supports data-intensive distributed applications under a free license. It enables applications to work with thousands of nodes and petabytes of data. Hadoop was inspired by Google’s MapReduce and Google File System (GFS) papers.”

From our perspective, this stack fits our requirements nicely since it doesn’t rely on keeping a second “reporting” MySQL database available, it will handle both sum/count/group by and selecting subsets, and probably most importantly it will allow us, at least in the near term, to scale with the increasing rate of data generation.

To grossly oversimplify, Hadoop provides a framework that allows you to break up a data intensive task into discrete pieces, run the pieces in a distributed fashion, and then combine the results, giving you the result of the completed task. The quintessential example of a task that can be parallelized in this fashion is sorting a *really* big list, since the list can be sorted in pieces and then the sorted pieces can be combined at the end (see merge sort).
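Just to make the split-and-combine pattern concrete, here is a toy Java example (nothing Hadoop-specific): the two halves could be sorted on separate machines, and only the cheap merge step has to happen in one place.

import java.util.Arrays;

public class MergeSortSketch {

  // Merge two already-sorted arrays into one sorted array.
  static int[] merge(int[] left, int[] right) {
    int[] out = new int[left.length + right.length];
    int i = 0, j = 0, k = 0;
    while (i < left.length && j < right.length) {
      out[k++] = (left[i] <= right[j]) ? left[i++] : right[j++];
    }
    while (i < left.length) out[k++] = left[i++];
    while (j < right.length) out[k++] = right[j++];
    return out;
  }

  public static void main(String[] args) {
    int[] data = {42, 7, 19, 3, 88, 1, 56, 23};
    int mid = data.length / 2;
    int[] left = Arrays.copyOfRange(data, 0, mid);
    int[] right = Arrays.copyOfRange(data, mid, data.length);
    // In a Hadoop-style job these two sorts would run on different nodes.
    Arrays.sort(left);
    Arrays.sort(right);
    // Prints [1, 3, 7, 19, 23, 42, 56, 88]
    System.out.println(Arrays.toString(merge(left, right)));
  }
}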

The second piece of the tool chain is Hive. Again via Wikipedia,

“Apache Hive is a data warehouse infrastructure built on top of Hadoop for providing data summarization, query, and analysis.”

Basically, Hive is a tool that leverages the Hadoop framework to provide reporting and query capabilities using a syntax similar to SQL.
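For example, the kind of aggregate report described above is a one-liner in HiveQL (the table and column names here are made up):

SELECT event_type, COUNT(*) AS total
FROM events
GROUP BY event_type;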

That just leaves Sqoop, the app with a funny name and no Wikipedia entry. Sqoop was originally developed by Cloudera and basically serves as an import tool for Hadoop. For my purposes, it allowed me to easily import the data from my MySQL database into Hadoop’s HDFS so I could use it in Hive.

The rest of this post walks you through setting up Hadoop+Hive and analyzing some MySQL data.

Now that you know the players, let’s figure out what we’re actually trying to do.

  1. We want to start a Hadoop cluster to use Hive on.
  2. Load our data from a MySQL database into this Hadoop cluster.
  3. Use Hive to run some reports on this data.
  4. Warehouse the results of this data in MySQL so we can graph it (not that exciting).

Starting the cluster

There’s actually one more tool you’ll need to get this to work – Apache Whirr. Whirr is actually really cool: it lets you automatically start cluster services (Hadoop, Voldemort, etc.) on a handful of cloud platforms (AWS, Rackspace, etc.).

NOTE: We exclusively use AWS for our hosting so everything described here is specific to AWS.

First, download the latest copy of Whirr – http://www.fightrice.com/mirrors/apache//incubator/whirr/ – to your local machine. Whirr should work everywhere, but these directions will match up against Linux/OSX the best.

The first thing you’ll need is a Whirr configuration file describing the cluster you want to build. Create a file called hadoop.properties and paste in the following:

whirr.cluster-name=hadoop
whirr.instance-templates=1 hadoop-namenode+hadoop-jobtracker,2 hadoop-datanode+hadoop-tasktracker
whirr.hadoop-install-function=install_cdh_hadoop
whirr.hadoop-configure-function=configure_cdh_hadoop

whirr.provider=aws-ec2
whirr.identity=[REPLACE THIS WITH YOUR AWS ID]
whirr.credential=[REPLACE THIS WITH YOUR AWS SECRET]

whirr.hardware-id=m1.large
whirr.image-id=us-east-1/ami-68ad5201
whirr.location-id=us-east-1

whirr.private-key-file=~/.ssh/id_rsa
whirr.public-key-file=~/.ssh/id_rsa.pub

There isn’t a ton going on in the file but you’ll need to switch out the credential lines for your AWS credentials. Also, you’ll need to double check that the ssh paths are accurate for your account.

The next step is to actually launch the cluster. To do this, run the following command (double check that the path to your hadoop.properties file is accurate):

./bin/whirr launch-cluster --config hadoop.properties

Just give it a few minutes; you’ll see a bunch of debug info scrolling across your terminal and hopefully a success message once it’s done. At this point, you’ll have a fully built Hadoop cluster with 3 nodes as described in your properties file (1 hadoop-namenode+hadoop-jobtracker, 2 hadoop-datanode+hadoop-tasktracker).

You can see all your nodes by checking out your Whirr cluster directory.

cat ~/.whirr/hadoop/instances

Prepping and loading the cluster

Now that the cluster is up, you’ll need to prep it and then load your data with Sqoop.

One of the most irritating “gotchas” I stumbled across is that Whirr only adds the firewall rules necessary for Hadoop to its AWS security group.

Before you do anything else, open your EC2 control panel and modify the new Whirr security group (#jcloud-something) so that all of your nodes can connect to each other on port 3306 (MySQL).

The next step is to install mysql-client across the entire cluster since Sqoop uses mysqldump to get at your data. You could manually ssh into every machine but Whirr provides a convenient “run-script” command to do just that.

Create a file called “prepCluster.sh” and put “sudo apt-get -q -y install mysql-client” in it. Then make sure the paths are right and run,

./bin/whirr run-script --script prepCluster.sh --config hadoop.properties

Once it’s done, you’ll see the aptitude output from all your nodes as they download the MySQL client.

The next step is to install mysql-server, Hive, and Sqoop on the jobtracker. Doing this is pretty straightforward: look at the .whirr/hadoop/instances file from above and copy the namenode hostname (in this cluster the namenode and jobtracker run on the same machine).

Next, SSH into that machine using your current username. Once you’re in, just run the following to install everything:

sudo apt-get -q -y install sqoop
sudo apt-get -q -y install hadoop-hive
sudo apt-get -q -y install screen

NOTE: You’ll also need the MySQL Connector/J library so that Sqoop can connect to MySQL. Download it here and place it in “/usr/lib/sqoop/lib/”.

Once everything is done installing, you’ll most likely want to move your MySQL data directories from their default location onto the /mnt partition since it’s much larger. Check out this article for a good walkthrough. Don’t forget to update AppArmor or MySQL won’t start. Once MySQL is set up, load the data you want to crunch.

Now, you’ll need to use Sqoop to load the data from the MySQL database into Hadoop’s HDFS. While logged into the jobtracker node you can just run the following. You’ll need to swap out the placeholders in the command and change the username/password.

sqoop-import-all-tables --connect jdbc:mysql://[IP of your jobtracker]/[your db_name] --username root --password root --verbose --hive-import

Once it completes, Sqoop will have copied all your MySQL data into HDFS and initialized Hive for you.

Crunching the data

Run “hive” on the jobtracker and you’ll be ready to start crunching your data.

Check out the Hive language manual for more info on exactly what queries you can write.

Once you’ve narrowed down how to write your queries, you can use Hive’s “INSERT OVERWRITE LOCAL DIRECTORY” command to output the results of your query into a local directory.
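For example, something along these lines with made-up table and column names, writing the results to /tmp/hive_results:

INSERT OVERWRITE LOCAL DIRECTORY '/tmp/hive_results'
SELECT event_type, COUNT(*)
FROM events
GROUP BY event_type;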

Then, the next step would be to tar up those results and use scp to copy them back to your local machine to analyze or warehouse.
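For example (hostname, username, and paths are placeholders):

# on the jobtracker
tar -czf results.tar.gz /tmp/hive_results
# from your local machine
scp youruser@your-jobtracker-hostname:results.tar.gz .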

Shutting it down

The final thing you’ll need to do is shut down the cluster. Whirr makes it pretty easy:

./bin/whirr destroy-cluster --config hadoop.properties

Give it a few minutes and Whirr will shut down the cluster and clean up the EC2 security group as well.

Anyway, hope this walk through proves useful for someone. As always, feedback, questions, and comments are all more than welcome.

Running Java apps from the crontab

Earlier this week I was completely dumbfounded by a PHP script that launched a Java app that seemed to work fine when it was run from the command line but kept failing when it was run from a cron.

The Java app in question was “ec2-describe-group” out of the Amazon EC2 API Tools package.  Basically, the ec2-describe-group tool hits the EC2 API and returns information about your account’s currently configured security groups.

The issue I was having was that when the PHP script was launched from a cron ec2-describe-group would keep returning an empty string, but when the script was launched from the CLI ec2-describe-group behaved normally.

After some poking around, I found this StackOverflow post which points out that most of the environment variables your shell has aren’t available in a cron job.

With that in mind, I tried adding JAVA_HOME as well as EC2_HOME to my crontab. Doing this is pretty straightforward: just add these two lines above any of your scheduled jobs:

EC2_HOME=/opt/ec2-api-tools-1.3.36506
JAVA_HOME=/etc/java-config-2/current-system-vm

Unfortunately, this still didn’t resolve the issue. On a whim, I decided to check what type of file ec2-describe-group actually is and discovered that it’s a Bash script, not a Java JAR. Looking at the Bash, the file is actually just executing “EC2_HOME/bin/ec2-cmd DescribeGroups”, but it utilizes other environment variables that my cron didn’t have.

For simplicity’s sake, I decided to just switch the PHP script to run ec2-cmd directly and finally everything started working as expected.