AWS: Using Kinesis with PHP for real time stream processing

Over the last few weeks we’ve been working with one of our clients to build out a real time data processing application. At a high level, the system ingests page view data, processes it in real time, and then loads it into a database backend. In terms of scale, the system would need to start off processing roughly 30,000 events per minute at peak with the capability to scale out to 100,000 events per minute fairly easily. In addition, we wanted the data to become available to query “reasonably quickly” so that we could iterate rapidly on how we were processing data.
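To put those numbers in perspective, here’s a rough back-of-the-envelope sketch in Python. It assumes the documented Kinesis write limit of 1,000 records per second per shard and that events are small enough for the record count, not the payload size, to be the bottleneck. The 2x headroom factor and the function name are my own assumptions for illustration.

```python
import math

# Assumed write limit: 1,000 records per second per shard.
RECORDS_PER_SHARD_PER_SECOND = 1000


def shards_needed(events_per_minute, headroom=2.0):
    """Estimate shard count for a peak event rate, padded by a headroom factor."""
    events_per_second = events_per_minute / 60
    padded_rate = events_per_second * headroom
    return max(1, math.ceil(padded_rate / RECORDS_PER_SHARD_PER_SECOND))


initial_shards = shards_needed(30_000)   # 500 events/second at peak
target_shards = shards_needed(100_000)   # roughly 1,667 events/second at peak
print(initial_shards, target_shards)
```

Under those assumptions a single shard covers the starting load, and a handful covers the growth target.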

To kick things off, we began by surveying the available tools to ingest, process, and then ultimately query the data. On the data warehouse side, we had already had some positive experiences with Amazon Redshift so it was a natural choice to keep using it moving forward. In terms of ingestion and processing, we decided to move forward with Kinesis and Gearman. The fully managed nature of Kinesis made it the most appealing choice and Gearman’s strong PHP support would let us develop workers in a language everyone was comfortable with.

Our final implementation is fairly straightforward. An Elastic Load Balancer handles all incoming HTTP requests, which are routed to any number of front end machines. These servers don’t do any computation and just fire off messages into a Kinesis stream. On the backend, we have a consumer per Kinesis stream shard that creates Gearman jobs for pre-processing as well as Redshift data ingestion. Although it’s conceptually simple, there are a couple of “gotchas” that we ran into implementing this system:

Slow HTTP requests are a killer: The Kinesis API works entirely over HTTP, so anytime you want to “put” something into the stream it’ll require an HTTP request. The problem is that if you’re making these requests in real time in a high traffic environment, you run the risk of locking up your php-fpm workers if the network latency to Kinesis starts to increase. We saw this happen first hand: everything would be fine, and then all of a sudden the latency across the ELB would skyrocket when the latency to Kinesis increased. To avoid this, you need to make the Kinesis request in the background.
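The pattern itself is language-agnostic, so here’s a minimal sketch of it in Python rather than PHP. The request path only drops the event onto an in-memory queue, and a worker thread makes the slow network call. `BackgroundSender` and `send_fn` are hypothetical names, and `send_fn` stands in for whatever actually performs the Kinesis “put”. This isn’t what we run in production.

```python
import queue
import threading


class BackgroundSender:
    """Buffers events in memory and ships them from a worker thread."""

    def __init__(self, send_fn, max_pending=10_000):
        self._send_fn = send_fn  # hypothetical callable doing the slow HTTP "put"
        self._pending = queue.Queue(maxsize=max_pending)
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def enqueue(self, event):
        """Called from the request path; never waits on the network."""
        try:
            self._pending.put_nowait(event)
            return True
        except queue.Full:
            return False  # shed load instead of stalling the request

    def _run(self):
        while True:
            event = self._pending.get()
            try:
                self._send_fn(event)
            except Exception:
                pass  # a real system would log and retry here
            finally:
                self._pending.task_done()

    def flush(self):
        """Block until everything queued so far has been handed to send_fn."""
        self._pending.join()


sent = []
sender = BackgroundSender(sent.append)  # list.append stands in for the HTTP call
accepted = sender.enqueue({"url": "/home"})
sender.flush()
```

The bounded queue is a deliberate choice: if Kinesis slows down, the front end starts dropping events instead of tying up request workers.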

SSL certificate verification is REALLY slow: Kinesis is only available over HTTPS, so by default the PHP SDK (I assume others as well) will perform SSL certificate verification every time you use a new client. If you’re making Kinesis requests inside your php-fpm workers, that means you’ll be verifying certificates on every request, which turns out to be really slow. You can disable this in the official SDK using the “curl.options” parameter and passing in “CURLOPT_SSL_VERIFYHOST” and “CURLOPT_SSL_VERIFYPEER”, with the trade-off that the client no longer confirms it’s actually talking to Kinesis.

There’s no “batch” add operation: Interestingly, Apache Kafka, which Kinesis resembles in design, supports batch operations, but at the time of writing Kinesis doesn’t. You have to make an HTTP request for every message you’re adding to the stream. What this means is that even if you’re queuing the messages in the background, you’ll still need to loop through them all, firing off HTTP requests.

Your consumer needs to be fast: In your consumer, you’ll basically end up with an outer loop that calls “getRecords” with the current shard iterator and an inner for loop that walks over the returned records. Because Kinesis shard iterators are only valid for 5 minutes, you’ll need to be cognizant of how long the inner for loop takes to run. Each “getRecords” call can return a max of 10,000 records, so you’ll need to be able to process 10k records in less than 5 minutes. Our solution for this was to offload all the actual processing to Gearman jobs.
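As a rough illustration of that consumer loop, here’s a Python sketch. The `client` object and its `get_records` method are hypothetical wrappers, and `FakeClient` exists only so the example runs. The response keys mirror the “Records” and “NextShardIterator” fields of the Kinesis API, and `enqueue_job` stands in for creating a Gearman job.

```python
import time

ITERATOR_TTL_SECONDS = 5 * 60  # shard iterators are only valid for 5 minutes


def consume(client, iterator, enqueue_job, clock=time.monotonic):
    """Walk a shard, queueing each record for later processing."""
    batches = 0
    while iterator is not None:
        started = clock()
        response = client.get_records(iterator)  # hypothetical client wrapper
        for record in response["Records"]:
            enqueue_job(record)  # cheap: hand off, don't process inline
        elapsed = clock() - started
        if elapsed > ITERATOR_TTL_SECONDS:
            raise RuntimeError("batch took too long; the next iterator has likely expired")
        iterator = response.get("NextShardIterator")
        batches += 1
    return batches


class FakeClient:
    """Stand-in for a Kinesis client so the sketch is runnable."""

    def __init__(self, pages):
        self._pages = pages

    def get_records(self, iterator):
        following = iterator + 1 if iterator + 1 < len(self._pages) else None
        return {"Records": self._pages[iterator], "NextShardIterator": following}


jobs = []
batches = consume(FakeClient([["a", "b"], ["c"]]), 0, jobs.append)
```

Keeping the inner loop down to a single enqueue call is what keeps each pass comfortably inside the iterator’s lifetime.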

Anyway, we’re still fairly new to using Kinesis so I’m sure we’ll learn more about using it as the system is in production. A few things have already been positive: it makes testing new code locally easy since you can just “tap” into the stream, scaling up looks like it means just adding additional shards, and since it’s managed we’ve got one less thing to worry about.

As always, questions and comments welcome!

Big Data: Amazon Redshift vs. Hive

In the last few months there’s been a handful of blog posts basically themed “Redshift vs. Hive”. Companies from Airbnb to FlyData have been broadcasting their success in migrating from Hive to Redshift in both performance and cost. Unfortunately, a lot of casual observers have interpreted these posts to mean that Redshift is a “silver bullet” in the big data space. For some background, Hive is an abstraction layer that executes MapReduce jobs using Hadoop across data stored in HDFS. Amazon’s Redshift is a managed “petabyte scale” data warehouse solution that provides managed access to a ParAccel cluster and exposes a SQL interface that’s roughly similar to PostgreSQL. So where does that leave us?

From the outside, Hive and Redshift look oddly similar. They both promise “petabyte” scale, linear scalability, and expose an SQL’ish query syntax. On top of that, if you squint, they’re both available as Amazon AWS managed services through Elastic MapReduce and of course Redshift. Unfortunately, that’s really where the similarities end, which makes the “Hive vs. Redshift” comparisons along the lines of “apples to oranges”. Looking at Hive, its defining characteristic is that it runs across Hadoop and works on data stored in HDFS. Removing the acronym soup, that basically means that Hive runs MapReduce jobs across a bunch of text files that are stored in a distributed file system (HDFS). In comparison, Redshift uses a data model similar to PostgreSQL, so data is structured in terms of rows and tables and includes the concept of keys (sort and distribution keys rather than traditional indexes).

OK so who cares?

Well, therein lies the rub that everyone seems to be missing. Hadoop, and by extension Hive (and Pig), are really good at processing text files. So imagine you have 10 million x 1MB XML documents or 100GB worth of nginx logs; this would be a perfect use case for Hive. All you would have to do is push them into HDFS or S3, write a RegEx to extract your data and then query away. Need to add another 2 million documents or 20GB of logs? No problem, just get them into HDFS and you’re good to go.
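To give a feel for the “write a RegEx” step, here’s a small Python sketch that pulls fields out of a single access log line. It assumes nginx’s default “combined” log format. The pattern, the field names, and the sample line are mine, not taken from a real deployment.

```python
import re

# Assumes nginx's default "combined" log format.
LOG_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) [^"]*" '
    r'(?P<status>\d{3}) (?P<bytes>\d+|-)'
)


def parse_line(line):
    """Return the extracted fields as a dict, or None if the line doesn't match."""
    match = LOG_PATTERN.match(line)
    return match.groupdict() if match else None


sample = (
    '203.0.113.7 - - [10/Oct/2013:13:55:36 +0000] '
    '"GET /index.html HTTP/1.1" 200 2326 "-" "curl/7.30"'
)
row = parse_line(sample)
```

In Hive the same kind of pattern would live in the table definition, so the raw files never need to be rewritten.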

Could you do this with Redshift? Sure, but you’d need to pre-process 10 million XML documents and 100GB of logs to extract the appropriate fields, and then create CSV files or SQL INSERT statements to load into Redshift. Given the available options, you’re probably going to end up using Hadoop to do this anyway.
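The pre-processing step described above boils down to something like the following Python sketch, which turns already-extracted fields into CSV text ready to be bulk loaded. The column names and sample rows are hypothetical. At real scale you’d run this logic as a distributed job, not in a single script.

```python
import csv
import io


def rows_to_csv(rows, columns):
    """Serialize dict rows into CSV text, one line per row, in a fixed column order."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for row in rows:
        writer.writerow([row.get(column, "") for column in columns])
    return buffer.getvalue()


# Hypothetical fields extracted from two XML documents.
extracted = [
    {"doc_id": "a1", "title": "Report, with a comma", "size_kb": 1024},
    {"doc_id": "b2", "title": "Short title", "size_kb": 300},
]
csv_text = rows_to_csv(extracted, ["doc_id", "title", "size_kb"])
```

Using a CSV library here matters more than it looks: free-text fields with commas or quotes get escaped properly, so rows don’t silently misalign at load time.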

Where Redshift is really going to excel is in situations where your data is basically already relational and you have a clear path to actually get it into your cluster. For example, if you were running three x 15GB MySQL databases with unique, but related data, you’d be able to regularly pull that data into Redshift and then ad-hoc query it with regular SQL. In addition, since the data is already structured you’d be able to use the existing format to create keys in Redshift to improve performance.

Hammers, screws, etc

When it comes down to it, it’ll come down to the old “right tool for the right job” aphorism. As an organization, you’ll have to evaluate how your data is structured, the types of queries you’re interested in running, and what level of abstraction you’re comfortable with. What’s definitely true is that “enterprise” data warehousing is being commoditized and the “old guard” better innovate or die.

Hive: Hive in 15 minutes on Amazon EMR

As far as “big data” solutions go, Hive is probably one of the more recognizable names. Hive basically offers the end user an abstraction layer to run “SQL like” queries as MapReduce jobs across data that they have in HDFS. Concretely, say you had several hundred million rows of data and you wanted to count the number of unique IDs; Hive would let you do that. One of the issues with Hadoop, and by proxy Hive, is that it’s notably difficult to set up a cluster to try things out. Tools like Whirr exist to make things easier, but they’re a bit rough around the edges and in my experience hit up against “version hell”. One alternative that I’m surprised isn’t more popular is using Amazon’s Elastic MapReduce to bootstrap a Hadoop cluster to experiment with.

Fire up the cluster

The first thing you’ll need to do is fire up an EMR cluster from the AWS backend. It’s mostly just point and click but the settings I used were:

  • Termination protection? No
  • Logging? Disabled
  • Debugging? Off since no logging
  • Tags – None
  • AMI Version: 2.4.2 (latest)
  • Applications to be installed:
      • Hive
      • Pig
  • Hardware Configuration:
      • One m1.small for the master
      • Two m1.small for the cores

The “security and access” section is important: you need to select an existing key pair that you have access to so that you can SSH into your master node to use the Hive CLI client.

Then finally, under Steps, since you’re not specifying any pre-determined steps, make sure you mark “Auto-terminate” as “No” so that the cluster doesn’t terminate immediately after it boots.

Click “Create Cluster” and you’re off to the races.

Pull some data, and load HDFS

Once the cluster launches, you’ll see a dashboard screen with a bunch of information about the cluster including the public DNS address for the “Master”. SSH into this machine using the user “hadoop” and whatever key you launched the cluster with.

Once you’re in, you’ll want to grab some data to play with. I pulled down Wikipedia Page View data since it’s just a bunch of gzipped text files, which are perfect for Hive. You can pull down a chunk of files using wget. Be aware though that the small EC2s don’t have much storage, so you’ll need to keep an eye on your disk space.

Once you have some data (grab a few GB), the next step is to push it over to HDFS, Hadoop’s distributed filesystem. As an aside, Amazon EMR is tightly integrated with Amazon S3 so if you already have a dataset in S3 you can copy directly from S3 to HDFS. Anyway, to push your files to HDFS, use the “hadoop fs” command line tool on the master node (its “-put” subcommand copies local files into HDFS).

Build some tables, query some data!

And finally, it’s time to query some of the pageview data using Hive. The first step is to let Hive know about your data and what format it’s stored in. To do this, you need to create an external table that points to the location of the files that you just pushed to HDFS. Start the Hive client by running “hive” and then issue a CREATE EXTERNAL TABLE statement that describes the columns and points at that HDFS location.

Now select some data from your newly created table!

Pretty sweet huh? Now feel free to run any arbitrary query against the data. Note: since we used m1.small EC2s the performance of Hive/Hadoop is going to be pretty abysmal. But hey, give it a shot with something like an aggregate over the view counts.
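For a sense of what an aggregate query over this data actually computes, here’s a plain Python sketch that sums view counts per page from gzipped input. It assumes each line is space separated as “project page_title view_count bytes”, which is my recollection of the pageview dump layout, so check it against the files you download. The sample rows are made up.

```python
import gzip
import io
from collections import Counter


def top_pages(lines, project="en", limit=3):
    """Sum view counts per page title for one project and return the biggest."""
    totals = Counter()
    for line in lines:
        parts = line.split(" ")
        if len(parts) != 4 or parts[0] != project:
            continue  # skip malformed rows and other projects
        totals[parts[1]] += int(parts[2])
    return totals.most_common(limit)


# Made-up sample rows, gzipped in memory to mimic the downloaded files.
raw = "en Main_Page 120 5000\nen Hadoop 30 900\nde Hadoop 7 100\nen Main_Page 80 4000\n"
compressed = gzip.compress(raw.encode("utf-8"))

with gzip.open(io.BytesIO(compressed), "rt", encoding="utf-8") as handle:
    result = top_pages(line.rstrip("\n") for line in handle)
```

Hive expresses the same thing as a GROUP BY with a SUM, and spreads the work across the cluster instead of running it in one process.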

Anyway, don’t forget to tear down the cluster once you’re done. As always, let me know if you run into any issues!

Big Data: What is “Big Data”?

Last week, I was catching up with a friend of mine and we started chatting about his most recent project. As we were chatting, he made an offhand comment about how some of the business guys on the team love to refer to what they are working on as a “big data” play, even though it really wasn’t. This stuck with me, because the vague definitions around “big data” make it easy to shoehorn problems into a “big data” play. Because of this, I think it’s worth taking a step back and discussing what big data really is and what tools are available to work with it.

It’s all just data

At the end of the day, data is data. It doesn’t really matter if it’s stored in a CSV text file, a MySQL database, or a NoSQL datastore like Cassandra or MongoDB. Typically though, web applications tend to use a relational database like MySQL or Postgres to persist data. Relational databases store data in a series of tables which are in turn arranged as a series of rows and columns. As an abstraction, think of a series of Excel worksheets which can have links between the rows of each sheet.

For most applications, this works out fine. The database ends up managing, say, a few thousand customer accounts, each with a few hundred thousand objects associated with them, and the total dataset fits conveniently into the server’s RAM. Since the dataset is relatively small, things like retrieving information, updating records, and running ad-hoc analytics queries are all easy to implement and relatively fast. But what happens if your dataset doesn’t fit into the memory of even the beefiest of servers? Therein lies the “big data” problem.

Certain applications generate an enormous amount of data on a daily basis. For example, look at Mixpanel: tracking discrete user interactions is going to produce hundreds of thousands of datapoints every day even with just a few clients. With this volume of data, typical relational databases quickly start performing sluggishly and eventually stop being effective entirely. Even simple queries like counting the “# of clicks by user” start to take hours to run, effectively becoming intractable. Although specialized relational databases like Vertica and Oracle 11g do exist to help solve this problem, they’re expensive and proprietary.

Enter the elephant

One of the first companies to publicly discuss their big data strategies was Google, in papers like Bigtable: A Distributed Storage System for Structured Data, which described their BigTable data storage system. Although Google’s systems were proprietary, its research papers (in particular the ones describing MapReduce and the Google File System) were used as the basis for Apache Hadoop, an open source framework for running MapReduce style jobs over large datasets.

At this point, Hadoop has distinguished itself as the most popular open source big data solution with a rich ecosystem of tools and several companies providing professional services and support including Cloudera and Hortonworks. What Hadoop provides is a low level framework for allowing computation jobs to be distributed across several servers within a cluster. This allows tools to split up very large datasets into smaller chunks, distribute computational tasks across the cluster, and finally assemble the result. So with the Hadoop framework in place, you still need specific tools built to leverage the distributed framework.
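The split, distribute, and assemble flow can be sketched in a few lines of Python. This runs in a single process and only illustrates the shape of the computation, not how Hadoop schedules work. The function names and the click data are hypothetical, and the example computes the “# of clicks by user” count mentioned earlier.

```python
from collections import Counter
from functools import reduce


def split(records, chunk_size):
    """Break the dataset into fixed-size chunks, one per (imaginary) worker."""
    return [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]


def map_chunk(chunk):
    """Work done independently on each chunk: count clicks per user."""
    return Counter(user for user, _page in chunk)


def merge(left, right):
    """Combine two partial results into one."""
    return left + right


clicks = [
    ("alice", "/a"), ("bob", "/b"), ("alice", "/c"),
    ("carol", "/a"), ("alice", "/b"),
]
chunks = split(clicks, 2)
partials = [map_chunk(chunk) for chunk in chunks]  # would run on separate servers
clicks_by_user = reduce(merge, partials, Counter())
```

Because each chunk is processed without knowing about the others, adding servers lets you process more chunks at once, which is where the scalability comes from.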

The toolbox

There are several tools that effectively leverage Hadoop but here are some of my favorites for quickly building out a cluster:

Apache Whirr – Automates deploying, bootstrapping, and configuring a Hadoop cluster. Whirr will save you hours of time because instead of manually starting 4 EC2s and configuring them all you can kickstart a cluster with a single command.

Apache HBase – A column store database that is similar to Google’s original BigTable system. Great for storing billions of records across a Hadoop HDFS file system.

Apache Hive – A data warehousing solution that allows you to run “SQL like” queries using Hadoop. It also has native support for pulling data out of MySQL, making it a convenient addition to a stack that includes MySQL.

Apart from these, there are dozens of other Hadoop powered tools but it’s impossible to recommend a single silver bullet without knowing the details of your “big data” problem.

Words of Congress: Fun with Hadoop

For the last few weeks we’ve been working on a project that involved dealing with bills in the US House and Senate. Naturally, I decided it was time to make a word cloud from the frequencies of the words in the bills!

Check out the final product here.

I decided to use only the bills from the 111th congress (the current one). All the bills (6703 of them) were downloaded from the THOMAS library. The files are XML documents that have the full text of the bills along with some meta data.

Not really too many files, but I decided to use Hadoop and try to Map/Reduce the bills to count up the word frequencies. Getting Hadoop to run locally was pretty straightforward – just tell it where JAVA_HOME is and I was off to the races. Fortunately enough, one of the pre-canned examples was a word frequency counter so I decided to modify that for what I wanted.

The example map/reduce was written to process plain text files so I had to modify it to work with the XML documents. What this involved was writing a custom InputFormat class to open each bill, extract the appropriate plain text from the XML, and then pass this back as the “data”. I also modified the word counter to ignore words shorter than 6 characters.
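The actual job was written in Java against Hadoop’s API, but the logic is small enough to sketch in Python: pull the plain text out of a bill’s XML, then count only the words that are at least 6 characters long. The XML layout and the sample bill are made up for illustration, since the real THOMAS documents are structured differently.

```python
import re
import xml.etree.ElementTree as ET
from collections import Counter

MIN_WORD_LENGTH = 6  # ignore words shorter than 6 characters
WORD_PATTERN = re.compile(r"[a-z]+")


def bill_text(xml_string):
    """Extract all text content from a bill, dropping the XML markup."""
    root = ET.fromstring(xml_string)
    return " ".join(root.itertext())


def count_words(text, counts=None):
    """Add the qualifying words in text to a running frequency count."""
    counts = Counter() if counts is None else counts
    for word in WORD_PATTERN.findall(text.lower()):
        if len(word) >= MIN_WORD_LENGTH:
            counts[word] += 1
    return counts


# Hypothetical bill; real documents have a richer structure.
sample_bill = (
    "<bill><title>Energy Security Act</title>"
    "<text>Secure energy for the nation. Energy matters.</text></bill>"
)
frequencies = count_words(bill_text(sample_bill))
```

In the Hadoop version the text extraction lives in the custom InputFormat and the counting is split between the map and reduce steps, but the filtering rule is the same.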

I tested locally with a small subset of bills and everything seemed to be working fine. The trouble started when I tried to bring up Daum’s machine as a slave to my machine. After some finagling and hair pulling I finally got it working. The takeaways were:

  • You can’t run your DataNode on localhost, it needs to be your computer’s hostname to accept connections.
  • Hostnames are important. If you don’t have a DNS server make sure your hostnames are aliased in /etc/hosts.
  • If your HDFS set up is showing 100% utilization but you know it isn’t true, try rm’ing the data file and then re-formatting your namenode.
  • If a copy or reduce step fails in distributed mode the error messages are usually really cryptic – check the actual logs.
  • When something throws an exception during a map or reduce operation, the error won’t be reported to STDOUT.

Anyway, it was a slightly frustrating but rewarding experience – I even got to code some Java! The visualization of the word frequencies is here.

Might be about time to process one of the Amazon datasets with EC2.