Monday, November 22, 2004

Threading in a Windows environment

Motivation



Every now and then someone posts a question on zend.com and the like, asking whether multi-threading in a Windows environment will ever be possible. Usually the answer that poor person gets sounds something like:



"PHP is good for the web and multi-threading is not for the web, since the web is usually about synchronous user dialogs"



In my opinion this is, first of all, not true: the Web is not just about synchronous user dialogs. It's about asynchronous user dialogs as well, although admittedly they are not handled very well by the request-response paradigm, which is inherently synchronous. This is underlined by the fact that no browser I know of implements HTTP server capabilities, so simulating asynchronous processes usually involves JavaScript-triggered polling via page/frame/iframe reloading or remote scripting.



Why be asynchronous if all you use it for is a progress meter? As far as I (and, I guess, the majority of end users) see it, an asynchronous process is something you trigger and may then leave on its own. Eventually you are going to collect a status, or some pop-up will show up. Apart from the latter case (which would involve polling by the client or a bi-directional connection), asynchronous processes may very well be triggered on the web. And especially when you want your application to process some really long-running tasks, you will need that functionality.



Currently people tend to tell you that you can execute an external binary or dispatch a request to another daemon, and that those things should be written in a stable, more daemon-oriented language than PHP. I am saying that for some purposes PHP is perfectly sufficient for daemon-type jobs.



If, for instance, you are writing newsletter software that should not only maintain newsletters, recipient lists and such in a database and make those things accessible via a pretty frontend, but should also use PHP to do the groundwork and send off those emails, you will probably have a hard time. Possibly you want to be able to send more than one newsletter at a time, so there you already have parallel processing.



On *nix systems you will probably be quite familiar with pcntl_fork() and everything around it. "Advanced PHP Programming" by George Schlossnagle - which I like a lot - has a whole chapter about writing daemons in a *nix environment. But what if you'd like to do the same under Windows? Well, it ain't quite easy, but it can be done - and this is how:



Solution - Overview



The basic idea is to spawn new processes via the command line interface of PHP and to keep them communicating via a shared buffer (either shared memory or even file based). You will also need some interrupt mechanism in the spawned processes so that you can control them from the outside. While there might be some pitfalls along the way, you can still do all you need, even in PHP4 (4.3.0+ if you use debug_backtrace() along the way).
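To make the spawning idea a bit more tangible, here is a minimal sketch of a parent that starts a second PHP process through popen() and blocks until the child has written its first byte. It is written for a current PHP version rather than PHP4, and several things in it are my assumptions and not part of WinThread: the PHP_BINARY constant (PHP 5.4+) stands in for the interpreter lookup, and the inline child code stands in for the real thread script. The quoting rules of cmd.exe may require adjustments on Windows.

```php
<?php
// Sketch only: spawn a second PHP process and wait for its "I am alive" byte.
// Assumption: PHP_BINARY points to a usable CLI interpreter.
$childCode = 'echo 1; usleep(200000);';   // stand-in for the real thread script
$command   = escapeshellarg(PHP_BINARY) . " -r " . escapeshellarg($childCode);

$process   = popen($command, "r");
$firstByte = false;

if ($process) {
    // fread() blocks until the child has produced one byte (or closed the pipe)
    $firstByte = fread($process, 1);

    // pclose() waits for the child to exit; this corresponds to "collecting" it
    $exitCode = pclose($process);
}
```

The single byte is the whole handshake: as long as the child has not written it, the parent cannot assume that the child's context is ready for use.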



Of course we want a nice object-oriented way of dealing with the issue, so let's have an abstract class for that. Instances of this class will be used in the parent process to control a spawned thread, and they also carry the implementation of the thread itself. In the end we would like to use our thread in a Java-like fashion, like this:



// instantiating an implemented WinThread proxy

$thread = &new WinThreadImpl("arg1","arg2","arg3");



// starts the thread

$thread->start(WinThread_ASTHREAD);



// we may set the thread to sleep

$thread->suspend();



// and resume it again

$thread->resume();



// if we don't stop a thread which

// was not started as a daemon explicitly

// the parent script will eventually wait

// for its spawn to exit before it can

// exit itself

$thread->stop();



Or maybe we would like to create a daemonized thread which can outlive its parent and is therefore well suited for asynchronous processing. While all of the above control mechanisms work perfectly fine for this fellow too, I would like to introduce some additional ones (which also work in both cases).



// instantiating an implemented WinThread proxy

$thread = &new WinThreadImpl("arg1","arg2","arg3");



// starts the thread as a daemon

$thread->start(WinThread_ASDAEMON);



// getting the pid of the process

echo $thread->getPid()."\n";



// usually the parent would exit if a daemonized

// thread is not explicitly collected

while($thread->collect()) {

echo "Thread is still running...\n";

sleep(1);

}



Solution - Detail



So let's look under the hood of that beast. In the first example naturally the constructor is called first.



/*<WinThread>*/ function &WinThread() {

$this->_parameters = $this->getConstructorParameters();

if($this->iAmTheParent()) {

$this->initializeSharedContext($this->generateRandomId());

}

}



First the constructor finds out which parameters were passed to it. This is done by a convenience function, getConstructorParameters(), which is implemented based on debug_backtrace().

The iAmTheParent() method simply checks, based on environment information, whether it is currently running as the proxy/parent (as is the case here) or as the thread/child process. In the former case a shared context buffer is initialized based on a freshly generated random unique identifier. This context will later be used to enable communication between parent and child.
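How the parent/child check and the id handling could look is sketched below. The two function names are hypothetical and not part of WinThread; the marker value launchAsThread and the argument positions follow the simplified launch code of this article, and the id recipe is the one used in the reference implementation (with mt_rand() swapped in for rand()).

```php
<?php
// Hypothetical helper: true if the command line carries the child marker.
function isSpawnedChild(array $argv, $marker = "launchAsThread") {
    return isset($argv[1]) && $argv[1] === $marker;
}

// Hypothetical helper: the parent creates a fresh id,
// the child reuses the id it was handed on the command line.
function resolveContextId(array $argv) {
    if (isSpawnedChild($argv) && isset($argv[2])) {
        return $argv[2];
    }
    return md5(uniqid(mt_rand(), true));
}

// Parent side: no marker present, so a new 32 character id is generated
$parentId = resolveContextId(array("WinThread.php"));

// Child side: the id from the command line is taken over unchanged
$childId = resolveContextId(array("WinThread.php", "launchAsThread", $parentId));
```

Since both sides end up with the same id, both can open the same shared context without any further negotiation.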



Next the thread is started (in thread mode, meaning that the parent process waits for the child to exit at the end of its processing time). The implementation of start() looks like this:



/*<void>*/ function start(/*<bool>*/ $asDaemon = FALSE) {

if($this->iAmTheParent()) {

//building command parameters

$classPath = $this->getClassPath();

$constructorParams = base64_encode(serialize($this->_parameters));

$launchScript = realpath(__FILE__);

$execCommand = WinThread::getInterpreterPath().

" \"".$launchScript."\"".

" \"launchAsThread\"".

" \"".$this->_id."\"".

" \"".$classPath."\"".

" \"".$constructorParams."\"".

" \"".(int)$asDaemon."\"";

//prefix command if the thread needs to run as a daemon

if($asDaemon) {

$execCommand = "start /B ".$execCommand;

}

//starting the external process

$this->_process = @popen($execCommand,"r");

if($this->_process) {

//block until the sub process has indicated that it

//is running by creating one byte of output

fread($this->_process, 1);

}

}

}



First of all the current classpath is fetched. This may be the absolute path to the current class or some other addressing mechanism. Just remember that it is the address of WinThreadImpl not WinThread.

Also the parameters of the constructor which will later be needed in the child process need to be fetched, serialized and packaged so they can be passed along with the command line.

The script to launch is actually the current file (WinThread), as it should encapsulate all the technical core necessary for the thread to run. It will therefore contain some executable code alongside the class implementation, which is run conditionally. All that is really needed is a well-known, absolute path which the command line can take as the script that executes the thread.

The $execCommand which is now built needs to contain all the information necessary to run the thread. The path to the interpreter can be derived from environment information most of the time. This may in fact be a little tricky, but it is beyond the scope of this article.

The only parameter of this script which needs to be explained is the literal launchAsThread. In the child process it is used as an indicator that this is in fact the child process, and it is the basis for iAmTheParent() and iAmTheThread() type methods.

If a process is to be run as a daemon, and thereby be detached from the lifecycle of its parent, the DOS command start /B may be used to prefix any existing command. start would open a new command-line instance, while the option /B makes it run in the background rather than having a new window pop open.

Finally the command is triggered using a simple popen() call, opened for reading. This pipe will be used to block the parent process until the thread has passed its initialization phase.
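The packaging of the constructor parameters can be tried out in isolation. The following sketch uses two hypothetical helper functions (they do not exist in WinThread) to show the round trip: serialize() keeps the types, and base64_encode() makes sure that no quotes or spaces end up on the command line. Keep in mind that unserialize() should only ever be fed data that your own parent process has produced.

```php
<?php
// Hypothetical helper: pack arguments into one command-line safe string.
function packArguments(array $args) {
    return base64_encode(serialize($args));
}

// Hypothetical helper: reverse of packArguments();
// returns an empty array if the input cannot be decoded.
function unpackArguments($packed) {
    $decoded = base64_decode($packed, true);   // strict mode rejects foreign characters
    if ($decoded === false) {
        return array();
    }
    $args = @unserialize($decoded);
    return is_array($args) ? $args : array();
}

$original = array("arg1", 42, array("nested" => true));
$packed   = packArguments($original);    // what the parent puts on the command line
$restored = unpackArguments($packed);    // what the child gets back
```

The packed string only consists of letters, digits, "+", "/" and "=", so wrapping it in double quotes on the command line is enough.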



Now that we have started the thread from a parent perspective we still need to know how all of this works from the child process perspective.



declare(ticks=2);



class WinThread {



[...]



/*static*/ /*<void>*/ function launch(/*<array>*/ $argv) {

if($argv[1] == "launchAsThread") {

//get command line parameters

$id = $argv[2];

$classPath = $argv[3];

$parameters = unserialize(base64_decode($argv[4]));

$isDaemon = (bool)$argv[5];

$className = WinThread::getClassFromClassPath($classPath);

//importing/including the required class(es)

Framework::imports($classPath);

//instantiate the thread

$thread = &new $className($parameters[0],

$parameters[1],

$parameters[2],

$parameters[3],

$parameters[4],

$parameters[5],

$parameters[6],

$parameters[7],

$parameters[8],

$parameters[9]);

//initialize the shared context

$thread->initializeSharedContext($id);

//publish state to shared context

$thread->publishIsDaemon();

$thread->publishPid(getmypid());

$thread->publishIsRunning();

//enable the interrupt method for this process

$thread->enableInterrupt();

//echo one byte so that the main process knows that its thread has started

echo "1\n";

//call the thread implementation callback method

$thread->run();

//stopping the thread automatically after it has been processed

$thread->stop();

}

}

}



WinThread::launch($argv);



First we need to declare(ticks=2) so that we are later able to plug in interrupt methods using register_tick_function(). The first argument after the script name being launchAsThread is enough indication that the thread should be started. Then all parameters that were passed on the command line are extracted and interpreted.
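For those who have never used ticks, here is a small self-contained sketch of the interrupt idea, written for a current PHP version. The InstructionBox class is a hypothetical stand-in for the shared context, and instead of a real parent the handler itself pretends that a "doExit" instruction has arrived after a few polls. I use ticks=1 here to keep the example easy to follow.

```php
<?php
declare(ticks=1);

// Hypothetical stand-in for the shared context between parent and child
class InstructionBox {
    public static $instruction = "null";
    public static $polls = 0;

    // Tick handler: PHP calls this after tickable statements in this file
    public static function poll() {
        self::$polls++;
        // Simulation: pretend the parent has written "doExit" after a few polls
        if (self::$polls >= 10) {
            self::$instruction = "doExit";
        }
    }
}

register_tick_function(array("InstructionBox", "poll"));

$work = 0;
while (InstructionBox::$instruction !== "doExit") {
    // The loop body knows nothing about polling;
    // the tick handler runs in between the statements
    $work++;
}

unregister_tick_function(array("InstructionBox", "poll"));
```

In the real thread the handler would read the instruction from the shared context and call stop(), suspend or resume accordingly, so the run() method does not need to contain any polling code of its own.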



[to be continued]



What to bear in mind





  • This is simplified demonstration code. The real thing works quite nicely, I can assure you; however, there are some details not shown in the presented implementation.


  • I am using a programming library that I have been putting together for about 3 years now. It follows a one-class-per-file paradigm. At its core it has a Java-like mechanism to import classes as prerequisites of other classes. All methods of the Framework class pretty much state what they do. How the Framework works and how this helps the implementation of design patterns will be discussed in another article.


  • This implementation assumes the existence of a context in the application scope for each thread running. How to implement such a context will be the subject of another article. There are basically two efficient variants to do this:



    • Shared memory: Using shared memory under Windows is currently a tricky beast, since you may easily run into the memory-leak trap. However, it's easier on your CPU cycles.


    • Files: Using files, you have to make sure that you do not corrupt them by concurrent write access or create inconsistent reads. How to easily create concurrency-resistant resources will be the topic of another article.




  • The usleep() function does not work under Windows before PHP5. Not having it hits the performance of other applications hard, since the CPU will be used to its max. There is a workaround for Windows using fsockopen() on php.net, but it leaks memory heavily if you try to use it in threads.


  • Remember that you are actually doing multi-processing here, not multi-threading. Since each process is spawned completely self-contained, it will consume just as much RAM as its parent process. This is due to the fact that the PHP runtime and libraries are instantiated yet again for each process.
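To give an impression of the file-based variant mentioned in the list above, here is a minimal sketch of a context that stores a serialized array in a single file and guards it with flock(). This is not the ApplicationContext of my framework; contextSet() and contextGet() are hypothetical names, and the code assumes a current PHP version (the "c+" mode of fopen() does not exist in PHP4).

```php
<?php
// Hypothetical file-backed context: set a single key under an exclusive lock.
function contextSet($file, $key, $value) {
    $fp = fopen($file, "c+");              // create if missing, do not truncate
    if (!$fp) {
        return false;
    }
    if (!flock($fp, LOCK_EX)) {            // only one writer at a time
        fclose($fp);
        return false;
    }
    $raw  = stream_get_contents($fp);
    $data = ($raw !== false && $raw !== "") ? @unserialize($raw) : array();
    if (!is_array($data)) {
        $data = array();
    }
    $data[$key] = $value;
    ftruncate($fp, 0);                     // replace the old content completely
    rewind($fp);
    fwrite($fp, serialize($data));
    fflush($fp);
    flock($fp, LOCK_UN);
    fclose($fp);
    return true;
}

// Hypothetical file-backed context: read a single key under a shared lock.
function contextGet($file, $key) {
    $fp = @fopen($file, "r");
    if (!$fp) {
        return null;
    }
    flock($fp, LOCK_SH);                   // readers wait for a pending write
    $raw = stream_get_contents($fp);
    flock($fp, LOCK_UN);
    fclose($fp);
    $data = ($raw !== false && $raw !== "") ? @unserialize($raw) : array();
    return (is_array($data) && array_key_exists($key, $data)) ? $data[$key] : null;
}

$file = tempnam(sys_get_temp_dir(), "ctx");
contextSet($file, "pid", 1234);
contextSet($file, "__isRunning", 1);
$pid = contextGet($file, "pid");
unlink($file);
```

Because the read-modify-write cycle happens entirely inside the exclusive lock, two processes setting different keys at the same time do not overwrite each other's changes.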




Reference Coding



Up next is the reference implementation that currently exists in my application framework. Don't get confused: yes, I lean a little towards Java-style syntax.



<?php



//*** begin: launch script *********************************************************************

if($argv[1] == "launch") {

require_once($argv[2]); //includes the framework if this is run as a stand-alone

declare(ticks=2);

}

//*** end: launch script ***********************************************************************



Framework::imports("net.developaz.DAPI");

Framework::imports("net.developaz.context.ApplicationContext");

Framework::uses("php","4.3.0");



define("WinThread_ASDAEMON",TRUE);

define("WinThread_ASTHREAD",FALSE);



/**

@version: 0.1.5



This abstract class implements 'threads' for windows systems via

multiple processes spawned on the CLI.

*/

/*abstract*/ class WinThread extends DAPI {



//*** ATTRIBUTES ****************************************************************************

/*private*/ /*<ref>*/ var $_process;

/*private*/ /*<array>*/ var $_parameters = array();

/*private*/ /*<bool>*/ var $_iamathread = false; //indicator for thread vs. environment

/*private*/ /*<ApplicationContext>*/ var $_context;



//*** CONSTRUCTOR ***************************************************************************

/**

The constructor of this thread may be called to construct the

*/

/*<WinThread>*/ function &WinThread() {

//$this->_parameters = func_get_args();

$arr = debug_backtrace();

$this->_parameters = ($arr[0]['args'])?$arr[0]['args']:$arr[1]['args'];

if(!$_ENV['thread']) {

$this->setId(md5(uniqid(rand(), true)));

} else {

$this->setAsThread();

$this->debug("starting as a thread");

}

}



//*** ABSTRACT METHODS **********************************************************************

/**

Abstract method that has to be extended and implements the thread's business logic

*/

/*abstract*/ /*<void>*/ function run() { }



//*** PRIVATE METHODS ***********************************************************************

/**

Sets a flag that indicates that this is the real instance of the thread and not the stub

*/

/*private*/ /*<void>*/ function setAsThread() {

$this->_iamathread = true;

}



/**

Sets the internal ID of the process which is primarily used to identify the correct

context which is used to access data

*/

/*private*/ /*<void>*/ function setId(/*<string>*/ $id) {

$this->_id = $id;

$contextName = "Process.".ucfirst(get_class($this))."Context.".$this->_id;

$this->_context = &ApplicationContext::instance($contextName);

}



/**

Publishes the PID of the process

*/

/*private*/ /*<void>*/ function publishPid(/*<int>*/ $pid) {

$this->debug("setting pid: $pid");

$this->_context->set("pid",$pid);

}



/**

Publishes the info that the process is done

*/

/*private*/ /*<void>*/ function publishIsDone() {

$this->_context->set("__isStarted",0);

$this->_context->set("__isRunning",0);

$this->debug("this thread is done");

}



/**

Publishes the info that the process is running

*/

/*private*/ /*<void>*/ function publishIsRunning() {

$this->debug("this thread is running");

$this->_context->set("__isStarted",1);

$this->_context->set("__isRunning",1);

}



/**

Publishes the info that the process is suspended

*/

/*private*/ /*<void>*/ function publishIsSuspended() {

$this->debug("this thread is suspended");

$this->_context->set("__isRunning",0);

}



/**

Checks for external instructions

*/

/*private*/ /*<void>*/ function checkForInstructions() {

do { //this loop will only be executed repeatedly if the thread is suspended

$iterate = false;

$this->wait(20); //this reduces the instruction rate and actually increases performance

$instruction = $this->getInstruction();

$this->debug("checking for instruction... $instruction");

if($instruction && $instruction != "null") {

switch($instruction) {

case "doExit":

$iterate = false;

$this->stop();

break;

case "doSuspend":

$params = $this->getInstructionParameters();

$maxMsecs = array_shift($params);

$this->publishIsSuspended();

$iterate = true;

break;

case "doResume":

$this->publishIsRunning();

$iterate = false;

break;

default:

break;

}

$this->clearInstruction();

}

} while($iterate);

}



/**

Gets the last instruction for a thread

*/

/*private*/ /*<string>*/ function getInstruction() {

return @$this->_context->get("__instruction");

}



/**

Gets the parameters of an instruction

*/

/*private*/ /*<string>*/ function getInstructionParameters() {

return @$this->_context->get("__instructionParams");

}



/**

Assigns an instruction for the thread

*/

/*private*/ /*<void>*/ function setInstruction(/*<string>*/ $instruction, /*<array>*/ $parameters = array()) {

$this->_context->set("__instruction",$instruction);

$this->_context->set("__instructionParams",$parameters);

}



/**

Clears the instruction

*/

/*private*/ /*<void>*/ function clearInstruction() {

$this->_context->set("__instruction","null");

$this->_context->set("__instructionParams",array());

}



/**

Sleeps for the given number of milliseconds (currently disabled, see the workaround below)

*/

/*incomplete/private*/ /*<void>*/ function wait(/*<int>*/ $msecs) {

return;

//$fp = @fsockopen("udp://localhost",31238,$errno,$errstr,$msecs/1000);

//if($fp) fclose($fp);

}



/**

Returns the microtime as a float

*/

/*private*/ /*<float>*/ function microtime_float() {

list($usec, $sec) = explode(" ", microtime());

return ((float)$usec + (float)$sec);

}



/**

Enables the listener

*/

/*private*/ /*<void>*/ function enableListener() {

if($this->isThread() && !$this->_listenerEnabled) {

register_tick_function(array(&$this,"checkForInstructions"),true);

$this->_listenerEnabled = true;

}

}



/**

Disables the listener

*/

/*private*/ /*<void>*/ function disableListener() {

if($this->isThread() && $this->_listenerEnabled) {

@unregister_tick_function(array(&$this,"checkForInstructions"));

$this->_listenerEnabled = false;

}

}



//*** PUBLIC METHODS ************************************************************************

/**

From the host this method starts the thread. If a process

is started as a daemon it is detached from the parent process

*/

/*<void>*/ function start(/*<bool>*/ $asDaemon = false) {

if(!$this->isThread()) {

$class = get_class($this);

$classes = Framework::get("classPaths");

$classPath = Framework::path2class($classes[$class]);

$params = base64_encode(serialize($this->_parameters));

$launchScript = realpath(__FILE__);

$framework = str_replace("//","/",Framework::get("root")."/Framework.class.php");

$command = WinThread::getInterpreterPath()." \"".$launchScript."\" \"launch\" \"".$framework."\" \"".$this->_id."\" \"".$classPath."\" \"".$params."\"";

//$this->debug($command);

if($asDaemon) {

$this->_iamadaemon = true;

$command = "start /B ".$command." \"1\"";

}

$this->_process = @popen($command,"r");

if($this->_process) {

fread($this->_process, 1); //block until the sub process has indicated that it is running

} else {

$this->setError(2,"could not spawn process");

}

}

}



/**

Stops the process

*/

/*<void>*/ function stop() {

if($this->isThread()) {

$this->disableListener();

$this->debug("stopping itself");

$this->publishIsDone();

exit(0);

} else {

$this->debug("stopping from outside");

$this->setInstruction("doExit");

}

}



/**

Gets the PID of the process

*/

/*<int>*/ function getPid() {

if($this->isThread()) {

return getmypid();

} else {

return $this->_context->get("pid");

}

}



/**

Checks if this instance of the class is the thread itself or the stub in the host process

*/

/*<bool>*/ function isThread() {

return (bool)$this->_iamathread;

}



/**

Checks if this instance is a daemon

*/

/*<bool>*/ function isDaemon() {

return (bool)$this->_iamadaemon;

}



/**

Checks if a process is done

*/

/*<bool>*/ function isDone() {

$started = $this->_context->get("__isStarted");

return (bool)!$started;

}



/**

The collecting process waits for a specific sub-process to end

*/

/*<bool>*/ function collect() {

if(!$this->isThread()) {

pclose($this->_process);

return true;

} else {

$this->setError(2,"A thread cannot wait for itself to finish");

}

}



/**

This method suspends a thread

*/

/*<void>*/ function suspend() {

if(!$this->isThread()) {

$this->setInstruction("doSuspend");

}

}



/**

This method resumes a thread, that has been suspended

*/

/*<void>*/ function resume() {

if(!$this->isThread()) {

$this->setInstruction("doResume");

} else {

$this->setError(2,"This thread is already running");

}

}



/**

Checks if the current thread is suspended

*/

/*<bool>*/ function isSuspended() {

return (bool)!$this->_context->get("__isRunning");

}



/**

Checks if the current thread is running

*/

/*<bool>*/ function isRunning() {

return (bool)$this->_context->get("__isRunning");

}



/**

Sets a parameter in the context of the thread

*/

/*<void>*/ function setParameter(/*<string>*/ $name , /*<mixed>*/ $value) {

$this->disableListener();

$this->_context->set($name,$value);

$this->enableListener();

}



/**

Gets a parameter in the context of the thread

*/

/*<mixed>*/ function getParameter(/*<string>*/ $name) {

$this->disableListener();

$res = $this->_context->get($name);

$this->enableListener();

return $res;

}



//*** STATIC METHODS ************************************************************************

/**

Sets the path to the PHP-cli interpreter

*/

/*static*/ /*<void>*/ function setInterpreterPath(/*<string>*/ $path) {

$GLOBALS['WINTHREAD_STATIC_VARS']['interpreter'] = $path;

}



/**

Returns the path to the PHP-cli interpreter

*/

/*static*/ /*<string>*/ function getInterpreterPath() {

$interpreter =&$GLOBALS['WINTHREAD_STATIC_VARS']['interpreter'];

if(!$interpreter) {

$php_ini = realpath(get_cfg_var('cfg_file_path'));

$php_dir = dirname($php_ini);

if(is_file($php_dir."/php-cli.exe")) $interpreter = $php_dir."/php-cli.exe";

elseif(is_file($php_dir."/cli/php.exe")) $interpreter = $php_dir."/cli/php.exe";

$interpreter .= " -c \"".$php_ini."\"";

}

return $interpreter;

}



/**

Starts a thread when this class is called as a script from the interpreter

*/

/*static*/ /*<void>*/ function launch(/*<array>*/ $argv) {

if($argv[1] == "launch") {

//DAPI::startLogging("log.txt");

$id = $argv[3];

$classPath = $argv[4];

$_ENV['thread'] = true;

$parameters = unserialize(base64_decode($argv[5]));

$isDaemon = isset($argv[6]) ? (bool)$argv[6] : false; //the daemon flag is only appended by start() in daemon mode

$class = substr($classPath,strrpos($classPath,".")+1,strlen($classPath));

Framework::imports($classPath);

$thread = &new $class($parameters[0],

$parameters[1],

$parameters[2],

$parameters[3],

$parameters[4],

$parameters[5],

$parameters[6],

$parameters[7],

$parameters[8],

$parameters[9]);

$thread->_iamadaemon = $isDaemon;

$thread->setId($id);

$thread->publishPid(getmypid());

$thread->publishIsRunning();

$thread->enableListener();

echo "1\n"; //so that the main process knows that this process has started

$thread->run();

$thread->stop();

}

}



}



//*** begin: launch script *********************************************************************

WinThread::launch($argv);

//*** end: launch script ***********************************************************************



?>
