PHP: Using Gearman for a MapReduce-inspired workflow

Over the last few weeks we’ve been using Gearman to help us do some realtime stream processing. In production, we read messages off an Amazon Kinesis stream, create Gearman jobs for anything that’s computationally expensive, and then gather up the processed data in another Gearman job for a batched insert into Amazon Redshift. Conceptually, this workflow is reasonably similar to how MapReduce works, where a series of inputs is transformed by “mappers” and the results are then collected in a “reduce” step.

From a practical point of view, using Gearman like this offers some interesting benefits:

  • Adding additional “map” capacity is relatively straightforward since you can just add additional machines that connect to the Gearman server.
  • Developing and testing the “map” and “reduce” functionality is easy since nothing is shared and you can run the code directly, independently of Gearman.
  • In our experience so far, the Gearman server can handle a high volume of jobs: we’ve pushed ~300/sec without a problem.
  • Since Gearman clients exist for dozens of languages, you could write different pieces of the system in whatever language fits best.

Overview

OK, so how does all of this actually work? For the purposes of a demonstration, let’s assume you’ve been tasked with scraping the META keywords and descriptions from a few hundred thousand sites and counting up word frequencies across all the sites. Assuming you were doing this in straight PHP, you’d end up with code that looks something like this.
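A minimal sketch of that sequential approach might look like the following. The function names (extractMetaWords, scrapeSequentially) and the injectable $fetch callable are assumptions made for illustration, not the project’s actual code. It relies on PHP’s DOM extension and only splits on ASCII letters and digits.

```php
<?php
// Hypothetical helper: pull lowercase words out of the META keywords and
// description tags of a single HTML document.
function extractMetaWords(string $html): array
{
    if ($html === '') {
        return [];
    }

    // Real-world HTML is rarely valid, so collect libxml warnings quietly.
    $previous = libxml_use_internal_errors(true);
    $doc = new DOMDocument();
    $doc->loadHTML($html);
    libxml_clear_errors();
    libxml_use_internal_errors($previous);

    $words = [];
    foreach ($doc->getElementsByTagName('meta') as $meta) {
        $name = strtolower($meta->getAttribute('name'));
        if ($name !== 'keywords' && $name !== 'description') {
            continue;
        }
        $content = strtolower($meta->getAttribute('content'));
        // Split on anything that is not an ASCII letter or digit.
        $parts = preg_split('/[^a-z0-9]+/', $content, -1, PREG_SPLIT_NO_EMPTY);
        $words = array_merge($words, $parts);
    }
    return $words;
}

// Hypothetical driver: fetch every URL one after another and keep a
// running word count. $fetch takes a URL and returns the HTML (or false).
function scrapeSequentially(array $urls, callable $fetch): array
{
    $counts = [];
    foreach ($urls as $url) {
        $html = $fetch($url); // blocks until this one request finishes
        if (!is_string($html) || $html === '') {
            continue; // skip sites that failed to load
        }
        foreach (extractMetaWords($html) as $word) {
            $counts[$word] = ($counts[$word] ?? 0) + 1;
        }
    }
    arsort($counts); // most frequent words first
    return $counts;
}
```

To try it against real sites you could pass something like `fn ($url) => @file_get_contents($url)` as $fetch.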

The problem is that since you’re making the requests sequentially, scraping a significant number of URLs is going to take an impractical amount of time. What we really want to do is fetch the URLs in parallel, extract the META keywords, and then combine all that data in a single data structure.

To keep the amount of code down, I used the Symfony2 Console component, Guzzle and Monolog to provide infrastructure around the project. Walking through the files of interest:

  • GearmanCommand.php: Command to execute either the “node” or the “master” Gearman workers.
  • StartScrapeCommand.php: Command to create the Gearman jobs to start the scrapers.
  • Master.php: The code to gather up all the extracted keywords and maintain a running count.
  • Node.php: Worker code to extract the meta keywords from a given URL.
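How those pieces talk to each other through Gearman can be sketched roughly as follows, using the GearmanClient and GearmanWorker classes from the PECL gearman extension. The job names “processUrl” and “countKeywords” are the ones used in this project, but the helper names (mergeKeywordCounts, queueUrls, runNode, runMaster), the JSON payload, and the idea that each node forwards its keywords as a background job are assumptions for illustration, not the actual contents of these files.

```php
<?php
// Pure "reduce" step: fold one site's keywords into the running totals.
function mergeKeywordCounts(array $totals, array $keywords): array
{
    foreach ($keywords as $word) {
        $totals[$word] = ($totals[$word] ?? 0) + 1;
    }
    return $totals;
}

// Client side (roughly the role of StartScrapeCommand): one job per URL.
function queueUrls(GearmanClient $client, array $urls): void
{
    foreach ($urls as $url) {
        // doBackground() returns immediately; a worker picks the job up later.
        $client->doBackground('processUrl', $url);
    }
}

// "Node" worker: the map step. $scrape is a hypothetical callable that
// takes a URL and returns that site's list of keywords.
function runNode(string $host, int $port, callable $scrape): void
{
    $client = new GearmanClient();
    $client->addServer($host, $port);

    $worker = new GearmanWorker();
    $worker->addServer($host, $port);
    $worker->addFunction('processUrl', function (GearmanJob $job) use ($client, $scrape) {
        $keywords = $scrape($job->workload());
        // Assumption: hand the result to the reducer as a JSON background job.
        $client->doBackground('countKeywords', json_encode($keywords));
    });

    while ($worker->work()); // process jobs until work() reports a failure
}

// "Master" worker: the reduce step, keeping the running count in memory.
function runMaster(string $host, int $port): void
{
    $totals = [];

    $worker = new GearmanWorker();
    $worker->addServer($host, $port);
    $worker->addFunction('countKeywords', function (GearmanJob $job) use (&$totals) {
        $keywords = json_decode($job->workload(), true) ?: [];
        $totals = mergeKeywordCounts($totals, $keywords);
    });

    while ($worker->work());
}
```

Because the map and reduce logic live in plain functions and callables, they can be exercised without a job server; only runNode() and runMaster() need the extension and a running gearmand.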

Setup

Taking this for a spin is straightforward enough. Fire up an Ubuntu EC2 instance and then run the following:

OK, now that everything is set up, let’s run the normal PHP implementation.

Looks like about 10-12 seconds to process 100 URLs. Not terrible, but assuming linear growth that means processing 100,000 URLs would take around 3 hours (10,000-12,000 seconds), which is a bit painful. You can verify it worked by looking at the “bin/nogearman_keyword_results.json” file.

Now, let’s look at the Gearman version. Running it is straightforward; just run the following:

You’ll eventually get output from the “master” when it finishes, including the total elapsed time. It’ll probably come in somewhere around 15 seconds, no faster than the plain PHP run, because we’re still just using a single process to fetch the URLs.

Party in parallel

But now here’s where things get interesting: we can start adding multiple “worker” processes to do some of the computation in parallel. In my experience, the easiest way to handle this is with Supervisor, since it makes starting and stopping groups of processes easy and also handles collecting their output. Run the following to copy the config file, restart Supervisor, and verify the workers are running:

And now, you’ll want to run “php application.php setfive:gearman master” in one terminal and in another run “php application.php setfive:start-scraper 100sites.txt” to kick off the jobs.

Boom! Much faster. We’re still only doing 100 URLs so the effect of processing in parallel isn’t that dramatic. Again, you can check out the results by looking at “bin/keyword_results.json”.

The effects of using multiple workers will be more apparent when you’ve got a larger number of URLs to scrape. Inside the “bin” directory there’s a file named “quantcast_site_lists.tar.gz” which has site lists of different sizes up to the full 1 million from Quantcast.

I ran some tests on the lists using different numbers of workers and the results are below.

URLs      0 Workers     10 Workers    25 Workers
100       12 sec.       12 sec.       5 sec.
1000      170 sec.      34 sec.       33 sec.
5000      1174 sec.     195 sec.      183 sec.
10000     2743 sec.     445 sec.      424 sec.
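To make those numbers easier to compare, here is a small sketch that turns each row into a speedup relative to the 0-worker run. The timings are copied from the table above; the speedups() helper is just an illustrative name.

```php
<?php
// Elapsed seconds from the table above: [0 workers, 10 workers, 25 workers].
$timings = [
    100   => [12, 12, 5],
    1000  => [170, 34, 33],
    5000  => [1174, 195, 183],
    10000 => [2743, 445, 424],
];

// Speedup of the 10- and 25-worker runs relative to the 0-worker baseline.
function speedups(array $row): array
{
    [$baseline, $ten, $twentyFive] = $row;
    return [round($baseline / $ten, 1), round($baseline / $twentyFive, 1)];
}

foreach ($timings as $urls => $row) {
    [$withTen, $withTwentyFive] = speedups($row);
    printf(
        "%5d URLs: %.1fx with 10 workers, %.1fx with 25 workers\n",
        $urls,
        $withTen,
        $withTwentyFive
    );
}
```

For the 10,000 URL list this works out to roughly 6.2x with 10 workers and 6.5x with 25, so going from 10 to 25 workers adds very little.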

One thing to note is that if you run:

and notice that “processUrl” has zero jobs waiting but there are a lot queued for “countKeywords”, you’re actually saturating the “reducer”, and adding more worker nodes in Supervisor isn’t going to increase your speed. Testing on a m3.small, I was seeing this happen with 25 workers.
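If you’d rather check the queue depths from PHP, one option is the job server’s plain-text admin protocol: sending “status” returns one tab-separated line per function (name, jobs in queue, jobs running, available workers), terminated by a line containing only a dot. The sketch below assumes gearmand is listening on its default port 4730; the function names and the reducerIsBottleneck() heuristic are illustrative, not part of the project.

```php
<?php
// Parse the text returned by the "status" admin command into an array
// keyed by function name.
function parseGearmanStatus(string $raw): array
{
    $status = [];
    foreach (preg_split('/\r?\n/', trim($raw)) as $line) {
        if ($line === '.') {
            break; // a lone dot marks the end of the listing
        }
        $fields = explode("\t", $line);
        if (count($fields) !== 4) {
            continue; // ignore anything that is not a status row
        }
        [$name, $queued, $running, $workers] = $fields;
        $status[$name] = [
            'queued'  => (int) $queued,
            'running' => (int) $running,
            'workers' => (int) $workers,
        ];
    }
    return $status;
}

// Ask a job server for its status over a plain TCP socket.
function fetchGearmanStatus(string $host = '127.0.0.1', int $port = 4730): array
{
    $socket = @fsockopen($host, $port, $errno, $errstr, 2.0);
    if ($socket === false) {
        throw new RuntimeException("Cannot reach gearmand: $errstr ($errno)");
    }
    fwrite($socket, "status\n");

    $raw = '';
    while (($line = fgets($socket)) !== false) {
        $raw .= $line;
        if (rtrim($line) === '.') {
            break; // stop reading once the terminator arrives
        }
    }
    fclose($socket);

    return parseGearmanStatus($raw);
}

// True when the map queue is drained but reduce jobs are still piling up.
function reducerIsBottleneck(array $status): bool
{
    $mapQueued    = $status['processUrl']['queued'] ?? 0;
    $reduceQueued = $status['countKeywords']['queued'] ?? 0;
    return $mapQueued === 0 && $reduceQueued > 0;
}
```

Calling `reducerIsBottleneck(fetchGearmanStatus())` while a scrape is running gives a quick yes/no on whether more “node” workers would help.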

Another powerful feature of Gearman is that it makes running jobs on remote hosts really easy. To add a “remote” worker host, you’d just need to start a second machine, update the job server’s IP address in Base.php, and use the same Supervisor config to start a group of workers. They’d automatically register with your Gearman server and start processing jobs.

Anyway, as always, questions and comments are appreciated, and all the code is on GitHub.