PHP实现进程间通信:消息队列

实际上,在PHP开发过程中,对于消息队列的应用还是很广泛的。消息队列(message queue)也是Linux系统进程间通信的一种方式。 除了现有的一些消息队列解决方案以外,PHP对共享内存段的操作有两组函数:System V IPC和Shared Memory。 其中System V IPC由AT&T的贝尔实验室对早期的UNIX系统贡献而来,现在的linux系统都完美的继承了下来,该系列函数能够更方便的操作数据,无需像Shared Memory那样必须自己掌握读写时的偏移量、长度等,也不用序列化/反序列化来回转换(因为Shared Memory函数只支持字符串格式的数据参数)。但是System V IPC系列不支持Windows,所以如果要在win环境下使用,只能选Shared Memory。 PHP的System V msg模块是对Linux系统支持的System V IPC中的System V消息队列函数族的封装。我们需要利用sysvmsg模块提供的函数来进进程间通信。  

<?php 
$message_queue_key = ftok(__FILE__, 'a'); 
$message_queue = msg_get_queue($message_queue_key, 0666);
var_dump($message_queue);

$message_queue_status = msg_stat_queue($message_queue);
print_r($message_queue_status);

//向消息队列中写
msg_send($message_queue, 1, "Hello,World!");

$message_queue_status = msg_stat_queue($message_queue);
print_r($message_queue_status);

//从消息队列中读
msg_receive($message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT);
print_r($message."\r\n");

msg_remove_queue($message_queue); 
?>

由于System V IPC只支持linux类操作系统,以上演示请在linux中进行.

ftok函数将一个可访问的文件路径名转换为一个可供 shmop_open() 和其他系统VIPC keys使用的整数,proj参数必须是一个字符串,这个参数其实就是读写方式. msg_get_queue()会根据传入的键值返回一个消息队列的引用。如果linux系统中没有消息队列与键值对应,msg_get_queue()将会创建一个新的消息队列。函数的第二个参数需要传入一个int值,作为新创建的消息队列的权限值,默认为0666。这个权限值与linux命令chmod中使用的数值是同一个意思,因为在linux系统中一切皆是文件。 msg_remove_queue用于销毁一个队列。 下面是一个具体的应用场景:

<?php
 
$message_queue_key = ftok(__FILE__, 'a');
$message_queue = msg_get_queue($message_queue_key, 0666);
 
$pids = array();
for ($i = 0;  $i < 5; $i++) {
        //创建子进程
        $pids[$i] = pcntl_fork();
 
        if ($pids[$i]) {
                echo "No.$i child process was created, the pid is $pids[$i]\r\n";
                pcntl_wait($status);//非阻塞的线程等待,防止僵尸进程的出现
        } elseif ($pids[$i] == 0) {
                $pid = posix_getpid();
                echo "process.$pid is writing now\r\n";
 
                msg_send($message_queue, 1, "this is process.$pid's data\r\n");
                posix_kill($pid, SIGTERM);
        }
}
 
do {
        msg_receive($message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT); 
        echo $message;
        $a = msg_stat_queue($message_queue);
        if($a['msg_qnum'] == 0){
            break;
        }
} while(true)
 
?>

  运行结果:

No.0 child process was created, the pid is 2196
process.2196 is writing now
No.1 child process was created, the pid is 2197
process.2197 is writing now
No.2 child process was created, the pid is 2198
process.2198 is writing now
No.3 child process was created, the pid is 2199
process.2199 is writing now
No.4 child process was created, the pid is 2200
process.2200 is writing now
this is process.2196's data
this is process.2197's data
this is process.2198's data
this is process.2199's data
this is process.2200's data

  如果是验证父子进程间的通信,可以这样做:

<?php
$message_queue_key = ftok(__FILE__, 'a');
$message_queue = msg_get_queue($message_queue_key, 0666);
$pid = pcntl_fork();
if ($pid==-1) {
        die("cannot fork");
} else if ($pid) {
                pcntl_wait($status);
                msg_receive($message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT);
                echo $message;

} else {
    $pid = posix_getpid();

        msg_send($message_queue, 1, "this is process.$pid's data\r\n");
}
?>