`
u014377504
  • 浏览: 2379 次
文章分类
社区版块
存档分类
最新评论

rabbitmq

阅读更多
1.simple
producer --》 queue --》 consumer   生产后直接消费
2.work-queues
proudcer --》 queue --》 consumer   将消息发送到一个队列中,由队列的消费者依次消费
                                --》 consumer
                                --》 consumer
3.广播模式
producer  --》 exchange --》 queue --》 consumer   生产者将消息发送到消息中转器,并有
                                       --》 queue --》 consumer    消息中转器转到一个或多个队列,消费
4.direct模式
producer  --》 exchange --》 queue --》 consumer   中转器中在绑定队列与订阅的主题进行
                                       --》 queue --》 consumer   连接,将消息放到属于的队列中
5.topic
producer  --》 exchange --》 queue --》 consumer   中转器中在绑定队列与订阅的主题进行
                                       --》 queue --》 consumer   连接,将消息放到属于的队列中,正则
demo
package RabbitMQ;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
 * Created by antong on 15/12/29.
 */
public class RabbitMQDemo {
    private static final String QUEUE_NAME = "test";
    private static final String EXCHANGE = "exchange_name";
    private static final String[] SEVERITIES = {"black","blue","red"};

    private static ConnectionFactory factory = new ConnectionFactory();
    private RabbitMQDemo(){}
    static {
        factory.setHost("localhost");
    }
    public static Connection getConnection() throws IOException, TimeoutException {
        return factory.newConnection();
    }

    public static void close(Channel channel,Connection connection) throws IOException, TimeoutException {
        channel.close();
        connection.close();
    }
    
    //发送队列  work-queues
    public static void sendQueue() throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        //String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        for(int i = 0;i < 10;i++) {
            //String exchange(“”), String routingKey(队列名称), BasicProperties props, byte[] body            
            channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
            System.out.println(" [x] Sent '" + message + "'" + i);
        }
        close(channel, connection);
    }
    //接受队列  work-queues
    public static void receiveQueue() throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        //String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
    
    
    
    //广播模式
    public static void sendExchange() throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        //String exchange, String type(队列模式)(fanout,direct,topic)
        channel.exchangeDeclare(EXCHANGE, "fanout");
        String message = "Hello World!";
        for(int i = 0;i < 10;i++) {
            //String exchange, String routingKey(“”), BasicProperties props, byte[] body
            channel.basicPublish(EXCHANGE, "", null, (message+i).getBytes());
            System.out.println(" [x] Sent '" + message + "'" + i);
        }
        close(channel, connection);
    }
    //广播模式
    public static void receiveExchange() throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        //String queue, String exchange, String routingKey(“”)
        channel.queueBind(queueName, EXCHANGE, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C"+queueName);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
    
    
    
    //直连模式
    public static void sendDirect() throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        //String exchange, String type(队列模式)(fanout,direct,topic)
        channel.exchangeDeclare(EXCHANGE, "direct");
        String message = "Hello World!";
        for(int i = 0;i < 10;i++) {
            //String exchange, String routingKey(主题), BasicProperties props, byte[] body
            channel.basicPublish(EXCHANGE, getSeverity(), null, (message+getSeverity()).getBytes());
            System.out.println(" [x] Sent '" + message + "'" + i);
        }
        close(channel, connection);
    }
    //直连模式
    public static void receiveDirect() throws IOException, TimeoutException {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE, "direct");
        String queueName = channel.queueDeclare().getQueue();
        //String queue, String exchange, String routingKey(主题)
        channel.queueBind(queueName, EXCHANGE, getSeverity());
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C"+queueName);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
    private static String getSeverity()
    {
        Random random = new Random();
        int ranVal = random.nextInt(3);
        return SEVERITIES[ranVal];
    }

}
分享到:
评论

相关推荐

    RabbitMQ快速入门及API介绍(401M)

    【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...

    批量删除rabbitmq的队列或交换机

    采用python编写的批量删除rabbitmq的队列或交换机。 1.修改rabbitmq_delete.py中rabbitmq的配置; 2.执行以下命令: 删除队列: python3 rabbitmq_delete.py -k ‘udata.climb’ -d 1 删除交换机: python3 rabbitmq_...

    rabbitmq-java (2).zip

    rabbitmq

    rabbitmq配置文件 rabbitmq.config

    rabbitmq配置文件,用于rabbitmq管理

    RabbitMQ工具类及测试类(完整版)

    RabbitMQClientUtil是MQ的测试工具类,他封装了fanout、direct、topic三种exchange模式,并包括发送数据和接收数据。...rabbitmq.properties配置文件根据自己需要自行放在,放置完毕后,请自己修改util中的文件地址

    RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包

    RabbitMQ rabbitmq-server-3.6.12-1.el6.noarch 及其安装所需要的软件打包都在这里面,主要报卡一下软件:socat-1.7.3.2.tar.gz、rabbitmq-server-3.6.12-1.el6.noarch.rpm、rabbitmq-release-signing-key.asc、otp_...

    SpringBoot整合RabbitMQ.zip

    SpringBoot整合RabbitMQ的详细过程 **1.该篇博文首先讲述了交换机和队列之间的绑定关系** ①direct、②fanout、③topic **2.然后讲消息的回调** 四种情况下,确认触发哪个回调函数: ①消息推送到server,但是在...

    【BAT必备】rabbitMq面试题

    【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题【BAT必备】rabbitMq面试题...

    rabbitmq-server-3.7.3

    RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在...

    windows版本rabbitmq安装包

    windows版本rabbitmq安装包 里面内含rabbitmq-server-3.9.13.exe、otp_win64_24 内含rabbitmq_delayed_message_exchange-3.9.0.ez 插件

    ARM版本的 rabbitmq docker镜像资源

    ARM版本的 rabbitmq 镜像资源 版本 3.8.9 使用拷贝到主机 执行:docker load &lt; rabbitmq_arm3.8.9.tar 生成docker镜像

    rabbitmq 3.9.3 配置文件

    rabbitmq 3.9.3 配置文件

    tp6使用rabbitmq

    tp6使用rabbitmq

    rabbitmqadmin

    RabbitMQ命令行手动创建队列rabbitmqadmin用法 手动创建队列方法:登录http://ip:15672/cli下载 将下载的rabbitmqadmin放到/usr/sbin目录下,并赋予权限:chmod 755 rabbitmqadmin 查看命令帮助信息: python ...

    flink-sql集成rabbitmq

    flink-sql集成rabbitmq

    RabbitMQ消息模式之Confirm确认消息

    理解Confirm消息确认机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,...import com.rabbitmq.client.Queuein

    rabbitMq window10安装包

    rabbitMq window10安装包

    Linux下安装RabbitMQ

    以rabbitmq3.6版本为例

    RabbitMQ使用指南.pdf

    RabbitMQ下载安装配置使用指南官方手册

    RabbitMQ源码和客户端工具

    RabbitMQ源码和客户端工具RabbitMQ源码和客户端工具

Global site tag (gtag.js) - Google Analytics