站長資訊網
        最全最豐富的資訊網站

        RabbitMQ集群跨網段消息遷移

        需求背景

        將阿里云同一個VPC下的RabbitMQ集群的消息從一個網段集群遷移到另一個網段集群。消息中間件的消息是即時消費,為何還有歷史消息,因為是歷史遺留問題。故要遷移

        整個網絡拓撲圖如下

        注意:

        若對于跨VPC網絡

        1. 確保各主機網絡互通

        2. 配置好各主機名

        兩邊安全組出方向開發:15672、25672、5672、4369端口

        否在在加入集群會出現問題

        RabbitMQ集群跨網段消息遷移

        資源清單

        主機名

        IP地址

        角色

        備注

        node171

        172.20.0.171

        老的MQ集群_1

         

        node172

        172.20.0.172

        老的MQ集群_2

         

        node173

        192.168.0.173

        MQ集群_1

         

        node174

        192.168.0.174

        新的MQ集群_2

         

        基礎軟件及環境信息

        操作系統:CentOS Linux release 7.3.1611

        Erlang:Erlang/OTP 20 [erts-9.3.3.3]

        RabbitMQ:rabbitmq_server-3.7.8

        集群的部署

        node171、node172組成集群A

        node173、node174組成集群B

        這里的環境部署略

        創建測試賬戶

        在【node171上進行操作】

        rabbitmqctl add_user root root123

        rabbitmqctl add_vhost kcvhost

        rabbitmqctl set_permissions -p kcvhost root  “.*” “.*” “.*”

        rabbitmqctl add_user admin admin123

        rabbitmqctl set_permissions -p kcvhost admin  “.*” “.*” “.*”

        rabbitmqctl set_user_tags admin administrator

        rabbitmq-plugins enable rabbitmq_management

        rabbitmqctl stop_app

        rabbitmqctl start_app

        生成測試數據

        消息生產者代碼:

        package com.zjkj.rabbitmq.demo;
         
        import Java.io.IOException;
        import java.util.concurrent.TimeoutException;
         
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.MessageProperties;
         
        /**
         * 消息的生產者
         * @author zjkj
         *
         */
        public class Rabbitmq_Producer {

        private static final String EXCHANGE_NAME = “exchange_test_3”;
        private static final String ROUTING_KEY = “routingkey_demo”;
        private static final String QUEUE_NAME = “queue_test_3”;
        private static final String IP_ADDRESS = “172.20.0.171”;
        private static final int PORT = 5672; //RabbitMQ服務默認端口號為5672

        public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername(“root”);
        factory.setPassword(“root123”);
        Connection connection = factory.newConnection(); //創建連接
        Channel channel = connection.createChannel(); // 創建信道
         //創建一個type=”direct”、持久化、非自動刪除的交換器
        channel.exchangeDeclare(EXCHANGE_NAME, “direct”,true, false, null);
        // 創建一個持久化、非排他的、非自動刪除的隊列
        channel.queueDeclare(QUEUE_NAME, true, false, false,null);
        // 將交換器與隊列通過路由鍵綁定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        // 發送一條持久化的消息:hello world!
        for(int i=1;i<=100000;i++){
        String msg = “交換器_1與隊列1綁定:Message_”+i;
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
        }
        // 關閉資源
        channel.close();
        connection.close();
         
        }
         
        }

        消費者代碼

        package com.zjkj.rabbitmq.demo;
         
        import java.io.IOException;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;
         
        import com.rabbitmq.client.AMQP;
        import com.rabbitmq.client.Address;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;
        import com.rabbitmq.client.Consumer;
        import com.rabbitmq.client.DefaultConsumer;
        import com.rabbitmq.client.Envelope;
         
        /**
         * 消息的消費者
         * @author zjkj
         *
         */
        public class Rabbitmq_Consumer {
         
        private static final String QUEUE_NAME = “queue_test_3”;
        private static final String IP_ADDRESS = “192.168.6.171”;
        private static final  int PORT = 5672;

        public static void main(String[] args) throws IOException,TimeoutException,InterruptedException{
        Address[] addresses = new Address[]{
        new Address(IP_ADDRESS,PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(“root”);
        factory.setPassword(“root123”);
        // 這里的連接方式與生產者的demo略有不同,注意區別
        Connection connection = factory.newConnection(addresses); //創建連接
        final Channel channel = connection.createChannel(); // 創建信道
        channel.basicQos(64);// 設置客戶端最多接收未被ack的消息的個數

        /**
         * 這里采用了繼承DefaultConsumer的方式來實現消費,有過RabbitMQ使用經驗的開發者
         * 可能喜歡使用QueueingConsumer的方式來實現消費
         * 因為使用QueueingConsumer會有一些隱患。
         * 同時在RabbitMQ Java客戶端4.0.0版本開始將QueueingConsumer標記為@Deprecated了
         */
        Consumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)
          throws IOException{
        System.out.println(“recv message : ” + new String(body));
        try{
        TimeUnit.SECONDS.sleep(1);

        }catch(InterruptedException e){
        e.printStackTrace();
        }
        channel.basicAck(envelope.getDeliveryTag(),false);
        }
        };
        channel.basicConsume(QUEUE_NAME,true, consumer);
        //等待回調函數執行完畢之后,關閉資源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
         
        }
         
        }

        查看集群中諸如用戶數、交換器數量、隊列數量等

        [root@node171 rabbitmq]# rabbitmqctl list_users

        Listing users …

        admin  [monitoring]

        guest  [administrator]

        root    []

        [root@node171 rabbitmq]# rabbitmqctl list_exchanges

        Listing exchanges for vhost / …

        amq.topic      topic

        amq.headers    headers

        exchange_test_3 direct

        amq.direct      direct

        exchange_test_2 direct

        amq.rabbitmq.trace      topic

        amq.match      headers

                direct

        exchange_test_1 direct

        amq.fanout      fanout

        [root@node171 rabbitmq]# rabbitmqctl list_queues

        Timeout: 60.0 seconds …

        Listing queues for vhost / …

        queue_test_3    100000

        queue_test_2    200

        queue_test_1    10000

        遷移方案

        遷移步驟

        1. 停止所有生產者和消費者的應用程序

        2. 將集群B中的機器依次一臺一臺加入集群A中,并確認所有隊列鏡像完成

        3. 剔除集群A中的一臺一臺機器

        4. 將應用指向集群B

        方案1【不可行】

        將集群B中的一臺機器加入集群A中,然后再集群B中的另一他機器已加入集群,然后剔除集群A中的一臺機器,然后再剔除集群A中的另一臺機器

        此方案對于RabbitMQ的普通集群也即是Cluster模式是無效的

        1. 停止A集群中的所有連接

        2. 將集群B中的一臺節點加入到A集群中

        將集群A中的.erlang.cookie的值拷貝到集群B中的node173上

        [root@node171 rabbitmq]# cat .erlang.cookie

        ORMTFBMHOXOGFKRLQSPU[root@node171 rabbitmq]#

        [root@node173 plugins]# cp /var/lib/rabbitmq/.erlang.cookie  /var/lib/rabbitmq/erlang.cookie.bak

        [root@node173 plugins]# chmod 700 /var/lib/rabbitmq/.erlang.cookie

         [root@node173 plugins]# vi /var/lib/rabbitmq/.erlang.cookie

        ORMTFBMHOXOGFKRLQSPU
         

        [root@node173 plugins]# chmod 400 /var/lib/rabbitmq/.erlang.cookie

        [root@node173 plugins]# ls -lrth /var/lib/rabbitmq/.erlang.cookie

        -r——– 1 rabbitmq rabbitmq 21 Oct 24 18:51 /var/lib/rabbitmq/.erlang.cookie

        3.將集群B中的node173加入到集群A中

        [root@node173 rabbitmq]# service rabbitmq-server start

        Redirecting to /bin/systemctl start  rabbitmq-server.service

        [root@node173 rabbitmq]# rabbitmqctl stop_app

        Stopping rabbit application on node mq_173@node173 …

        [root@node173 rabbitmq]# rabbitmqctl reset

        Resetting node mq_173@node173 …

        [root@node173 rabbitmq]# rabbitmqctl join_cluster mq171@node171

        Clustering node mq_173@node173 with mq171@node171

        [root@node173 rabbitmq]# rabbitmqctl start_app

        Starting node mq_173@node173 …

         completed with 3 plugins.

        4. 同樣的方法將集群B中的node174加入到集群A中

        [root@node174 rabbitmq]# rabbitmqctl cluster_status

        Cluster status of node mq_174@node174 …

        [{nodes,[{disc,[mq_174@node174]}]},

         {running_nodes,[mq_174@node174]},

         {cluster_name,<<“mq_174@node174”>>},

         {partitions,[]},

         {alarms,[{mq_174@node174,[]}]}]

         [root@node174 rabbitmq]# rabbitmqctl stop_app

        Stopping rabbit application on node mq_174@node174 …

        [root@node174 rabbitmq]# rabbitmqctl reset

        Resetting node mq_174@node174 …

        [root@node174 rabbitmq]# rabbitmqctl join_cluster mq171@node171

        Clustering node mq_174@node174 with mq171@node171

        [root@node174 rabbitmq]# rabbitmqctl start_app

        Starting node mq_174@node174 …

         completed with 0 plugins.

          5.將集群A中的node171剔除集群

        [root@node171 rabbitmq]# rabbitmqctl stop

        Stopping and halting node mq171@node171 …

        這時訪問node172 的Web集群管理

        RabbitMQ集群跨網段消息遷移

        同樣在node173上的Web管理界面查看

        RabbitMQ集群跨網段消息遷移

        至此對于普通的集群模式,這種方案是不行的。

        方案2【可行】

        若RabbitMQ采用鏡像隊列,將集群A中的消息數據遷移到集群B中,

        集群A中的node171、node172采用鏡像隊列

        構建集群A的鏡像隊列環境

        1.首先集群A中的node172加入集群中

        【在node172上操作】

        [root@node172 ~]# rabbitmqctl stop_app

        Stopping rabbit application on node mq172@node172 …

        [root@node172 ~]# rabbitmqctl reset

        Resetting node mq172@node172 …

         [root@node172 ~]# rabbitmqctl join_cluster mq171@node171

        Clustering node mq172@node172 with mq171@node171

        ra[root@node172 ~]# rabbitmqctl start_app

        Starting node mq172@node172 …

        2.設置鏡像策略

        【在node171上操作】

        [root@node171 ~]# rabbitmqctl set_policy ha-all -p kcvhost “^” ‘{“ha-mode”:”all”,”ha-sync-mode”:”automatic”}’

        Setting policy “ha-all” for pattern “^” to “{“ha-mode”:”all”,”ha-sync-mode”:”automatic”}” with priority “0” for vhost “kcvhost” …

         

        [root@node171 ~]# rabbitmqctl set_policy rabbit_mirror “^” ‘{“ha-mode”:”all”}’

        Setting policy “rabbit_mirror” for pattern “^” to “{“ha-mode”:”all”}” with priority “0” for vhost “/” …

        開始集群A中的鏡像隊列遷移

        1.停止所有消息的生產者和消費者相關應用服務

        2.停止集群A中的所有機器,并備份原始數據

         【node171、node172】都要操作

        Node172執行如下:

        [root@node172 ~]# service rabbitmq-server stop

        Redirecting to /bin/systemctl stop  rabbitmq-server.service

        [root@node172 ~]# cd /var/lib/rabbitmq/

        [root@node172 rabbitmq]# ls

        mnesia

        [root@node172 rabbitmq]# cp -rf mnesia mnesia.20181025.bak

        [root@node172 rabbitmq]# service rabbitmq-server start

        Redirecting to /bin/systemctl start  rabbitmq-server.service

        node171執行如下:

        [root@node171 ~]# service rabbitmq-server stop

        Redirecting to /bin/systemctl stop  rabbitmq-server.service

        [root@node171 ~]# cd /var/lib/rabbitmq/

        [root@node171 rabbitmq]# cp -rf mnesia mnesia.20181025.bak

        [root@node171 rabbitmq]# service rabbitmq-server start

        Redirecting to /bin/systemctl start  rabbitmq-server.service

        2.首先將集群B的node173機器加入到集群A中

        [root@node173 network-scripts]# service rabbitmq-server stop

        Redirecting to /bin/systemctl stop rabbitmq-server.service

        [root@mq04 rabbitmq]# cp -rf /var/lib/rabbitmq /var/lib/rabbitmq.bak

        [root@mq04 rabbitmq]# cd /var/lib/rabbitmq

        [root@mq04 rabbitmq]# rm -rf .erlang.cookie  mnesia/

        [root@mq01 rabbitmq]# scp .erlang.cookie  root@mq04:/var/lib/rabbitmq

        The authenticity of host ‘mq04 (192.168.0.232)’ can’t be established.

        ECDSA key fingerprint is SHA256:zgAicKOpvRLLCyhdUbpNvyanKYrPt/Pp9g+Sdq9mAoo.

        ECDSA key fingerprint is MD5:15:7d:1e:c2:86:d5:4a:40:63:df:f5:4e:65:c4:24:62.

        Are you sure you want to continue connecting (yes/no)? yes

        Warning: Permanently added ‘mq04’ (ECDSA) to the list of known hosts.

        root@mq04’s password:

        Permission denied, please try again.

        root@mq04’s password:

        .erlang.cookie                                                                  100%  20    19.6KB/s  00:00

        [root@mq04 rabbitmq]# chmod 400 .erlang.cookie

        [root@mq04 rabbitmq]# chown -R rabbitmq:rabbitmq .erlang.cookie

        [root@mq04 rabbitmq]# service rabbitmq-server start

        Redirecting to /bin/systemctl start rabbitmq-server.service

        [root@mq04 rabbitmq]# rabbitmqctl stop_app

        Stopping rabbit application on node mq04@mq04 …

        [root@mq04 rabbitmq]# rabbitmqctl reset

        Resetting node mq04@mq04 …

        對于阿里云ECS一定要在安全組先臨時開啟15672、25672、5672、4369端口

        [root@node173 network-scripts]# rabbitmqctl join_cluster mq171@node171

        Clustering node mq_173@node173 with mq171@node171

        [root@node173 network-scripts]# rabbitmqctl start_app

        Starting node mq_173@node173 …

         completed with 3 plugins.

        3. 然后再將B集群中的node174機器加入到集群A中

         使用上面同樣的方法,將node174加入到集群中去

        4.剔除集群A中的node171、node172機器

        Node172上執行

        [root@mq02 rabbitmq]# rabbitmqctl stop_app

        Stopping rabbit application on node mq02@mq02 …

        [root@mq02 rabbitmq]# service rabbitmq-server stop

        Redirecting to /bin/systemctl stop rabbitmq-server.service

        [root@mq02 rabbitmq]# cp -rf /var/lib/rabbitmq /var/lib/rabbitmq.bak

        [root@mq02 rabbitmq]# service rabbitmq-server start

        Redirecting to /bin/systemctl start rabbitmq-server.service

        [root@mq02 rabbitmq]# rabbitmqctl stop_app

        Stopping rabbit application on node mq02@mq02 …

        [root@mq02 rabbitmq]# rabbitmqctl reset

        Resetting node mq02@mq02 …

        同樣的node171上執行同樣的命令

        對于采用鏡像隊列集群,此方案可行

        贊(0)
        分享到: 更多 (0)
        網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號
        主站蜘蛛池模板: 在线观看亚洲精品福利片| 国产网红无码精品视频| 亚洲AV永久纯肉无码精品动漫| 欧美国产亚洲精品高清不卡| 日韩精品乱码AV一区二区| 久久精品成人免费观看97| 四虎成人www国产精品| 国产精品免费大片| 中文成人无字幕乱码精品区| 精品偷自拍另类在线观看丰满白嫩大屁股ass | 国产久爱免费精品视频| 国产精品亚洲精品观看不卡| 无码人妻精品一区二区三18禁| 老子影院午夜精品无码| 国产成人无码精品一区在线观看| 国产91精品在线| 国产精品国产三级国产av品爱网| 日韩人妻精品一区二区三区视频 | 日韩精品无码免费视频| 99精品福利国产在线| 国产成人精品精品欧美| 精品熟女少妇av免费久久| 无码人妻一区二区三区精品视频| 国产精品龙口护士门在线观看| 欧美亚洲国产精品久久蜜芽| 国产成人精品白浆久久69| 日韩av无码久久精品免费| 一区二区国产精品 | 国产精品夜色视频一级区 | 亚洲欧洲自拍拍偷精品 美利坚| 久久精品这里只有精99品| 国产国产成人久久精品| 亚洲国产精品久久久久婷婷老年| 99久久免费国产精品热| 国产精品拍天天在线| 国产精品亚洲一区二区三区在线 | 亚洲国产精品视频| 拍国产乱人伦偷精品视频| 久久久精品久久久久久| 精品欧美一区二区三区久久久| 精品无码三级在线观看视频|