AWS: Using Kinesis with PHP for real time stream processing

Over the last few weeks we’ve been working with one of our clients to build out a real time data processing application. At a high level, the system ingests page view data, processes it in real time, and then loads it into a database backend. In terms of scale, the system needed to start off processing roughly 30,000 events per minute at peak, with the ability to scale out to 100,000 events per minute fairly easily. In addition, we wanted the data to become available to query “reasonably quickly” so that we could iterate quickly on how we were processing it.

To kick things off, we began by surveying the available tools to ingest, process, and then ultimately query the data. On the data warehouse side, we’d already had some positive experiences with Amazon Redshift, so it was a natural choice to keep using it moving forward. In terms of ingestion and processing, we decided to move forward with Kinesis and Gearman. The fully managed nature of Kinesis made it the most appealing choice, and Gearman’s strong PHP support would let us develop workers in a language everyone was comfortable with.

Our final implementation is fairly straightforward. An Elastic Load Balancer handles all incoming HTTP requests, which are routed to any number of front end machines. These servers don’t do any computation; they just fire off messages into a Kinesis stream. On the backend, we have a consumer per Kinesis stream shard that creates Gearman jobs for pre-processing as well as Redshift data ingestion. Although it’s conceptually simple, there are a couple of “gotchas” that we ran into implementing this system:

Slow HTTP requests are a killer: The Kinesis API works entirely over HTTP, so anytime you want to “put” something into the stream it requires an HTTP request. The problem is that if you’re making these requests in real time in a high traffic environment, you run the risk of locking up your php-fpm workers if the network latency to Kinesis starts to increase. We saw this happen first hand: everything would be fine, and then all of a sudden the latency across the ELB would skyrocket whenever the latency to Kinesis increased. To avoid this, you need to make the Kinesis request in the background.
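
We already run Gearman on the backend, so one way to background the put (this is a sketch of the idea rather than exactly what we shipped, and the “kinesis_put” job name is made up) is to hand the payload to a Gearman background job from the web request and let a worker make the actual Kinesis call:

<?php
// Inside the php-fpm request handler: queue the event locally instead of
// calling Kinesis directly, so the web request returns immediately.
$event = json_encode(array(
    'url'       => $_SERVER['REQUEST_URI'],
    'timestamp' => time(),
));

$gearman = new GearmanClient();
$gearman->addServer('127.0.0.1', 4730);

// doBackground() returns as soon as the job is queued; a separate Gearman
// worker picks it up and makes the slow Kinesis "put" call out of band.
$gearman->doBackground('kinesis_put', $event);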

SSL certificate verification is REALLY slow: Kinesis is only available over HTTPS, so by default the PHP SDK (I assume others as well) will perform SSL certificate verification every time you use a new client. If you’re making Kinesis requests inside your php-fpm workers, that means you’ll be verifying certificates on every request, which turns out to be really slow. You can disable this in the official SDK using the “curl.options” parameter and passing in “CURLOPT_SSL_VERIFYHOST” and “CURLOPT_SSL_VERIFYPEER”.
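
With the v2 PHP SDK (the Guzzle 3 based one that accepts “curl.options”), that looks roughly like the sketch below. Keep in mind this turns off certificate validation entirely, so it’s a trade-off to make deliberately rather than a blanket recommendation:

<?php
require 'vendor/autoload.php';

use Aws\Kinesis\KinesisClient;

// Skips SSL certificate verification on every request. Faster, but you're
// no longer validating that you're actually talking to Kinesis.
$client = KinesisClient::factory(array(
    'key'    => 'YOUR_AWS_KEY',
    'secret' => 'YOUR_AWS_SECRET',
    'region' => 'us-east-1',
    'curl.options' => array(
        CURLOPT_SSL_VERIFYHOST => false,
        CURLOPT_SSL_VERIFYPEER => false,
    ),
));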

There’s no “batch” add operation: Interestingly, Apache Kafka, which Kinesis is often compared to, supports batch operations, but unfortunately Kinesis doesn’t: you have to make an HTTP request for every message you’re adding to the stream. What this means is that even if you’re queuing the messages in the background, you’ll still need to loop through them all, firing off an HTTP request for each one.
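
The loop itself is nothing special. As a sketch, assuming $client is the KinesisClient configured above and $queued holds the JSON payloads the background worker pulled off its queue (the “pageviews” stream name is made up):

<?php
// One putRecord HTTP call per message, since there's no batch endpoint.
// Assumes $client (KinesisClient) and $queued (array of JSON strings).
foreach ($queued as $payload) {
    $client->putRecord(array(
        'StreamName'   => 'pageviews',   // hypothetical stream name
        'Data'         => $payload,
        'PartitionKey' => md5($payload), // spreads records across shards
    ));
}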

Your consumer needs to be fast: In your consumer, you’ll basically end up with code that looks like this gist: https://gist.github.com/adatta02/842531b3fe93097ee030. Because Kinesis shard iterators are only valid for 5 minutes, you’ll need to be cognizant of how long the inner loop takes to run. Each “getRecords” call can return a max of 10,000 records, so you’ll need to be able to process 10k records in less than 5 minutes. Our solution for this was to offload all of the actual processing to Gearman jobs.
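
The gist has the actual code; the general shape of a per-shard consumer, with made-up stream, shard, and job names and the $client from earlier, looks something like this:

<?php
// One of these loops runs per shard; the real work happens in Gearman.
$gearman = new GearmanClient();
$gearman->addServer('127.0.0.1', 4730);

$iterator = $client->getShardIterator(array(
    'StreamName'        => 'pageviews',
    'ShardId'           => 'shardId-000000000000',
    'ShardIteratorType' => 'TRIM_HORIZON',
));
$shardIterator = $iterator['ShardIterator'];

while (true) {
    $result = $client->getRecords(array(
        'ShardIterator' => $shardIterator,
        'Limit'         => 10000,
    ));

    foreach ($result['Records'] as $record) {
        // Hand each record off as a background job so this loop stays well
        // inside the 5 minute shard iterator window.
        $gearman->doBackground('process_pageview', $record['Data']);
    }

    // Use the iterator returned by getRecords for the next call.
    $shardIterator = $result['NextShardIterator'];
    sleep(1);
}

The sleep at the bottom is just there to keep the loop from hammering getRecords when the shard is quiet.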

Anyway, we’re still fairly new to using Kinesis, so I’m sure we’ll learn more about it once the system is in production. A few things have already been positives: it makes testing new code locally easy since you can just “tap” into the stream, scaling up looks like it just means adding additional shards, and since it’s managed we’ve got one less thing to worry about.

As always, questions and comments welcome!

Posted In: Amazon AWS, Big Data


  • We’ve been messing around with Kafka recently to do a similar process (HTTP requests to a server, then send those to Kafka, then load into S3 and then Redshift). Have you thought about chunking the events on the server before sending them to Kafka? That might give you a performance win.

  • We had looked at Kafka, but I wasn’t thrilled about having to administer a JVM service since we don’t have a ton of JVM experience in house. It looked like Kafka -> Storm Cluster -> Redshift would have been a viable pipeline though.

    It is unfortunate that Kinesis doesn’t support the batch operations, it looks like the Fluentd guys are specifically asking for this as well – https://groups.google.com/forum/#!topic/fluentd/l3hnAVHDAWM

  • Yea – that makes sense. We’re committed to the Java stack so it was really just figuring out how to get Kafka and Zookeeper set up. So far they’ve both been working well and we’re starting to experiment with Storm.

  • Nice – Storm seems like a well thought out computation model. Hoping to give it another spin soon.


  • On Mash

    I’m going to be implementing something similar (although the consumers will not be PHP) and figured the HTTP requests to add a record to Kinesis would be a problem.

    Can you go into some more detail about how you do the requests in the background?

  • We actually ended up ditching Kinesis since the requirement to process data in real time was relaxed. Conceptually though, if we had to implement something on top of Kinesis, what I’d suggest is using Netty (http://netty.io/) to process incoming HTTP requests, pushing the messages you want to send to Kinesis into a FIFO queue, and then using a separate thread to continually push items from the queue over to Kinesis.

    We’ve deployed Netty for an unrelated project and have been nothing but impressed with its performance and resilience. It also includes an async HTTP client, which would be perfect for firing requests off to Kinesis.