1
2010
Multiprocessing mit einer simplen Job Queue und PCNTL
Hallo zusammen,
da ich in letztens auf der Suche nach einer Möglichkeit war die Laufzeit eines langwierigen Konsolen Scripts zu beschleunigen, wurde ich auf die Process Control kurz PCNTL Extension aufmerksam.
Hier mal frei nach php.net: Die PCTNL Extension in PHP implementiert die Unix Prozesserzeugung, die Prozessausführung, die Signalverwaltung und die Prozessbeendigung.
Mir ging es dabei letztendlich um die Prozesserzeugung, also um die Möglichkeit von einem Hauptprozess mehrere Kindprozesse abzweigen zu lassen (forken).
Hier die Ausgangssituation:
Ich hatte ein Skript dass sich von ca. 50 verschiedenen Urls Bilder abholte. Bisher lief das ganze schön in einem Skript Url für Url durch, was auf Dauer ziemlich langsam wurde.
Das ganze sah ungefähr so aus.
$urls = getImageUrls();
$client = new Zend_Http_Client();
foreach ($urls as $url) {
$client->setUri($url);
$content = new Zend_Dom_Query($client->request()->getBody());
// ... zeugs um die bildurls aus dem request body heraus zu parsen
foreach ($imageUrls as $imageUrl) {
$file = $client->setUri($imageUrl)->request()->getBody();
file_put_contents('irgendeinname', $file);
}
}
Mein erster Ansatz mit der PCNTL Extension war nun jede URL als eigenen Prozess in einer Art Queue laufen zu lassen.
$queue = getImageUrls();
$client = new Zend_Http_Client();
for ($i = 0; $i < count($queue); $i++) {
$pid = pcntl_fork();
if (!$pid) {
$client->setUri($queue[$i]);
$content = new Zend_Dom_Query($client->request()->getBody());
// ... zeugs um die bildurls aus dem request body heraus zu parsen
foreach ($imageUrls as $imageUrl) {
$file = $client->setUri($imageUrl)->request()->getBody();
file_put_contents('irgendeinname', $file);
}
}
}
Das war schön und gut, aber berücksichtigte nicht die Tatsache, dass ein geforkter Prozess immer eine Kopie das aktuell laufenden Eltern Prozesses war.
Daraus ergaben sich natürlich Probleme im Bezug auf alle benutzten Variablen. Ich musste also dafür sorgen, dass jeder Kindprozess seine eigenen Variablen benutzte.
Also entschloss ich mich dazu, eine simple MultiprocessQueue zu bauen, die Jobs ausführen konnte.
Wobei jeder Job eine eigene Instanz einer beliebigen Jobklasse ist, die von einer abstrakten Jobklasse abgeleitet wurde.
Wichtig ist an dieser Stelle noch anzumerken, dass die PCNTL Extension nicht auf Windows Systemen läuft, sondern ausschließlich auf Unix Systemen. Des weiteren sollte sie nicht in einer Webserverumgebung ausgeführt werden um unerwünschte Nebeneffekte zu vermeiden.
Um nun mein Ziel zu erreichen, erstellte ich zunächst eine abstrakte Job Klasse die im Prinzip so aussah:
abstract class DT_Multiprocess_Job_Abstract
{
const STATUS_PENDING = 1;
const STATUS_RUNNING = 2;
const STATUS_FINISHED = 3;
protected $_jobId;
protected $_status = self::STATUS_PENDING;
public function execute ()
{
throw new DT_Multiprocess_Exception(__FUNCTION__
. ' must be implemented');
}
public final function setJobId ($jobId)
{
$this->_jobId = $jobId;
}
public final function getJobId ()
{
return $this->_jobId;
}
public final function setStatus ($status)
{
$this->_status = $status;
}
public final function getStatus ()
{
return $this->_status;
}
}
Und dazu die Queue Klasse, die von DT_Multiprocess_Job_Abstract abgeleitete Klassen ausführen konnte.
Hier ein kurzer Auszug der wichtigsten Funktionen:
class DT_Multiprocess_Job_Queue
{
/**
* jobs
*
* @var array
*/
protected $_jobs;
...
/**
* Adds a new job to the queue
*
* Adds a new job to the queue. The job has to extend from
* DT_Multiprocess_Job_Abstract in order to be accepted.
*
* @param DT_Core_Job_Abstract $job
* @return void
*/
public function addJob (DT_Multiprocess_Job_Abstract $job)
{
$this->_jobs[] = $job;
}
/**
* forks the available jobs and executes them
*
* @param void
*/
public function work ()
{
for ($jobId = 0; $jobId < count($this->_jobs); $jobId++) {
$pid = pcntl_fork();
if (!$pid) {
$this->_jobs[$jobId]->setJobId($jobId);
$this->_jobs[$jobId]->execute();
exit($jobId);
}
}
// wait for all childs to finish
while (pcntl_waitpid(0, $status) != -1) {
$status = pcntl_wexitstatus($status);
$this->_jobs[$status]->setStatus(
DT_Multiprocess_Job_Abstract::STATUS_FINISHED
);
}
}
}
Nun konnte ich einen konkreten Job implementieren und Instanzen von diesem von der Queue abarbeiten lassen.
Hier vereinfacht der Bildjob:
class Job_ExampleImageFetching extends DT_Multiprocess_Job_Abstract
{
protected $_url;
...
public function __construct ($url)
{
$this->_url = $url;
}
public function execute()
{
$this->setStatus(self::STATUS_RUNNING);
$client = new Zend_Http_Client();
$client->setUri($this->_url);
$content = new Zend_Dom_Query($client->request()->getBody());
// ... zeugs um die bildurls aus dem request body heraus zu parsen
foreach ($imageUrls as $imageUrl) {
$file = $client->setUri($imageUrl)->request()->getBody();
file_put_contents('irgendeinname', $file);
}
...
}
Daraus ergab sich dann folgender Workflow zur Abarbeitung der Queue:
$imageUrls = getImageUrls();
$jobQueue = new DT_Multiprocess_Job_Queue();
foreach ($imageUrls as $url) {
$jobQueue->addJob(new Job_ExampleImageFetching($url));
}
$jobQueue->work();
That’s it ![]()
Die Ausführungszeit des Konsolenskriptes hat sich drastisch reduziert und ich war glücklich.
Wer sich das genauer anschauen will, kann sich die Klassen auf GitHub anschauen. bald die Klassen auf GitHub anschauen ich versuche sie die Woche noch hoch zu pushen.
In dem Sinne viel Spaß beim forken und bis zum nächsten Mal.
Ähnliche Artikel
13 Kommentare Kommentar schreiben
Kommentar schreiben
Letzte Kommentare
- PHP 5.4 – die Neuerungen im Überblick at PHP, SEO, Software, Programmierung bei Kurzschreibweise für Arrays ab PHP 5.4
- Jevo bei Ajax mit dem ZF und jQuery – HOW TO
- Jevo bei Ajax mit dem ZF und jQuery – HOW TO
- Jan bei Ajax mit dem ZF und jQuery – HOW TO
- Wotan bei Ubuntu Panel zurücksetzen / wiederherstellen
von








Wäre es nicht evtl. sinnvoll ein optionales Limit auf den Queue setzen zu können, was die Anzahl der Forks limitiert? Dann hätte man die Ressourcennutzung unter Kontrolle.
[...] This post was mentioned on Twitter by Nico Hofmann, dev.Talk. dev.Talk said: Neuer Artikel: Multiprocessing mit einer simplen Job Queue und PCNTL http://www.dev-talk.info/?p=682 #dev_talk [...]
1) Wäre es nicht sinnvoller die execute()-Methode der Job-Klasse abstrakt zu machen? Dann ist geklärt, dass die konkrete Klasse diese Methode implementieren muss. Aktuell hätte man mit class Foo extends DT_Multiprocess_Job_Abstract {} eine gültige Klasse. Macht keinen Sinn, da spätestens bei der Bearbeitung, durch die Queue, eine Exception geworfen würde.
2) Wieso verwendest du die Job-ID als Exit-Code? Der Exit-Code hat ja schon eine definierte Bedeutung. Daher würde ich vorschlagen die execute()-Methode einen Exit-Code zurückliefern zu lassen, der dann fürs exit() genutzt wird.
3) Ich kenne mich mit PCNTL nun nicht besonders aus, aber irgendwie kommt mir die while-Schleife in der work()-Methode der Queue merkwürdig vor. Du nutzt die Rückgabe von pcntl_wxitstatus() als Index fürs Jobs-Array? Das kommt mir falsch vor.
Achso, noch was
Die Aussage, dass sich die Ausführzeit des Skripts “dramatisch” reduziert hat, hört sich eher schlecht als gut an. Ich behaupte mal, dass da “drastisch” besser passen würde
Hallo ihr beiden,
vielen Dank für eure Kommentare. Ich werde versuchen sie konstruktiv umzusetzen
In deinem konkreten Fall, hättest du übrigens auch curl_multi_* verwenden können. Hätte den Vorteil, dass es auch unter Windows läuft
Da das Zend Framework eh verwendet wird: ZendX_Console_Process_Unix!
http://framework.zend.com/manual/de/zendx.console.process.unix.overview.html
Das ist kein Job-Queue, sondern eine Prozessschleuder. Prozesse sind relativ “teuer”, deswegen wäre es vllt sinnvoller einen richtigen Job-Queue zu schreiben. In seiner einfachsten Form könnte man den eigentlichen Queue persistent machen (z.B. mit Memcache, Redis oder SQL-Datenbank) und dann eine bestimmte Anzahl von Workern spawnen, die nur eine einzige Aufgabe haben: Den Queue abzuarbeiten. Um verschiedene Jobtypen zu unterstützen und/oder andere Zusatzinfos unterzubringen, müsste man die Datenstruktur des Queue-Elements einen Makro-artigen Charaketer verleihen.
Habe letztens einen solchen Queue mit Redis und Python implementiert – hat super funktioniert.
Tach zusammen,
danke für das Feedback wir wissen das wirklich zu schätzen
Ich habe versucht einige Vorschläge mit umzusetzen. Die Sourcen sind jetzt auf GitHub verfügbar, wer also Lust hat die ganze Geschichte weiter zu verfolgen darf gerne mal auf http://github.com/dev-talk/DT vorbei schauen.
Als Job-Queue würde ich ansonsten noch Gearman vorschlagen – der kümmert sich um das ganze Verwaltungszeugs, die Implementierung von Auftraggebern und Workern geschieht aber in einer beliebigen Sprache (z.B. PHP), in der man das Gearman-Protokoll sprechen kann (PEAR Net::Gearman bzw. PECL Gearman).
Während Kind-Prozesse zur forken eine sehr interessante Technik ist, würde ich deine konkrete Problemstellung mit einem HttpRequestPool lösen. Einfacher und schneller (da keine prozesse erzeugt werden müssen). Nachteil: pecl http wird benötigt.
Oder aber du schaust dich nach einer “echten” Programmiersprache um die Multithreading kann. Oder mit events arbeitet…
snip —
In deinem konkreten Fall, hättest du übrigens auch curl_multi_* verwenden können. Hätte den Vorteil, dass es auch unter Windows läuft
– snap
So löse ich das auch immer. So hast du auch die Möglichkeit, deine Last auf mehrere APP-Server zu verteilen und entsprechend zu skalieren.
Grüße aus Bamberg
Danke!