Over the last few weeks we’ve been working with one of our clients to build out a real time data processing application. At a high level, the system ingests page view data, processes it in real time, and then ingests it into a database backend. In terms of scale, the system would need to start off processing roughly 30,000 events per minute at peak with the capability to scale out to 100,000 events per minute fairly easily. In addition, we wanted the data to become available to query “reasonably quickly” so that we could iterate quickly on how we were processing data.
To kick things off, we began by surveying the available tools to ingest, process, and then ultimately query the data. On the datawarehouse side, we had already had some positive experiences with Amazon Redshift so it was a natural choice to keep using it moving forward. In terms of ingestion and processing, we decided to move forward with Kinesis and Gearman. The fully managed nature of Kinesis made it the most appealing choice and Gearman’s strong PHP support would let us develop workers in a language everyone was comfortable with.
Our final implementation is fairly straightforward. An Elastic Load Balancer handles all incoming HTTP requests which are routed to any number of front end machines. These servers don’t do any computation and fire of messages into a Kinesis stream. On the backend, we have a consumer per Kinesis stream shard that creates Gearman jobs for pre-processing as well as Redshift data ingestion. Although it’s conceptually simple, there’s a couple of “gotchas” that we ran into implementing this system:
Slow HTTP requests are a killer: The Kinesis API works entirely over HTTP so anytime you want to “put” something into the stream it’ll require a HTTP request. The problem with this is that if you’re making these requests in real time in a high traffic environment you run the risk of locking up your php-fpm workers if the network latency to Kinesis starts to increase. We saw this happen first hand, everything would be fine and then all of a sudden the latency across the ELB would skyrocket when the latency to Kinesis increased. To avoid this, you need to make the Kinesis request in the background.
SSL certificate verification is REALLY slow: Kinesis is only available over HTTPs so by default the PHP SDK (I assume others as well) will perform an SSL key verification every time you use a new client. If you’re making Kinesis requests inside your php-fpm workers that means you’ll be verifying SSL keys on every request which turns out to be really slow. You can disable this in the official SDK using the “curl.options” parameter and passing in “CURLOPT_SSL_VERIFYHOST” and “CURLOPT_SSL_VERIFYPEER”
There’s no “batch” add operation: Interestingly Apache Kafka, which Kinesis is based on, supports batch operations but unfortunately Kinesis doesn’t. You have to make an HTTP request for every message you’re adding to the stream. What this means is that even if you’re queuing the messages in the background, you’ll still need to loop through them all firing off HTTP requests
Your consumer needs to be fast: In your consumer, you’ll basically end up with code that looks like – https://gist.github.com/adatta02/842531b3fe93097ee030 Because Kinesis shard iterators are only valid for 5 minutes, you’ll need to be cognizant of how long the inner for loop takes to run. Each “getRecords” call can return a max of 10,000 records so you’ll need to be able to process 10k records in less than 5 minutes. Our solution for this was to offload all the actual processing to Gearman jobs.
Anyway, we’re still fairly new to using Kinesis so I’m sure we’ll learn more about using it as the system is in production. A few things have already been positive including that it makes testing new code locally easy since you can just “tap” into the stream, scaling up looks like it means just adding additional shards, and since its managed we’ve got one less thing to worry about.
As always, questions and comments welcome!