Following up on our previous post, after evaluating Flume we decided it was a good fit and chose to move forward with it. In our specific use case the data we are gathering is ephemeral, so we didn’t need to enforce any deliverability or durability guarantees. For us, missing messages or double delivery is fine as long as the business logic throughput on the application side isn’t affected. Concretely, our application is a high-volume, low-latency HTTP message broker and we’re looking to record the request URLs via Flume into S3.
One of the compelling aspects of Flume is that it ships with several ways to ingest and syndicate your data via sources and sinks. Since we’re targeting S3 we’d settled on using the default HDFS sink, but we had some options on the source. For a general case with complex events the Avro source would be the natural choice, but since we’re just logging lines of text the NetCat source looked like a better fit. One of the issues we had with the NetCat source is that it’s TCP based, so we’d need to implement timeouts and connection management on the application side. In addition, looking at the code of the NetCat source you’ll notice it’s implemented using traditional Java NIO sockets, whereas the Avro source is built on Netty, an event-driven networking framework designed for high-throughput servers.
Given those issues and our relaxed durability requirements, we started looking at the available UDP sources. The Syslog UDP source looked the most promising, but it validates the format of the inbound messages so we wouldn’t be able to send messages with just the URLs. The code for the Syslog UDP source looked pretty straightforward, so at this point we decided to build a custom source based on it. Our final code ended up looking like:
The big changes were in the implementation of messageReceived and the creation of the new extractEvent method. Including your new source in Flume is straightforward: you just need to build a JAR and drop it into Flume’s “lib/” folder. The easiest way to do this is to compile with javac and package the result with jar. You’ll just need a binary copy of Flume so that you can reference its JARs. Build it with:
And then, you can test this out by creating a file named “agent1.conf” in your Flume directory containing:
Finally, you need to launch Flume by running:
bin/flume-ng agent --conf conf --conf-file agent1.conf --name a1 -Dflume.root.logger=INFO,console
And then to test it you can use “netcat” to fire off some UDP packets with:
echo "hi flume" | nc -4u -w1 localhost 44444
You should see the message come across the console that’s running Flume. Be aware that the Flume logger truncates messages, so if you send a longer string you won’t see all of it in the logger.
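The same fire-and-forget test can be driven from application code. Here’s a minimal Python sketch (not the code from our application), assuming the agent is listening on localhost:44444 as in the netcat example; the send_url helper is a made-up name for illustration:

```python
import socket

# Assumption: same host and port as the netcat test above
FLUME_ADDR = ("localhost", 44444)


def send_url(sock, url, addr=FLUME_ADDR):
    """Send one URL as a single UDP datagram and never raise into the caller."""
    try:
        # Trailing newline mirrors what `echo ... | nc` sends
        sock.sendto(url.encode("utf-8") + b"\n", addr)
        return True
    except OSError:
        # Dropped messages are acceptable for this use case, so swallow the error
        return False


if __name__ == "__main__":
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setblocking(False)  # never stall request handling on the socket
    send_url(sock, "http://example.com/some/path?x=1")
```

Because UDP is connectionless there are no timeouts or reconnects to manage, which is the trade-off we were after: a lost datagram simply never shows up.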
And that’s it. Non-durable, UDP source built and deployed. Anyway, we’re still pretty new to Flume so any feedback or comments would be appreciated!
We’ve been evaluating Apache Flume over the last few weeks as part of a client project we’re working on. At a high level, our goal was to get plain text data generated by one of our applications running in a non-AWS datacenter back into Amazon S3 so that we could load it into Redshift. The “Is Flume a good fit?” section of their docs describes this use case perfectly:
If you need to ingest textual log data into Hadoop/HDFS then Flume is the right fit for your problem, full stop
OK great, but what about writing into S3? It turns out you can use the HDFS sink to write into S3 if you use a “path” configuration formatted like ‘s3n://<AWS.ACCESS.KEY>:<AWS.SECRET.KEY>@<bucket.name>’.
But wait! Unfortunately Flume doesn’t ship with “batteries included” for writing to HDFS and S3 so you’ll need to grab a couple more dependencies before you can get this working. Frustratingly, you need to grab version compatible JARs of the Amazon S3 client, HDFS, and Hadoop with S3 compatibility. After flailing around downloading packages, hitting an error, downloading more JARs, and finally getting Flume working I realized there had to be a better way to replicate the process.
Enter Maven! Since we’re just pulling down JARs, it’s possible to use a pom.xml to describe the dependencies we need, let Maven fetch the JARs, and then copy them into a local folder. Here’s a working pom.xml file against Flume 1.6:
To use it, just run “mvn process-sources” and you’ll end up with all the JARs conveniently in a “lib/” folder in the current directory. Copy those JARs into the “lib/” folder of your Flume download and you should be off to the races. Note: These are very possibly more JARs than you need to get Flume running but as Maven dependencies this is the simplest I could come up with.
Fleshing out the steps, to get a working S3 sink you should be able to do the following:
Before you run the last command to launch Flume you’ll need to edit “agent1.conf” to enter your Amazon token, secret key, and S3 bucket location. You’ll need to create the S3 bucket before trying to write to it with Flume. And then finally, to test that everything is working you can use netcat with the following:
Back on the terminal with Flume you should see debug data about receiving the message and a notification about an S3 upload. So what’s next? Not much: you’ll need to pick an appropriate source and then tune your HDFS and channel parameters for the amount of throughput you need.
As always, questions and comments welcome!
Posted In: Big Data
Over the last few weeks we’ve been working with one of our clients to build out a real time data processing application. At a high level, the system ingests page view data, processes it in real time, and then ingests it into a database backend. In terms of scale, the system would need to start off processing roughly 30,000 events per minute at peak with the capability to scale out to 100,000 events per minute fairly easily. In addition, we wanted the data to become available to query “reasonably quickly” so that we could iterate quickly on how we were processing data.
To kick things off, we began by surveying the available tools to ingest, process, and then ultimately query the data. On the data warehouse side, we’d already had some positive experiences with Amazon Redshift so it was a natural choice to keep using it moving forward. In terms of ingestion and processing, we decided to move forward with Kinesis and Gearman. The fully managed nature of Kinesis made it the most appealing choice and Gearman’s strong PHP support would let us develop workers in a language everyone was comfortable with.
Our final implementation is fairly straightforward. An Elastic Load Balancer handles all incoming HTTP requests, which are routed to any number of front end machines. These servers don’t do any computation and just fire off messages into a Kinesis stream. On the backend, we have a consumer per Kinesis stream shard that creates Gearman jobs for pre-processing as well as Redshift data ingestion. Although it’s conceptually simple, there are a couple of “gotchas” that we ran into implementing this system:
Slow HTTP requests are a killer: The Kinesis API works entirely over HTTP, so anytime you want to “put” something into the stream it’ll require an HTTP request. The problem is that if you’re making these requests in real time in a high traffic environment, you run the risk of locking up your php-fpm workers if the network latency to Kinesis starts to increase. We saw this happen firsthand: everything would be fine and then all of a sudden the latency across the ELB would skyrocket when the latency to Kinesis increased. To avoid this, you need to make the Kinesis request in the background.
SSL certificate verification is REALLY slow: Kinesis is only available over HTTPS, so by default the PHP SDK (I assume others as well) will perform an SSL certificate verification every time you use a new client. If you’re making Kinesis requests inside your php-fpm workers, that means you’ll be verifying certificates on every request, which turns out to be really slow. You can disable this in the official SDK using the “curl.options” parameter and passing in “CURLOPT_SSL_VERIFYHOST” and “CURLOPT_SSL_VERIFYPEER”.
There’s no “batch” add operation: Interestingly Apache Kafka, which Kinesis resembles conceptually, supports batch operations, but at the time of writing Kinesis doesn’t. You have to make an HTTP request for every message you’re adding to the stream. What this means is that even if you’re queuing the messages in the background, you’ll still need to loop through them all firing off HTTP requests.
Your consumer needs to be fast: In your consumer, you’ll basically end up with code that looks like – https://gist.github.com/adatta02/842531b3fe93097ee030 Because Kinesis shard iterators are only valid for 5 minutes, you’ll need to be cognizant of how long the inner for loop takes to run. Each “getRecords” call can return a max of 10,000 records so you’ll need to be able to process 10k records in less than 5 minutes. Our solution for this was to offload all the actual processing to Gearman jobs.
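To make that last gotcha concrete, here’s a rough Python sketch of the consumer shape. It isn’t the code from the gist: FakeKinesis is a hypothetical in-memory stand-in that only mimics the “Records” and “NextShardIterator” response keys of a getRecords call, and enqueue stands in for whatever submits a Gearman job.

```python
import time
from collections import deque

ITERATOR_TTL_SECONDS = 5 * 60  # shard iterators are only valid for 5 minutes


class FakeKinesis:
    """Hypothetical in-memory stand-in for a Kinesis client (illustration only)."""

    def __init__(self, records, batch_size):
        self.records = list(records)
        self.batch_size = batch_size

    def get_records(self, ShardIterator, Limit=10000):
        start = int(ShardIterator)
        end = start + min(self.batch_size, Limit)
        batch = self.records[start:end]
        return {"Records": batch, "NextShardIterator": str(start + len(batch))}


def consume(client, iterator, enqueue, max_polls, clock=time.monotonic):
    """Poll a shard, handing every record to `enqueue` and doing no real work inline."""
    for _ in range(max_polls):
        started = clock()
        response = client.get_records(ShardIterator=iterator, Limit=10000)
        for record in response["Records"]:
            enqueue(record)  # e.g. submit a background job; must return quickly
        if clock() - started > ITERATOR_TTL_SECONDS:
            # The next iterator may already have expired by now
            raise RuntimeError("batch took longer than the iterator lifetime")
        iterator = response["NextShardIterator"]
    return iterator


if __name__ == "__main__":
    jobs = deque()
    client = FakeKinesis(["/a", "/b", "/c"], batch_size=2)
    consume(client, "0", jobs.append, max_polls=2)
    print(list(jobs))
```

The point of the design is that the inner loop only enqueues, so its running time grows with the number of records rather than with how expensive each record is to process.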
Anyway, we’re still fairly new to using Kinesis so I’m sure we’ll learn more about using it as the system is in production. A few things have already been positive, including that it makes testing new code locally easy since you can just “tap” into the stream, scaling up looks like it means just adding additional shards, and since it’s managed we’ve got one less thing to worry about.
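For a feel of what “adding additional shards” means at our volumes, here’s a back-of-the-envelope sketch in Python. It assumes the per-shard write limits AWS documents (1,000 records or 1 MB per second) and an average event size of 1 KB, which is a guess for illustration rather than a measured number:

```python
import math

# Per-shard write limits as documented by AWS (assumed unchanged)
SHARD_RECORDS_PER_SEC = 1000
SHARD_BYTES_PER_SEC = 1024 * 1024


def shards_needed(events_per_minute, avg_event_bytes):
    """Estimate the shard count required to absorb a given write rate."""
    events_per_sec = events_per_minute / 60
    by_count = events_per_sec / SHARD_RECORDS_PER_SEC
    by_bytes = events_per_sec * avg_event_bytes / SHARD_BYTES_PER_SEC
    return max(1, math.ceil(max(by_count, by_bytes)))


if __name__ == "__main__":
    # Assumption: roughly 1 KB per page view event
    print(shards_needed(30_000, 1024))   # 500 events/sec
    print(shards_needed(100_000, 1024))  # about 1,667 events/sec
```

Under those assumptions 30,000 events per minute fits in a single shard and 100,000 per minute needs two, so the scale-out we planned for is a small change.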
As always, questions and comments welcome!
In the last few months there’s been a handful of blog posts basically themed “Redshift vs. Hive”. Companies from Airbnb to FlyData have been broadcasting their success in migrating from Hive to Redshift in both performance and cost. Unfortunately, a lot of casual observers have interpreted these posts to mean that Redshift is a “silver bullet” in the big data space. For some background, Hive is an abstraction layer that executes MapReduce jobs using Hadoop across data stored in HDFS. Amazon’s Redshift is a managed “petabyte scale” data warehouse solution that provides managed access to a ParAccel cluster and exposes a SQL interface that’s roughly similar to PostgreSQL. So where does that leave us?
From the outside, Hive and Redshift look oddly similar. They both promise “petabyte” scale, linear scalability, and expose an SQL’ish query syntax. On top of that, if you squint, they’re both available as Amazon AWS managed services through Elastic MapReduce and of course Redshift. Unfortunately, that’s really where the similarities end, which makes the “Hive vs. Redshift” comparisons along the lines of “apples to oranges”. Looking at Hive, its defining characteristic is that it runs across Hadoop and works on data stored in HDFS. Removing the acronym soup, that basically means that Hive runs MapReduce jobs across a bunch of text files that are stored in a distributed file system (HDFS). In comparison, Redshift uses a data model similar to PostgreSQL, so data is structured in terms of rows and tables, with sort and distribution keys taking the place of traditional indexes.
Well, therein lies the rub that everyone seems to be missing. Hadoop, and by extension Hive (and Pig), are really good at processing text files. So imagine you have 10 million x 1MB XML documents or 100GB worth of nginx logs: this would be a perfect use case for Hive. All you would have to do is push them into HDFS or S3, write a RegEx to extract your data, and then query away. Need to add another 2 million documents or 20GB of logs? No problem, just get them into HDFS and you’re good to go.
Could you do this with Redshift? Sure, but you’d need to pre-process 10 million XML documents and 100GB of logs to extract the appropriate fields, and then create CSV files or SQL INSERT statements to load into Redshift. Given the available options, you’re probably going to end up using Hadoop to do this anyway.
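To give a sense of what that pre-processing step involves, here’s a small Python sketch that turns nginx access log lines into CSV rows. It assumes the default “combined” log format, and the choice of columns is arbitrary; a real pipeline would also need to run this across all 100GB, which is where Hadoop comes back in.

```python
import csv
import io
import re

# Assumption: nginx's default "combined" access log format
LINE_RE = re.compile(
    r'(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>[^"]*)" '
    r'(?P<status>\d{3}) (?P<bytes>\d+) '
    r'"(?P<referer>[^"]*)" "(?P<agent>[^"]*)"'
)

# Hypothetical column selection for the target table
FIELDS = ["ip", "time", "method", "path", "status", "bytes"]


def logs_to_csv(lines, out):
    """Write one CSV row per parseable log line; return how many rows were written."""
    writer = csv.writer(out)
    written = 0
    for line in lines:
        match = LINE_RE.match(line)
        if match is None:
            continue  # skip lines that don't fit the expected format
        writer.writerow([match.group(name) for name in FIELDS])
        written += 1
    return written


if __name__ == "__main__":
    sample = [
        '203.0.113.9 - - [10/Oct/2013:13:55:36 +0000] '
        '"GET /index.html HTTP/1.1" 200 2326 "-" "Mozilla/5.0"',
        "not a log line",
    ]
    buffer = io.StringIO()
    logs_to_csv(sample, buffer)
    print(buffer.getvalue())
```

With Hive the same regular expression would be applied at query time, whereas here it has to run up front before anything can be loaded.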
Where Redshift is really going to excel is in situations where your data is basically already relational and you have a clear path to actually get it into your cluster. For example, if you were running three x 15GB MySQL databases with unique, but related data, you’d be able to regularly pull that data into Redshift and then ad-hoc query it with regular SQL. In addition, since the data is already structured you’d be able to use the existing format to create keys in Redshift to improve performance.
When it comes down to it, it’s the old “right tool for the right job” aphorism. As an organization, you’ll have to evaluate how your data is structured, the types of queries you’re interested in running, and what level of abstraction you’re comfortable with. What’s definitely true is that “enterprise” data warehousing is being commoditized and the “old guard” had better innovate or die.
A few weeks ago, Facebook released a new open source project called PrestoDB, which they billed as a marked improvement over Hive and Hadoop. According to the PrestoDB site, Presto is a real time query engine that supports a SQL-like syntax, similar to Hive. However, unlike Hive, Presto doesn’t execute queries using MapReduce jobs but instead uses its own internal distribution mechanism. According to the Presto site and current users, most queries will see an order of magnitude speedup compared to Hive. And the best part? PrestoDB can read metadata from Hive’s metastore and read files off HDFS just like Hive, which is pretty wild.
Anyway, since I love new toys (who doesn’t!?) I decided to try setting up PrestoDB on Amazon EMR to see how difficult it was and also experience the speedups. Turns out, once you have an Amazon EMR cluster running, getting PrestoDB up is almost trivial. Just follow the PrestoDB deploying directions to get yourself situated. Make sure you create *all* the files or you’ll get some unnecessarily cryptic errors along the way.
The config files I ended up using were:
You’ll need to create the “/mnt/presto” directory and also make it accessible to whatever user you plan to run the daemon under.
The one huge gotcha I ran into was that I couldn’t figure out what port Hive’s Thrift service was running on. For some reason, it’s notably absent from Amazon’s documentation and I couldn’t find the hive-site.xml file on the EMR EC2 instance. Completely randomly, I ran across this manual page from Jaspersoft enumerating which ports different versions of Hive run Thrift on when you use EMR. Turns out, it’s different per Hive version but 0.11.0 will use 10004.
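If you’re stuck guessing like I was, a quick way to narrow it down is to probe a few candidate ports from the master node. Here’s a small Python sketch; the candidate list is an assumption (10004 from the Jaspersoft page for Hive 0.11.0, plus 10000, Hive’s usual default), and a successful connection only tells you something is listening, not that it’s Thrift.

```python
import socket

# Assumption: 10004 per the note above for Hive 0.11.0 on EMR, 10000 as Hive's usual default
CANDIDATE_PORTS = [10000, 10004]


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def open_ports(host, ports=CANDIDATE_PORTS):
    """Return the subset of candidate ports that accept connections."""
    return [port for port in ports if is_port_open(host, port)]


if __name__ == "__main__":
    print(open_ports("localhost"))
```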
Once you have everything configured, just follow the docs to start the server and you’ll be ready to query. One thing to note though is that you’ll need to set up PrestoDB manually on the rest of your machines and also enable the discovery service for this to “really” work.
Anyway, happy querying!