队列
支持文件(file)、redis、msg-queue 三种队列类型,默认为文件(file)
为工厂模式,可以任意扩展
<?php
/**
 * Queue manager class.
 *
 * Static facade that routes every queue operation to the backend selected by
 * self::$type: 'redis' (Zero_Queue_Redis), 'msg-queue' (Zero_Queue_MsgQueue),
 * or the file backend (Zero\Queue\File) for any other value.
 *
 * @category Framework
 * @package Queue
 * @author Xinze <xinze@live.cn>
 * @copyright Copyright (c) 2010, 黄新泽
 * @version 1.0
 * @todo
 */
class Zero_Queue
{
    /** @var string Prefix prepended to every queue name; overwritten from the registry by getKey(). */
    public static $keyPre = '_msg_|';

    /** @var string Active backend: 'file' (default), 'redis' or 'msg-queue'. */
    public static $type = 'file';

    /** @var array File-backend queue objects, indexed by the prefixed queue name. */
    private static $fileQueueInstances = array();

    /**
     * Resolve a logical queue name to its prefixed key and prepare the backend.
     *
     * Side effects: may switch self::$type from 'msg-queue' to 'file' on
     * Windows, overwrites self::$keyPre with the 'queue_key_prefix' registry
     * entry, and lazily creates the file-backend object for the key.
     *
     * @param string $queue Logical (unprefixed) queue name.
     * @return string Prefixed queue key.
     */
    public static function getKey($queue)
    {
        // NOTE(review): presumably the msg-queue backend is unavailable on
        // Windows, hence the fallback to the file backend - confirm.
        if ('WIN' == substr(PHP_OS, 0, 3) && 'msg-queue' == self::$type)
        {
            self::$type = 'file';
        }

        self::$keyPre = Zero_Registry::get('queue_key_prefix');
        $queue = self::$keyPre . $queue;

        // Every dispatcher below treats "neither redis nor msg-queue" as the
        // file backend, so the instance must exist for all of those types,
        // not only for the literal 'file'.
        if (self::usesFileBackend() && !isset(self::$fileQueueInstances[$queue]))
        {
            $queue_dir = ROOT_PATH . DS . 'shop' . DS . 'data' . DS . DATA_ID . DS . 'file-queue';

            self::$fileQueueInstances[$queue] = new Zero\Queue\File(array(
                'role' => 'customer',
                'queueNamespace' => 'demo',
                'queueDir' => $queue_dir,
                'queueFileName' => $queue
            ));
        }

        return $queue;
    }

    /**
     * Append one item to a queue.
     *
     * @param string $queue Logical queue name.
     * @param mixed  $data  Payload handed to the backend unchanged.
     * @return mixed Whatever the selected backend returns.
     */
    public static function send($queue, $data)
    {
        $queue = self::getKey($queue);

        if ('redis' == self::$type)
        {
            return Zero_Queue_Redis::send($queue, $data);
        }

        if ('msg-queue' == self::$type)
        {
            return Zero_Queue_MsgQueue::send($queue, $data);
        }

        return self::$fileQueueInstances[$queue]->push($data);
    }

    /**
     * Take the next item from a queue.
     *
     * @param string $queue Logical queue name.
     * @return mixed Whatever the selected backend returns.
     */
    public static function receive($queue)
    {
        $queue = self::getKey($queue);

        if ('redis' == self::$type)
        {
            return Zero_Queue_Redis::receive($queue);
        }

        if ('msg-queue' == self::$type)
        {
            return Zero_Queue_MsgQueue::receive($queue);
        }

        return self::$fileQueueInstances[$queue]->pop();
    }

    /**
     * Fetch the backend's all() result for a queue.
     *
     * The original note on this method says it does not modify the content.
     *
     * @param string $queue Logical queue name.
     * @return mixed Whatever the selected backend's all() returns.
     */
    public static function all($queue)
    {
        $queue = self::getKey($queue);

        if ('redis' == self::$type)
        {
            return Zero_Queue_Redis::all($queue);
        }

        if ('msg-queue' == self::$type)
        {
            return Zero_Queue_MsgQueue::all($queue);
        }

        // Previously this result was stored in $rs while an undefined $data
        // was returned, so the file backend always yielded null.
        return self::$fileQueueInstances[$queue]->all();
    }

