站長資訊網(wǎng)
        最全最豐富的資訊網(wǎng)站

        php中如何使用簡單可靠的rabbitmq組件包

        在項目中rabbitmq得到了廣泛的時候,這里對rabbitmq的常規(guī)功能做了一個簡單的總結,并封裝成了composer包,composer包地址、github地址,歡迎fork,由于水平有限,難免存在bug,歡迎提出寶貴意見。

        php中如何使用簡單可靠的rabbitmq組件包

        easy-rabbitmq 包簡介

        對php-amqplib/php-amqplib包的二次封裝,為常見功能提供一套開箱即用的生產(chǎn)解決方案。目前支持的功能列表如下:

        • 推送消息到直連交換機(含延遲消息)

        • 推送消息到扇形交換機(含延遲消息)

        • 推送消息到主題交換機(含延遲消息)

        • 訂閱模式下的可靠消費, 消費者消費失敗后將會嘗試繼續(xù)消費,最多嘗試5次。

        • 拉取模式下的可靠消費, 消費者消費失敗后將會嘗試繼續(xù)消費,最多嘗試5次。

        如果還有其它場景,歡迎繼續(xù)補充,隨后進行迭代?。?/p>

        要求

        安裝包對PHP版本對要求主要取決于php-amqplib/php-amqplib包本身對要求,這里為了兼顧php5.0的使用者,我們使用了php-amqplib/php-amqplib包V2.9.0的版本。

        具體的要求參照這里。

        不過筆者推薦使用php7.0及其以上版本, 這個開發(fā)包也是在7.0這個版本上面開發(fā)完成的!

        安裝

              composer require maweibinguo/easyrabbitmq

        使用

        在這里我們推薦php腳本+supervisor結合使用,用以保證消費進程的可靠性、增強worker的消費能力! 如果你還沒有聽說過supervisor,可以點擊這里了解.

        1、推送消息

        1-1、推送消息到直連交換機

              $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);              //延遲消息,30 秒中后才會到達指定的交換機       $instance->pushToDirect(                         $msg = time(), //消息體內容                         $exchange = "easy_direct_exchange", //交換機名稱                         $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致                         $delaySec = 30 //延遲秒數(shù)       );        //無延遲,推入到指定到直鏈交換機       $instance->pushToDirect(                         $msg = time(), //消息體內容                         $exchange = "easy_direct_exchange", //交換機名稱                         $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致       );

        1-2、推送消息到扇形交換機

              $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);              //延遲消息,30 秒中后才會到達指定的交換機       $instance->pushToFanout(                         $msg = time(), //消息體內容                         $exchange = "easy_fanout_exchange", //交換機名稱                         $delaySec = 30 //延遲秒數(shù)       );        //無延遲,推入到指定到直鏈交換機       $instance->pushToFanout(                         $msg = time(), //消息體內容                         $exchange = "easy_fanout_exchange" //交換機名稱       );

        1-3、推送消息到主題交換機

              $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);              //延遲消息,30 秒中后才會到達指定的交換機       $instance->pushToTopic(                         $msg = time(), //消息體內容                         $exchange = "easy_topic_exchange", //交換機名稱                         /**                          * routingKey 要同consum(get)方法的bindingKey相匹配                          * bindingKey支持兩種特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一個單詞、“#”用于匹配多個單詞(也可以是0個)                          * 無論是bindingKey還是routingKey, 被"."分隔開的每一段獨立的字符串就是一個單詞, easy.topic.queue, 包含三個單詞easy、topic、queue                          */                         $routingKey = "easy.topic.queue",                         $delaySec = 30 //延遲秒數(shù)       );        //無延遲,推入到指定到直鏈交換機       $instance->pushToTopic(                         $msg = time(), //消息體內容                         $exchange = "easy_topic_exchange", //交換機名稱                         $routingKey = "easy.topic.queue"           );

        2、消費消息

        消費支持自動重試,最多嘗試重試5次,每次消費失敗后該消息將會被重新投入到消費隊列中。重新的時間將會隨著失敗的次數(shù)增多逐漸推移,本客戶端支持的推移策略如下:

        失敗1次(1秒鐘后會再被投遞), 失敗2次(2秒鐘后會再被投遞), 失敗3次(4秒鐘后會再被投遞), 失敗4次(8秒鐘后會再被投遞), 失敗5次(16秒鐘后會再被投遞)

        2-1、訂閱模式

        訂閱模式下的可靠消費
              $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);       $instance->consume(             $queueName = "direct_test_queue",//訂閱的隊列名稱             $consumerTag = "c1",//消費標記             $exchange = "easy_direct_exchange",//交換機名稱             $bindingKey = "direct_test_queue",//bindingkey,如果是直鏈交換機需要同routingKey保持一致             $callback = function($msg){                 $body = $msg->body;                 file_put_contents("./test.log", "time => " . time() . "t" . " body => " . $body . PHP_EOL , FILE_APPEND);                 //如果返回結果不絕對等于(===)true,那么將觸發(fā)消息重試機制                 return false;             },             //5次消費消費失敗后,失敗消息將會投遞到的隊列名稱             $failedQueue = "easymq@failed"       );

        2-2、拉取模式

        拉取模式下的可靠消費
              $config = [           "host" => "127.0.0.1",             "port" => "5672",             "user" => "guest",             "password" => "guest",             "vhost" => "/",             "channel_max_num" => 10,       ];           $instance = RabbitMq::getInstance($config);       $instance->get(             $queue = "get_queue",             $exchange = "easy_fanout_exchange",             $bindingKey = "",             $callback = function($msg){                 $body = $msg->body;                 file_put_contents("./test.log", "time => " . time() . "t" . " body => " . $body . PHP_EOL , FILE_APPEND);                 //如果返回結果不絕對等于(===)true,那么將觸發(fā)消息重試機制                 return false;             },             //5次消費消費失敗后,失敗消息將會投遞到的隊列名稱             $failedQueue = 'easymq@failed'       );

        推薦學習:php視頻教程

        贊(0)
        分享到: 更多 (0)
        網(wǎng)站地圖   滬ICP備18035694號-2    滬公網(wǎng)安備31011702889846號
        主站蜘蛛池模板: 国产成人精品AA毛片| 99re久久精品国产首页2020| 亚洲av无码国产精品色午夜字幕 | 99久久精品费精品国产一区二区| 免费精品国产自产拍在线观看| 日本久久久精品中文字幕| 日韩精品真人荷官无码| 欧美日韩精品一区二区三区不卡| 青青青青久久精品国产| 国产精品区一区二区三在线播放| 亚洲精品无码永久在线观看你懂的| 久久久91人妻无码精品蜜桃HD| 成人免费精品网站在线观看影片| 久久精品草草草| 国产成人精品男人的天堂538| 欧洲成人午夜精品无码区久久| 亚洲国产精品成人| 欧美精品亚洲人成在线观看 | 日韩人妻无码精品无码中文字幕 | 久久se精品一区精品二区国产| 91麻豆精品一二三区在线| 99视频在线观看精品| 国产精品无码久久综合| 久久亚洲精品国产精品| 一本精品中文字幕在线| 亚洲国产精品综合久久一线| 久久久精品日本一区二区三区 | 午夜精品久久久久久| 免费人欧美日韩在线精品| 久久精品无码一区二区三区免费 | 国产欧美在线观看精品一区二区| 欧美精品亚洲精品日韩1818| 国产精品久久久久aaaa| 99re国产精品视频首页| 国产成人精品精品欧美| 精品无人区麻豆乱码1区2区| 久久精品一区二区国产| 欧美巨大黑人精品videos| 久久久久夜夜夜精品国产| 亚洲国产精品自在在线观看 | 伊人久久精品无码二区麻豆|