Create a Queue-based Parallel Task Processing Program with Supervisor - Comos/qpm GitHub Wiki
Consider a scenario. There is a list of URLs in a queue. A daemon fetches items from the queue one by one and dispatches those URLs as tasks to child processes, which crawl the URLs and save the results to files. To improve efficiency, the children work in parallel, but an upper limit on concurrency keeps the servers from being overloaded.
Let’s try to implement the scenario with Supervisor::taskFactoryMode().
For demonstration purposes, we can use a text file to simulate the queue; see spider_task_factory_data.txt:
http://news.sina.com.cn/
http://news.ifeng.com/
http://news.163.com/
http://news.sohu.com/
http://ent.sina.com.cn/
http://ent.ifeng.com/
…
END
First, we need a SpiderTaskFactory. Its factory method fetchTask() reads the text file one line at a time and returns an instance of Comos\Qpm\Process\Runnable for each line. When it reaches an END line or the end of the file, it throws a StopSignal, which causes the whole program to exit.
The factory looks like the following.
class SpiderTaskFactory {
    private $_input;
    private $_fh;

    public function __construct($input) {
        $this->_input = $input;
        $this->_fh = fopen($input, 'r');
        if ($this->_fh === false) {
            throw new Exception('fopen failed:'.$input);
        }
    }

    // Returns the next task, or throws StopSignal when the input is exhausted.
    public function fetchTask() {
        while (true) {
            if (feof($this->_fh)) {
                throw new Comos\Qpm\Supervision\StopSignal();
            }
            $line = trim((string) fgets($this->_fh));
            if ($line === 'END') {
                throw new Comos\Qpm\Supervision\StopSignal();
            }
            // Skip blank lines.
            if ($line === '') {
                continue;
            }
            break;
        }
        return new SpiderTask($line);
    }
}
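The stop-signal contract is easier to see in isolation. The following is a minimal sketch, not qpm code: DemoStopSignal, makeLineFactory() and drainFactory() are hypothetical names introduced here so that the example runs without the library, and the loop in drainFactory() only imitates how a caller might keep requesting tasks until the factory signals a stop.

```php
<?php
// Hypothetical stand-in for the library's stop signal (assumption, not qpm's class).
class DemoStopSignal extends Exception {}

/**
 * Returns a closure that yields one non-empty line per call and throws
 * DemoStopSignal at an END marker or at the end of the file.
 */
function makeLineFactory(string $path): callable {
    $fh = fopen($path, 'r');
    if ($fh === false) {
        throw new RuntimeException('fopen failed: ' . $path);
    }
    return function () use ($fh): string {
        while (($line = fgets($fh)) !== false) {
            $line = trim($line);
            if ($line === 'END') {
                throw new DemoStopSignal();
            }
            if ($line !== '') {
                return $line;
            }
        }
        // fgets() returned false: end of file.
        throw new DemoStopSignal();
    };
}

/** Calls the factory until it signals a stop; returns everything it produced. */
function drainFactory(callable $factory): array {
    $items = [];
    try {
        while (true) {
            $items[] = $factory();
        }
    } catch (DemoStopSignal $e) {
        // The stop signal ends the loop; nothing after END is read.
    }
    return $items;
}

// Usage: a temporary file plays the role of the queue.
$path = tempnam(sys_get_temp_dir(), 'queue');
file_put_contents(
    $path,
    "http://news.sina.com.cn/\n\nhttp://news.ifeng.com/\nEND\nhttp://ignored.example/\n"
);
$urls = drainFactory(makeLineFactory($path));
unlink($path);
print_r($urls); // the two URLs before END; the blank line and the line after END are skipped
```

Using an exception rather than a special return value keeps the factory's return type simple: every normal return is a task, and termination takes a separate path.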
The task looks like the following.
class SpiderTask implements Comos\Qpm\Process\Runnable {
    private $_target;

    public function __construct($target) {
        $this->_target = $target;
    }

    // This method runs in the child process.
    public function run() {
        $r = @file_get_contents($this->_target);
        if ($r === false) {
            throw new Exception('fail to crawl url:'.$this->_target);
        }
        // file_put_contents() does not create directories, so _spider must already exist.
        file_put_contents($this->getLocalFilename(), $r);
    }

    // Builds a timestamped file name from the URL, replacing '/' and ':'.
    private function getLocalFilename() {
        $filename = str_replace('/', '~', $this->_target);
        $filename = str_replace(':', '_', $filename);
        $filename = $filename.'-'.date('YmdHis');
        return __DIR__.'/_spider/'.$filename.'.html';
    }
}
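To see what file name a given URL produces, the two replacements in getLocalFilename() can be tried on their own. This is a small sketch: localFilenameFor() is a hypothetical helper that mirrors the same str_replace() calls, and the timestamp is passed in as a fixed string (an assumption made only so the result is reproducible) instead of coming from date().

```php
<?php
// Hypothetical helper mirroring the replacements in getLocalFilename().
// The directory prefix is left out and the timestamp is a parameter.
function localFilenameFor(string $url, string $timestamp): string {
    $name = str_replace('/', '~', $url);   // "http:~~news.sina.com.cn~"
    $name = str_replace(':', '_', $name);  // "http_~~news.sina.com.cn~"
    return $name . '-' . $timestamp . '.html';
}

echo localFilenameFor('http://news.sina.com.cn/', '20240101120000'), PHP_EOL;
// prints: http_~~news.sina.com.cn~-20240101120000.html
```

Because the timestamp has one-second resolution, crawling the same URL twice within the same second would write to the same file.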
The assembly process looks like the following.
$input = isset($argv[1]) ? $argv[1] : __DIR__.'/spider_task_factory_data.txt';
$spiderTaskFactory = new SpiderTaskFactory($input);
$config = [
    // The factory method is $spiderTaskFactory->fetchTask().
    'factory' => [$spiderTaskFactory, 'fetchTask'],
    // At most 3 tasks run concurrently.
    'quantity' => 3,
];
// Launch...
Comos\Qpm\Supervision\Supervisor::taskFactoryMode($config)->start();
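As a rough mental model of what a concurrency cap such as 'quantity' means, the sketch below forks one child per job but never keeps more than a fixed number alive at once. It is not qpm's implementation and makes no claim about how the Supervisor works internally; runWithLimit() is a hypothetical function written for this page, and it assumes the pcntl extension (PHP CLI on a Unix-like system).

```php
<?php
// Hand-rolled illustration of a concurrency cap; NOT qpm's implementation.
// Assumes the pcntl extension is loaded.

/**
 * Runs each job in its own child process, never more than $limit at once.
 * Returns the peak number of children tracked as alive at the same time.
 */
function runWithLimit(array $jobs, int $limit): int {
    $running = 0;
    $peak = 0;
    foreach ($jobs as $job) {
        // At the cap: block until one child exits before forking another.
        if ($running >= $limit) {
            pcntl_wait($status);
            $running--;
        }
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('fork failed');
        }
        if ($pid === 0) {
            $job();   // child: do the work...
            exit(0);  // ...and never fall back into the parent's loop
        }
        // Parent: record the new child.
        $running++;
        $peak = max($peak, $running);
    }
    // Reap the children that are still running.
    while ($running > 0 && pcntl_wait($status) > 0) {
        $running--;
    }
    return $peak;
}

// Usage: seven short jobs with a cap of three.
$jobs = array_fill(0, 7, function () { usleep(50000); });
$peak = runWithLimit($jobs, 3);
echo "peak concurrency: $peak\n"; // prints: peak concurrency: 3
```

The parent blocks in pcntl_wait() whenever the cap is reached, so a new task starts only after an earlier one has finished.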
The complete program is available in spider_task_factory.php.