    /**
     * Remove a queue (the file backend calls unmount()).
     *
     * @param string $queue Logical queue name.
     * @return mixed Whatever the selected backend returns.
     */
    public static function remove($queue)
    {
        $queue = self::getKey($queue);

        if ('redis' == self::$type)
        {
            return Zero_Queue_Redis::remove($queue);
        }

        if ('msg-queue' == self::$type)
        {
            return Zero_Queue_MsgQueue::remove($queue);
        }

        return self::$fileQueueInstances[$queue]->unmount();
    }

    /**
     * Return the status of a queue.
     *
     * For redis and the file backend the result is array('msg_qnum' => count);
     * for msg-queue it is whatever Zero_Queue_MsgQueue::msgStat() returns.
     *
     * @param string $queue Logical queue name.
     * @return mixed Status data containing a 'msg_qnum' entry.
     */
    public static function msgStat($queue)
    {
        return self::statusForKey(self::getKey($queue));
    }

    /**
     * Return the 'msg_qnum' entry of a queue's status.
     *
     * @param string $queue Logical queue name.
     * @return mixed The 'msg_qnum' value reported by the selected backend.
     */
    public static function msgStatQueueNum($queue)
    {
        // Resolve the key exactly once. The previous implementation passed the
        // already-prefixed key back into msgStat(), which prefixed it again on
        // the msg-queue backend.
        $queue_status = self::statusForKey(self::getKey($queue));

        return $queue_status['msg_qnum'];
    }

    /**
     * Whether the current type is served by the file backend.
     *
     * @return bool True when self::$type is neither 'redis' nor 'msg-queue'.
     */
    private static function usesFileBackend()
    {
        return 'redis' != self::$type && 'msg-queue' != self::$type;
    }

    /**
     * Build the status data for an already-prefixed queue key.
     *
     * @param string $key Prefixed queue key as returned by getKey().
     * @return mixed Status data containing a 'msg_qnum' entry.
     */
    private static function statusForKey($key)
    {
        if ('redis' == self::$type)
        {
            return array('msg_qnum' => Zero_Queue_Redis::size($key));
        }

        if ('msg-queue' == self::$type)
        {
            return Zero_Queue_MsgQueue::msgStat($key);
        }

        return array('msg_qnum' => self::filePendingCount($key));
    }

    /**
     * Compute the file backend's queue count from position() and length().
     *
     * @param string $key Prefixed queue key as returned by getKey().
     * @return int 0 when the position is past the length, else length - position + 1.
     */
    private static function filePendingCount($key)
    {
        $pos_row = self::$fileQueueInstances[$key]->position();
        $all_num = self::$fileQueueInstances[$key]->length();

        // NOTE(review): index 1 of position() is used as the position value;
        // its exact meaning is defined by Zero\Queue\File - confirm.
        $pos_num = $pos_row[1];

        if ($pos_num > $all_num)
        {
            return 0;
        }

        return $all_num - $pos_num + 1;
    }

    /**
     * Private constructor: the class is used through its static methods only.
     */
    private function __construct()
    {
    }
}
?>
读取队列数据
// Drain the 'log' queue, handling at most $times messages per run.
$times = 100000;
while ($times > 0 && Zero_Queue::msgStatQueueNum('log') > 0)
{
    // Count this iteration; without the decrement the cap above never takes effect.
    $times--;
    $queue_row = Zero_Queue::receive('log');
    // TODO: process $queue_row
}
写入队列数据
// Put the message onto the asynchronous queue
Zero_Queue::send('notice_msg', 'data');
文档更新时间: 2022-09-29 18:24 作者:随商信息技术(上海)有限公司