3 May 2007
Communicating with threads in PHP
Earlier this week, I suggested PHP could be multithreaded. The sample I provided was very simple and at least one reader quickly wondered how to communicate with threads.
If you haven’t already, take a look at part 1 to get some basic information about threads in PHP.
It took a bit longer to get that part working that simple threads, but as of now, I have a functional prototype of an HVAC thread.
HVAC.php
require "ThreadInstance.php";
class HVAC extends ThreadInstance {
var $currentTemp;
var $heaterOn;
var $acOn;
var $toggleOn;
function HVAC () {
$this->setup();
$this->currentTemp = 32;
$this->heaterOn = false;
$this->acOn = false;
$this->toggleOn = 70;
}
function runHeater() {
$this->heaterOn = true;
$this->acOn = false;
}
function runAc() {
$this->heaterOn = false;
$this->acOn = true;
}
function process() {
switch (true) {
case ($this->heaterOn):
return $this->processHeater();
case ($this->acOn):
return $this->processAc();
}
}
function processHeater() {
if ($this->currentTemp < $this->toggleOn) {
$this->currentTemp++;
}
}
function processAc() {
if ($this->currentTemp > $this->toggleOn) {
$this->currentTemp–;
}
}
function apploop($command) {
$this->process();
switch ($command) {
case “”:
// noop
return;
case “start heater”:
$this->runHeater();
$this->response (”ok”, NULL);
return;
case “start ac”:
$this->runAc();
$this->response(”ok”, NULL);
return;
case “get current temp”:
$this->response (”ok”, $this->currentTemp);
return;
case “set temp”:
$temp = intval($this->getLine(true));
if ($temp) {
$this->toggleOn = $temp;
$this->response (”ok”, NULL);
} else {
$this->response (”err”, “not a number”);
}
return;
case “quit”:
exit;
default:
$this->response (”err”, “bad request - $command”);
return;
}
}
}
$hvac = new HVAC();
do {
sleep (1);
$hvac->apploop($hvac->getCommand());
} while (true);
ThreadInstance.php
set_time_limit (0);
require "ThreadUtility.php";
class ThreadInstance {
var $stdin;
var $commandbuffer;
var $stdout;
function setup() {
$this->stdin = fopen ("php://stdin", "r");
$this->stderr = fopen ("php://stderr", "w");
stream_set_blocking ($this->stdin, false);
$this->commandbuffer = (array)NULL;
$this->outbuffer = "";
}
function getCommand() {
$command = fgets ($this->stdin, 1024);
$this->commandbuffer[] = $command;
$command = array_shift ($this->commandbuffer);
return trim($command);
}
function response ($status, $data) {
response ($status, $data);
}
function getLine ($wait = false) {
if ($wait) {
$buffer = "";
while (!strlen($buffer)) {
$buffer .= fgets ($this->stdin, 1024);
}
} else {
$buffer = fgets ($this->stdin, 1024);
}
return trim($buffer);
}
function debug ($text) {
fwrite ($this->stderr, $text);
}
}
ThreadUtility.php
function response ($status, $response) {
echo $status . "\n";
echo base64_encode(serialize($response)), "\n";
}
function processresponse ($string) {
$parts = explode ("\n", $string);
$status = $parts[0];
$data = unserialize (base64_decode ($parts[1]));
return array ("status" => $status, "data" => $data);
}
Thread.php
require "ThreadUtility.php";
class Thread {
var $pref ;
var $pipes;
var $pid;
var $stdout;
function Thread() {
$this->pref = 0;
$this->stdout = "";
$this->pipes = (array)NULL;
}
function Create ($url) {
$t = new Thread;
$descriptor = array (0 => array ("pipe", "r"), 1 => array ("pipe", "w"), 2 => array ("pipe", "w"));
$t->pref = proc_open ("php -q $url ", $descriptor, $t->pipes);
stream_set_blocking ($t->pipes[1], 0);
stream_set_blocking ($t->pipes[2], 0);
usleep (10);
return $t;
}
function isActive () {
$this->stdout .= $this->listen();
$f = stream_get_meta_data ($this->pipes[1]);
return !$f["eof"];
}
function close () {
$this->tell("quit");
$r = proc_close ($this->pref);
$this->pref = NULL;
return $r;
}
function tell ($thought) {
fwrite ($this->pipes[0], $thought . "\n");
$response = "";
do {
$response = $this->listen();
} while ($response == "");
return processresponse ($response);
}
function listen () {
$buffer = $this->stdout;
$this->stdout = "";
while ($r = fgets ($this->pipes[1], 1024)) {
$buffer .= $r;
}
return $buffer;
}
function getError () {
$buffer = "";
while ($r = fgets ($this->pipes[2], 1024)) {
$buffer .= $r;
}
return $buffer;
}
}
hvac.php
set_time_limit(0);
include "Thread.php";
$tHVAC = Thread::create("HVAC.php");
$r = $tHVAC->tell("start heater");
echo "Start heater: ", $r["status"], "\n";
if ($r["status"] == "err") {
$tHVAC->close();
exit;
}
$tHVAC->tell("set temp\n50");
$goingUp = true;
while ($tHVAC->isActive()) {
echo $tHVAC->getError();
$r = $tHVAC->tell("get current temp");
if ($r["status"] == "ok") {
echo "Current Temperature: ", $r["data"], "\n";
}
if ($r["data"] == 50 && $goingUp) {
$tHVAC->tell("set temp\n35");
$tHVAC->tell("start ac");
$goingUp = false;
} else if ($r["data"] == 35 && !$goingUp) {
echo "Main Thread donenWaiting for HVAC Thread to end... ";
$tHVAC->close();
echo "ok";
exit;
}
}
Yes, it’s a lot of code for a sample.
In the original example, I assumed a file would have a given task. If you needed to generate a report while running a lengthy query, generate.php and analyze.php would make good threads. But real life is not that simple. So what I’ve now done is made a REPL loop in HVAC.php (the thread). This loop is what facilitates communications. It constantly polls for commands.
To understand this, let’s first ignore Thread.php, ThreadUtility.php and ThreadInstance.php. hvac.php is the stub application and HVAC.php is the HVAC unit and what we want threaded. REPL functionality is easily accomplished:
$hvac = new HVAC();
do {
sleep (1);
$hvac->apploop($hvac->getCommand());
} while (true);
sleep(1) is not strictly neccessary. It’s there to prevent real-time continuous polling. An initial delay is useful, but it doesn’t really need to be in the loop nor need it be so long. apploop is the controller for this thread. getCommand retrieves the oldest unprocessed command (typically, the only command) for to be executed.
Inside of apploop, logic is carried out like normal. In place of a clean exit, though, commands return response. This is an array comprised of a status and data. Status has two established values, err and ok. Data is a field allowing whatever data is useful to be passed back. It can be an object, however, the class must be defined in both contexts for it to be usable.
On the stub side, the thread is set up the same way as before. The first difference is that calls to tell return results. You will quickly know whether an operation succeeded without entering a listen poll loop.
One quirky statement is $tHVAC->tell("set temp\n50"); Commands are only one line. The set temp command is defined to prompt for a temperature, and it happens that the command issuing stream is the stream on which the temperature is polled. Therefore, we prime the command stream with a second statement (50) before it’s polled. This is not an ideal way to operate, but at this time, there is no more formalized architecture.
Finally, there’s a loop to set the temperature. With the heater turned on, the temperature is set to 50º. Once 50º is reached, the air conditioner is switched on and the temperature is lowered to 35º. Then the application exits.
isActive is not as accurate under this model. Since the thread is in a continuous loop, executon never truly stops. isActive will only stop if some error has taken down the thread. Therefore, one of the commands this thread supports is quit. Also, the general thread class has been altered to dispatch a quit instruction before it pulls the plug on the process.
The main application will not exit until all spawned threads exit. Therefore, it’s important that you’ve set up some remote-controllable mechanism to end it.
Update: It seems Wordpress was mishandling the character sequence \\n — I’ve fixed now. If you’ve had problems with this code, try changing that. I’m also happy to try helping whoever offline. Just email me at php@alternateinterior.com
I’ve also found that computationally intense code doesn’t respond well as a communicating thread. For now, anything that well run for any significant amount of time, set it up as a single file and avoid using tell(). I’m working on it and hope to have a better solution by early next week.
Follow Up: Comparing linear execution with multithreaded execution, and bug fixes for long-lasted code. Part 3 is here!
5 Comments currently posted.
dash says:
Josh Strike says:
I’m dying to get this working so I can make multiple SOAP calls w/o using pcntl_fork …I tried a lot of ways…finally I’m just copy/pasting the code into files and all I end up with is a process that won’t quit. Some of this is a little over my head, but I’m not sure what I’m doing wrong. If I limit the process time I get “Start heater could n” …not sure what error is generating the “could”… I feel like I get what’s going on here, but not why the process never quits.
Mike Malone says:
As you said in the first part of this series, this is not multithreading, so why do you keep saying it is? What you’re really doing is forking multiple processes, but you’re doing it in an incredibly inefficient manner. PHP has built-in functions that allow you to work directly with the underlying system calls just as you would in C. I wrote a quick example that describes how to use these functions on my blog, check it out. You can communicate between processes using a pipe.
Moreover, for 99% of what you’re trying to accomplish here you could just use non-blocking I/O. If you’re doing something processor intensive, you won’t gain much by forking (unless you have multiple processors). If you’re waiting on I/O, non-blocking I/O is a much simpler solution.
Brian W. Bosh says:
Mike,
There are many reasons for the issues you raise.
First, pcntl_fork (or any of the process control functions) is not part of a default PHP build. Many users do not have the ability (or wish) to recompile PHP to include this functionality.
Forking does offer better preformance, and cleaner communication. It also requires more knowledge on the part of the developer to get it working. This solution allows developers to leverage what they already know. Threading is this “big mysterious thing” and even technically minded users benefit by starting simpler.
Finally, these are not threads. I did say that. I have not continually re-iterated that, as that is the functionality they’re providing, even if not the mechanism by which it’s provided. It’s much easier to understand by giving them a common name. This came about because in searching for information on how to multithread, there were no great resources, just forum posts saying its not possible but you can kinda fake it. Using the term “threads” lets people find the information they’re looking for without telling them to search for some obscure function.
AJAX is just an implementation of communication by XMLHttpRequests that have existed for many years. But it took a term that people could recognize to grow. These threads may just be spawned processes, but there are a lot more people who are interested in “threads” than “spawning processes”.
Brian
lucky says:
Your code is cool. But this is not the threads! Your “theads” does not have the definitive thread characteristic: the real theads shares data in the single address space. Processes lives in the separate address spaces and can not to share data by such way. Therefore spawned procesess can not to be a theads, not even emulate them. Be careful with terminology.


can u please explain this whole structure with a diagram, then it will be pretty easy to understand