AWS: Using Kinesis with PHP for real time stream processing

Over the last few weeks we’ve been working with one of our clients to build out a real time data processing application. At a high level, the system ingests page view data, processes it in real time, and then loads it into a database backend. In terms of scale, the system would need to start off processing roughly 30,000 events per minute at peak with the capability to scale out to 100,000 events per minute fairly easily. In addition, we wanted the data to become available to query “reasonably quickly” so that we could iterate quickly on how we were processing data.

To kick things off, we began by surveying the available tools to ingest, process, and then ultimately query the data. On the data warehouse side, we’d already had some positive experiences with Amazon Redshift so it was a natural choice to keep using it moving forward. In terms of ingestion and processing, we decided to move forward with Kinesis and Gearman. The fully managed nature of Kinesis made it the most appealing choice and Gearman’s strong PHP support would let us develop workers in a language everyone was comfortable with.

Our final implementation is fairly straightforward. An Elastic Load Balancer handles all incoming HTTP requests which are routed to any number of front end machines. These servers don’t do any computation and just fire off messages into a Kinesis stream. On the backend, we have a consumer per Kinesis stream shard that creates Gearman jobs for pre-processing as well as Redshift data ingestion. Although it’s conceptually simple, there are a couple of “gotchas” that we ran into implementing this system:

Slow HTTP requests are a killer: The Kinesis API works entirely over HTTP so anytime you want to “put” something into the stream it’ll require an HTTP request. The problem with this is that if you’re making these requests in real time in a high traffic environment you run the risk of locking up your php-fpm workers if the network latency to Kinesis starts to increase. We saw this happen first hand: everything would be fine and then all of a sudden the latency across the ELB would skyrocket when the latency to Kinesis increased. To avoid this, you need to make the Kinesis request in the background.
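One way to do that (a rough sketch, not necessarily how our system does it) is to hand the payload off to a Gearman background job from the web request and let a worker make the actual Kinesis call. The job name and payload shape here are illustrative:

```php
<?php
// In the php-fpm request: don't call Kinesis inline, just queue the payload
// for a background worker. The job name "kinesis_put" and the payload shape
// are illustrative.
$event = array('url' => $_SERVER['REQUEST_URI'], 'ts' => time());

$gearman = new GearmanClient();
$gearman->addServer('127.0.0.1', 4730);
$gearman->doBackground('kinesis_put', json_encode($event));
// The HTTP request returns immediately; a separate worker process makes
// the (potentially slow) PutRecord call to Kinesis.
```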

SSL certificate verification is REALLY slow: Kinesis is only available over HTTPS so by default the PHP SDK (I assume others as well) will perform SSL certificate verification every time you use a new client. If you’re making Kinesis requests inside your php-fpm workers that means you’ll be verifying SSL certificates on every request, which turns out to be really slow. You can disable this in the official SDK using the “curl.options” parameter and passing in “CURLOPT_SSL_VERIFYHOST” and “CURLOPT_SSL_VERIFYPEER”.
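With version 2 of the AWS SDK for PHP that looks roughly like the snippet below. The credentials and region are placeholders, and keep in mind that turning off verification is a security trade-off:

```php
<?php
require 'vendor/autoload.php';

use Aws\Kinesis\KinesisClient;

// Placeholder credentials/region. Disabling SSL verification skips the
// per-client certificate check, at the cost of weaker transport security.
$kinesis = KinesisClient::factory(array(
    'key'          => 'YOUR_AWS_KEY',
    'secret'       => 'YOUR_AWS_SECRET',
    'region'       => 'us-east-1',
    'curl.options' => array(
        CURLOPT_SSL_VERIFYHOST => false,
        CURLOPT_SSL_VERIFYPEER => false,
    ),
));
```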

There’s no “batch” add operation: Interestingly, Apache Kafka (which Kinesis is often compared to) supports batch operations, but unfortunately Kinesis doesn’t. You have to make an HTTP request for every message you’re adding to the stream. What this means is that even if you’re queuing the messages in the background, you’ll still need to loop through them all, firing off HTTP requests.
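So whatever process drains the queue ends up doing something like the sketch below, where `$kinesis` is the client configured above and `$queuedMessages` is whatever structure the events were buffered into:

```php
<?php
// Sketch: each buffered message still costs one PutRecord HTTP request.
// $kinesis is the KinesisClient from the previous snippet; $queuedMessages
// is an illustrative array of event payloads.
foreach ($queuedMessages as $message) {
    $kinesis->putRecord(array(
        'StreamName'   => 'pageviews',
        'Data'         => json_encode($message),
        'PartitionKey' => md5(json_encode($message)),
    ));
}
```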

Your consumer needs to be fast: In your consumer, you’ll basically end up with code that looks like – https://gist.github.com/adatta02/842531b3fe93097ee030 Because Kinesis shard iterators are only valid for 5 minutes, you’ll need to be cognizant of how long the inner for loop takes to run. Each “getRecords” call can return a max of 10,000 records so you’ll need to be able to process 10k records in less than 5 minutes. Our solution for this was to offload all the actual processing to Gearman jobs.
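For reference, a stripped-down version of that consumer loop looks something like the sketch below. The stream and shard names are placeholders, and the real code in the gist hands the per-record processing off to Gearman:

```php
<?php
require 'vendor/autoload.php';

use Aws\Kinesis\KinesisClient;

// Placeholder stream/shard names; in practice we run one consumer per shard.
$kinesis = KinesisClient::factory(array('region' => 'us-east-1'));

$result = $kinesis->getShardIterator(array(
    'StreamName'        => 'pageviews',
    'ShardId'           => 'shardId-000000000000',
    'ShardIteratorType' => 'TRIM_HORIZON',
));
$iterator = $result['ShardIterator'];

while (true) {
    // Shard iterators expire after 5 minutes, so everything inside this
    // loop has to finish well before then.
    $batch = $kinesis->getRecords(array(
        'ShardIterator' => $iterator,
        'Limit'         => 10000,
    ));

    foreach ($batch['Records'] as $record) {
        // Offload the actual processing (Gearman jobs in our case).
    }

    $iterator = $batch['NextShardIterator'];
    sleep(1);
}
```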

Anyway, we’re still fairly new to using Kinesis so I’m sure we’ll learn more about it once the system is in production. A few things have already been positive: it makes testing new code locally easy since you can just “tap” into the stream, scaling up looks like it just means adding additional shards, and since it’s managed we’ve got one less thing to worry about.

As always, questions and comments welcome!

Symfony2: Using “request_matcher” for custom firewall rules

Last week we were looking to leverage a set of JSON API endpoints in a Symfony2 project to power a single page Javascript app. The way the API had been set up, the routes were all secured with an HTTP Basic Auth firewall matching on “/api”. This worked great for the mobile apps but for a Javascript app it would be awkward to have the user re-enter their credentials to authorize the basic auth firewall. What we really wanted to do was to leave everything “as is” but have Symfony use the normal cookie based firewall when we passed in a special “isOnlineApp” parameter on the URL.

Unfortunately, setting something like this up with the default “pattern” setting in your security.yml file isn’t possible. The “pattern” setting only matches on the route URL, not the parameters, so there’s no way to have it selectively trigger when a parameter is present on a URL. So how do you do it? Well as it turns out, there’s a firewall configuration option called “request_matcher” which lets you “match” a firewall using a service. Just create a class that implements RequestMatcherInterface, implement its “matches” function, and then register the class as a service.

Our code for the service ended up looking like:
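In sketch form (the class name and namespace are illustrative), the matcher accepts “/api” requests only when the “isOnlineApp” parameter is absent, so those requests fall through to the normal cookie based firewall:

```php
<?php

namespace Acme\ApiBundle\Security;

use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\RequestMatcherInterface;

class ApiRequestMatcher implements RequestMatcherInterface
{
    /**
     * Match "/api" URLs, but only when the "isOnlineApp" parameter is NOT
     * present, so those requests are handled by the cookie based firewall.
     */
    public function matches(Request $request)
    {
        if (strpos($request->getPathInfo(), '/api') !== 0) {
            return false;
        }

        return !$request->query->has('isOnlineApp');
    }
}
```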

And then the actual firewall configuration ends up being:
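Again in sketch form, with illustrative service and firewall names:

```yaml
# services.yml (service id is illustrative)
services:
    acme.api_request_matcher:
        class: Acme\ApiBundle\Security\ApiRequestMatcher

# security.yml
security:
    firewalls:
        api:
            request_matcher: acme.api_request_matcher
            http_basic: ~
        main:
            pattern: ^/
            form_login: ~
```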

You don’t need a “pattern” setting anymore since the “matches” function supersedes it. Anyway, let me know if you have any questions!

PHP: Faking “typecasting” with reflection

As far as type systems go, PHP’s is pretty schizophrenic. You’ve got primitive types, like strings and booleans, the ubiquitous “array” type, and then user defined classes. Most of the time, the type system is invisible since it barely enforces anything. Especially for basic types and the standard library, you can almost always use strings, booleans, and integers interchangeably without much complaining from the interpreter. Where things go sideways is when you start using user defined types, especially with type hinting.

Imagine we’ve got the following setup:
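Reconstructed as a sketch from the error and output below, the setup was along these lines:

```php
<?php

class Pet
{
    private $name;

    public function __construct($name = null)
    {
        $this->name = $name;
    }

    public function getName()
    {
        return $this->name;
    }
}

class Dog extends Pet
{
}

function sayHello(Dog $dog)
{
    echo "Hello: " . $dog->getName() . "\n";
}

$pet = new Pet("Fluffy of destroyer of worlds");
sayHello($pet); // sayHello() expects a Dog, but gets a Pet
```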

If you run that in a terminal, PHP will throw the following error:

PHP Catchable fatal error: Argument 1 passed to sayHello() must be an instance of Dog, instance of Pet given, called in /home/ashish/Downloads/dog.php on line 19 and defined in /home/ashish/Downloads/dog.php on line 14

Even though every “Dog” is by definition a superset of the “Pet” class, PHP doesn’t see it that way. And now, our original problem. In most other object oriented languages, you’d be able to simply typecast the instance of Pet to a Dog and then call the function as expected. Unfortunately, PHP doesn’t natively support typecasting so we’re stuck looking for a crazy workaround. Enter Reflection. PHP’s reflection library lets you do all sorts of nefarious things, like manipulating private properties and retrieving the source for an arbitrary object.

So how do you use it to do a bootleg typecast? It’s actually pretty straightforward:
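Here’s a sketch of the approach (the helper name “copyShimmedObject” is from the original code, the rest is reconstructed):

```php
<?php
// Pet, Dog, and sayHello() as defined above.

// Copy every property (including private ones) from $from onto $to using
// reflection, effectively "typecasting" a Pet into a Dog.
function copyShimmedObject($from, $to)
{
    $reflection = new ReflectionObject($from);

    foreach ($reflection->getProperties() as $property) {
        $property->setAccessible(true);
        $property->setValue($to, $property->getValue($from));
    }
}

$pet = new Pet("Fluffy of destroyer of worlds");
$dog = new Dog();

copyShimmedObject($pet, $dog);
sayHello($dog);
```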

The “copyShimmedObject” function is the money maker. It basically pulls the private properties out of the “from” object, makes each property accessible, and then sets them on the “to” object. If you run the sample you’ll get the expected output instead of the error above:

Hello: Fluffy of destroyer of worlds

Symfony: Log outgoing responses with kernel events

One of the nicest features of Symfony2 is the Request/Response paradigm for processing a HTTP request and then sending a response back to a client. At a high level, Symfony’s HttpFoundation component provides an object oriented abstraction to easily deal with HTTP requests and generate responses to send back to a client. Assuming application code correctly uses HttpFoundation, it will only interact with request variables through the Request class, as opposed to $_REQUEST, and only send output using the Response class, as opposed to an “echo”. Because of this contract, the framework as a whole makes it easy to manipulate responses before they’re sent back to a client.

A typical use case that leverages this would be logging API responses before they’re sent back to a client. As much as an API might be RESTful, at some point it’s easier to debug things when you can see the responses that clients have been receiving. OK great so how do you do it? It’s actually pretty straightforward, just create a class to receive the “kernel.terminate” event and register it as a service with the appropriate tags:
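The service definition ends up looking roughly like this (the service id and class name are illustrative):

```yaml
# services.yml (service id and class name are illustrative)
services:
    acme.response_logger:
        class: Acme\ApiBundle\EventListener\ResponseLoggerListener
        arguments: ["@logger"]
        tags:
            - { name: kernel.event_listener, event: kernel.terminate, method: onKernelTerminate }
```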

And then create the class where you want to manipulate or log the requests:
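A minimal version of that listener might look like the following, where what gets logged is just an example:

```php
<?php

namespace Acme\ApiBundle\EventListener;

use Psr\Log\LoggerInterface;
use Symfony\Component\HttpKernel\Event\PostResponseEvent;

class ResponseLoggerListener
{
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Runs after the response has already been sent to the client,
     * so logging here doesn't add latency to the request.
     */
    public function onKernelTerminate(PostResponseEvent $event)
    {
        $request  = $event->getRequest();
        $response = $event->getResponse();

        $this->logger->info(sprintf(
            '%s %s -> %d: %s',
            $request->getMethod(),
            $request->getRequestUri(),
            $response->getStatusCode(),
            $response->getContent()
        ));
    }
}
```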

And that’s about it!

Note: Per Andras’ comment below the event has been switched to “kernel.terminate”.

Symfony2 and Gearman: Parallel Processing and Background Processing

On a few of our projects we need to either queue items to be processed in the background or have a single request process several things in parallel. Generally we use Gearman and the GearmanBundle. Let me explain a few different situations where we’ve found it handy to have Gearman around.

Background Processing

Often we’ll need to do something which takes a bit more time to process, such as sending out a couple thousand push notifications or resizing several images. For this example let’s use sending push notifications. You could have a person sit around while each notification is sent out and hope the page doesn’t time out, but beyond a certain number of notifications this approach will fail, not to mention deliver a terrible user experience. Enter Gearman. With Gearman you can simply queue a job recording that a user has triggered a bunch of notifications that need to be processed and sent.
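In sketch form, using PHP’s native GearmanClient (the GearmanBundle provides an equivalent; the job name and payload are illustrative), the submission side looks like this:

```php
<?php
// Queue a background job with the IDs of the users to notify.
// Job name and payload shape are illustrative.
$userIds = array(1, 2, 3); // ...a couple thousand more in practice

$client = new GearmanClient();
$client->addServer('127.0.0.1', 4730);
$client->doBackground('sendPushNotifications', json_encode(array(
    'userIds' => $userIds,
    'message' => 'Your report is ready!',
)));
// Returns immediately; a worker actually sends the notifications.
```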

What we’ve done above is send the Gearman server a job to be processed in the background, which means we don’t have to wait for it to finish. At this point all we’ve done is queue a job on the Gearman server; Gearman itself doesn’t know how to run the actual job. For that we create a ‘worker’ which reads jobs and processes them:
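A stripped-down worker for that job might look like the following sketch, with the actual push notification call stubbed out:

```php
<?php
// Stub: this is where you'd call APNs/GCM/etc.
function sendPushNotificationToUser($userId, $message)
{
}

// Long-running worker process: register the function and wait for jobs.
$worker = new GearmanWorker();
$worker->addServer('127.0.0.1', 4730);

$worker->addFunction('sendPushNotifications', function (GearmanJob $job) {
    $payload = json_decode($job->workload(), true);

    foreach ($payload['userIds'] as $userId) {
        sendPushNotificationToUser($userId, $payload['message']);
    }
});

while ($worker->work());
```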

The worker will consume the job and then process it as it sees fit. In this case we just loop over each user ID and send them a notification.

Parallel Processing

On one of our applications, users can associate their account with multiple databases. From there we go through each database and create different reports. On some of the application screens we let users poll each of their databases and we aggregate the data and create a real time report. The problem with doing this synchronously is that you have to go to each database one by one, meaning if you have 10 databases and each one takes 1 second to get the data from, the user is waiting around for at least ten seconds; it only gets worse with 20 databases and so on. Instead, we use Gearman to farm out the task of going to each database and pulling the data. From there, we have the request process total up all the aggregated data and display it. Now instead of waiting 10 seconds in total, we farm out the work to 10 workers, wait about 1 second, and then can do any final processing and show it to the user. In the example below, for brevity, we’ve just done the totaling in a controller.
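In sketch form (the connection IDs, the task name, and the shape of the data returned by the workers are all illustrative), the controller side looks like this:

```php
<?php
// Controller-side sketch: fan the work out to one task per database
// connection, wait for all of them, and total up the results.
$connectionIds = array(1, 2, 3, 4, 5); // illustrative

$client = new GearmanClient();
$client->addServer('127.0.0.1', 4730);

$total = 0;
$client->setCompleteCallback(function (GearmanTask $task) use (&$total) {
    $result = json_decode($task->data(), true);
    $total += $result['total'];
});

foreach ($connectionIds as $connectionId) {
    $client->addTask('aggregateReportData', json_encode(array(
        'connectionId' => $connectionId,
    )));
}

$client->runTasks(); // blocks until every task has completed
// $total now holds the aggregated value to render for the user
```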

What we’ve done here is created a job for each connection. This time we add them as tasks, which means we’ll wait until they’ve all completed. On the worker side it’s similar to the background example, except you return some data, i.e. `return json_encode(array('total' => 50000));` at the end of the function.

What this allows us to do is farm out the work in parallel to all the databases. Each worker runs queries on its database, computes some local data, and passes it back. From there you can add it all together (if you want) and then display it to the user. With the jobs running in parallel, the number of databases you can process is no longer limited by your request, but by how many workers you have running in the background. The beauty of Gearman is that the workers don’t need to live on the same machine, so you could have a cluster of machines acting as ‘workers’ and be able to process even more database connections in this scenario.

Anyway, Gearman has really made parallel processing and farming out work much easier. Since the workers are also written in PHP, it’s very easy to reuse code between the frontend and the workers. Often we’ll start a new report without Gearman, since it’s easier to get the logic right and fix bugs in a single request without the worker. After we’re happy with how the code works, we’ll move that code into the worker and have it just return the final result.

Good luck! Feel free to drop us a line if you need any help.