Symfony2 and Gearman: Parallel Processing and Background Processing

On a few of our projects we need to either queue items to be processed in the background or have a single request process something in parallel. Generally we use Gearman and the GearmanBundle. Let me explain a few different situations where we’ve found it handy to have Gearman around.

Background Processing

Often we’ll need to do something that takes a bit more time to process, such as sending out a couple thousand push notifications or resizing several images. For this example let’s use sending push notifications. You could have a person sit around while each notification is sent out and hope the page doesn’t time out, but past a certain number of notifications this approach will fail, not to mention that it makes for a terrible user experience. Enter Gearman. With Gearman you can queue the event that a user has triggered a bunch of notifications that need to be processed and sent.
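The gist that originally appeared here is no longer inline. A minimal sketch of the controller side, assuming the GearmanBundle’s `gearman` client service and an illustrative job name and payload (none of these names are from the original post):

```php
<?php

namespace Acme\NotificationBundle\Controller;

use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\HttpFoundation\Response;

class NotificationController extends Controller
{
    public function notifyAction()
    {
        $userIds = array(1, 2, 3); // ...the couple thousand users to notify

        // doBackgroundJob() returns as soon as the job is queued on the
        // Gearman server; the request does not wait for the sends to finish.
        $this->get('gearman')->doBackgroundJob(
            'NotificationWorker~sendNotifications',
            json_encode(array('user_ids' => $userIds))
        );

        return new Response('Notifications queued');
    }
}
```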

What we’ve done above is send the Gearman server a job to be processed in the background, which means we don’t have to wait for it to finish. At this point all we’ve done is queue a job on the Gearman server; Gearman itself doesn’t know how to run the actual job. For that we create a ‘worker’ which reads jobs and processes them:
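A sketch of such a worker, assuming the GearmanBundle’s annotations (the class, job, and helper names are illustrative, not from the original gist):

```php
<?php

namespace Acme\NotificationBundle\Workers;

use Mmoreram\GearmanBundle\Driver\Gearman;

/**
 * @Gearman\Work(name = "NotificationWorker")
 */
class NotificationWorker
{
    /**
     * @Gearman\Job(name = "sendNotifications")
     */
    public function sendNotifications(\GearmanJob $job)
    {
        // The workload is whatever string the client queued; here we
        // assume the JSON payload from the controller example.
        $data = json_decode($job->workload(), true);

        foreach ($data['user_ids'] as $userId) {
            // sendPushNotification() is a hypothetical stand-in for
            // whatever push service you actually call.
            $this->sendPushNotification($userId);
        }

        return true;
    }
}
```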

The worker will consume the job and then process it as it sees fit. In this case we just loop over each user ID and send them a notification.

Parallel Processing

On one of our applications, users can associate their account with multiple databases. From there we go through each database and create different reports. On some of the application screens we let users poll each of their databases, and we aggregate the data and create a real-time report. The problem with doing this synchronously is that you have to go to each database one by one: if you have 10 databases and each one takes 1 second to get the data from, the user is waiting around for at least ten seconds, and it only gets worse with 20 databases and so on. Instead, we use Gearman to farm out the task of going to each database and pulling the data. From there, we have the request process total up all the aggregated data and display it. Now instead of waiting 10 seconds for the databases, we farm the work out to 10 workers, wait 1 second, and then do any final processing and show it to the user. In the example below, for brevity, we’ve done the totaling in a controller.
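The original inline example is missing here. A rough sketch of the idea, written against the pecl GearmanClient API directly for brevity (the post used the GearmanBundle’s task API, with callbacks wired through the Symfony2 event system; the job name and payload are illustrative):

```php
<?php

// Connect to the Gearman server (default port 4730).
$client = new \GearmanClient();
$client->addServer('127.0.0.1', 4730);

$total = 0;

// Called once per task as each worker finishes; the worker's return
// value arrives as the task data.
$client->setCompleteCallback(function (\GearmanTask $task) use (&$total) {
    $result = json_decode($task->data(), true);
    $total += $result['total'];
});

// One task per database connection; $connectionIds is illustrative.
$connectionIds = array(1, 2, 3);
foreach ($connectionIds as $connectionId) {
    $client->addTask(
        'ReportWorker~pullReportData',
        json_encode(array('connection' => $connectionId))
    );
}

// Blocks until every task completes; the tasks themselves run in
// parallel across however many workers are listening.
$client->runTasks();
```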

What we’ve done here is create a job for each connection. This time we add them as tasks, which means we’ll wait until they’ve completed. The worker side is similar, except you return some data, e.g. `return json_encode(array('total' => 50000));` at the end of the function.
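For illustration, the worker method for one of these tasks might look like the following sketch (the method name, payload shape, and hard-coded total are illustrative, not from the original gist):

```php
<?php

/**
 * @Gearman\Job(name = "pullReportData")
 */
public function pullReportData(\GearmanJob $job)
{
    $params = json_decode($job->workload(), true);

    // ...run the report queries against the database identified by
    // $params['connection'] and compute a local total...
    $total = 50000; // illustrative result

    // The serialized return value is what the client's complete
    // callback receives as the task data.
    return json_encode(array('total' => $total));
}
```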

What this allows us to do is farm out the work in parallel to all the databases. Each worker runs queries on its database, computes some local data, and passes it back. From there you can add it all together (if you want) and then display it to the user. With the jobs running in parallel, the number of databases you can process is no longer limited by your request, but rather by how many workers you have running in the background. The beauty of Gearman is that the workers don’t need to live on the same machine, so you could have a cluster of machines acting as ‘workers’ and be able to process even more database connections in this scenario.

Anyway, Gearman has really made parallel processing and farming out work much easier. Since the workers are also written in PHP, it is very easy to reuse code between the frontend and the workers. Often we’ll start a new report without Gearman, since getting the logic right and fixing bugs is easier in a single request without the worker. Once we’re happy with how the code works, we’ll move it into the worker and have it just return the final result.

Good luck! Feel free to drop us a line if you need any help.

Posted In: PHP, Symfony


  • Awesome integration! That’s a great tool!

  • Marc Morera Merino

    Great post Matt :)

  • Massimiliano Arione

    Injecting the whole container always looks like a bad smell to me… go explicit and inject just services you need!

  • Agreed, for simplicity of the example I just did that. In our case we usually inject just doctrine and routing.

  • Luke Mackenzie

    Can you share your event listener code?

  • Luke-

    Sure, but I’m not sure what you mean? The callback (from the event) is the gearmanCallback function. The events themselves are sent via the Symfony2 event system.

    Matt

  • Luke Mackenzie

    I was told by the author of the bundle that I needed to write an event listener if I wanted to hook into the status callback event but he doesn’t provide a working example. If you didn’t write a custom event listener that might be the cause of confusion. I want to monitor job progress / status.

  • In this case https://gist.github.com/daum/9260047#file-gistfile1-php-L7 is where you’d specify the class and callback method name. In my example it is just inside the controller. So you can bind to all the different events there.

  • abbiya

    $job->getWorkload() is giving an undefined function error. $job->workload() works

  • Ah, thanks for pointing that out. In my actual class I had made a few changes so that getWorkload actually returns a JSON-decoded payload already (along with some other customizations). I’ve updated the example.

  • Hi,
    What do you do when a worker fails (a connection error, for example)? How do you tell it to retry at some point later on and keep the job in the queue?

  • Hey –

    Depends on how you want to handle the errors. In my case, if there was a connection error, for example, we’d just return an error message to the user saying “couldn’t connect to xyz”. In other cases, if we want it to retry, we’ll throw an exception back. This way we are notified (we use the ErrorNotifierBundle) and the job is requeued on Gearman automatically. If you don’t return a proper return code to Gearman, it will requeue the job.

    One thing to look out for is poison jobs. These are jobs which will never work, perhaps because the connection details are incorrect. If you keep putting these jobs back in the queue, your background workers can keep crashing on the same job over and over. At some point, if you are using supervisor, they may be marked as fully crashed. I’d suggest looking at the `--job-retries` parameter to solve this issue. Basically, that lets you tell the Gearman server to only requeue a job so many times before the job is considered “bad” and is discarded.
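    For reference, that is an option on the gearmand server itself, e.g.:

    ```shell
    # Start gearmand so a failed job is retried at most 3 times
    # before being considered bad and discarded.
    gearmand --job-retries=3
    ```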

    Matt

  • OK, thanks! I just figured out this is impossible with a background task :/

  • John Pezzetti

    Hi Matt, thanks for the post. I’ve followed what you have here for background processing… the only difference for me is that my background jobs don’t execute unless I run the console command gearman:job:execute jobname. Do you have this command running in the background, or do your jobs execute automatically? I’m not able to tell what GearmanBundle’s intended method is for this. Thanks!

  • Hey John-

    Correct, you need to run that command so the background workers are running. I use supervisor to make sure that they are always running.

  • Nahuel Velazco

    I really have two big questions:
    – I send a background job that takes 3 minutes, and I send it every time it is requested. But only the first request’s job gets executed. The others should be waiting in the queue, right? And when the worker is free, the next one in the queue should run, right? But the others never arrive at the worker. What could be happening?
    – Why does a worker need iterations? Iterations mean it stops working once it has processed n iterations. Why should I want to kill my workers?

    Great post!

  • – To your first question: are your workers all looking at the same queue (job name)? Do you have all the other workers running at the same time as the first one? If they, themselves, aren’t busy working, then yes, as soon as they are idle they should take the next available job.

    – For the iterations, often you’ll want to restart the workers after X iterations so that if there are memory leaks they don’t eat up all your server’s memory. Also, say you have something like a counter service that you want to start at 0 each time a new job starts. You could reset it manually, but this way you are guaranteed a “fresh” environment each time the worker restarts.
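    With the GearmanBundle this can be set on the worker itself; a sketch, assuming the bundle’s @Gearman\Work annotation (class name illustrative):

    ```php
    <?php

    namespace Acme\ReportBundle\Workers;

    use Mmoreram\GearmanBundle\Driver\Gearman;

    /**
     * After 100 jobs the gearman:job:execute process exits, releasing any
     * leaked memory; supervisor then restarts it with a fresh environment.
     *
     * @Gearman\Work(iterations = 100)
     */
    class ReportWorker
    {
        // ... @Gearman\Job methods ...
    }
    ```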

  • Nahuel Velazco

    a) Hey Matt. Thanks a lot for your help!!!! It’s working! I’ve spent two days fighting with that. The strange thing is it happened when there was only one worker. I guess it was something that I was missing. Yes, the jobs are queuing.

    b) Wow! You are a Gearman ninja. Thanks a lot for sharing your knowledge.

  • Nahuel Velazco

    I have another big question. How can I call a service from the worker like I would in a controller? Something like this:

    public function testSlowOperation(GearmanJob $job)
    {
        $this->get('LogicService')->myFunction(); // THIS IS WHAT I WOULD LIKE!

        echo "Finishing a slow op..." . PHP_EOL;
    }

    I see on the constructor: public function __construct(Container $container):

    But then Symfony obviously says:
    [ErrorException]
    Catchable Fatal Error: Argument 1 passed to CH212app\BackendBundle\Workers\BackendWorker::__construct() must be an instance of Symfony\Component\DependencyInjection\Container, none given, called in /var/www/vhosts/dev.linkemann.net/httpdocs/212vip_devel4/server/vendor/mmoreram/gearman-bundle/Mmoreram/GearmanBundle/Service/GearmanExecute.php on line 109 and defined in /var/www/vhosts/dev.linkemann.net/httpdocs/212vip_devel4/server/src/CH212app/BackendBundle/Workers/BackendWorker.php line 17

    Maybe the callback can get the container like normal controllers do. The main problem is that the service is completely coupled with all its dependencies.

    How do you inject services or containers in the constructor? Thank you Matt!

  • When you declare your worker as a service put in the arguments: [@container]
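    In services.yml that might look like the following sketch (the service id and class are illustrative; note that in Symfony2 the container itself is registered under the id service_container):

    ```yaml
    services:
        acme.backend_worker:
            class: Acme\BackendBundle\Workers\BackendWorker
            # Injecting the whole container; injecting only the services
            # you need (e.g. "@doctrine") is the cleaner option noted above.
            arguments: ["@service_container"]
    ```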

  • Nahuel Velazco

    Thank you Matt!