先上代码

Spring学习笔记3——消息队列(rabbitmq), 发送邮件,学习笔记rabbitmq

   
本节的内容是用户注册时,将邮箱地址先存入rabbitmq队列,之后返回给用户注册成功;之后消息队列的接收者从队列中获取消息,发送邮件给用户。

 

一、RabbitMQ介绍

    如果之前对rabbitmq不了解,推荐先看一下RabbitMQ
Quick(快速手册)。

1、rabbitmq在mac上的安装。

2、rabbitmq简单介绍。

图片 1

生产者: 负责发送消息到Exchange。

Exchange: 按照一定的策略,负责将消息存入到指定的队列。

队列queue:  负责保存消息。

消费者: 负责从队列中提取消息。

binding: 负责Exchange和队列的关联映射,Exchange和queue是多对多的关系。

 

二、RabbitMQ在Spring中的实现

1、引入依赖包。

<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
            <version>1.6.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.6.0.RELEASE</version>
        </dependency>

2、rabbitmq配置文件。

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/rabbit"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
              http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!--1、配置连接工厂, 如果不配置host, port, username, passowrd, 则按默认值localhost:5672, guest/guest-->
    <!--<connection-factory id="connectionFactory" />-->
    <connection-factory id="connectionFactory"
                        host="localhost"
                        port="5672"
                        username="everSeeker"
                        password="333" />

    <!--2、配置队列queue, Exchange, 以及将他们结合在一起的binding-->
    <!--在queue以及exchange中, 有一个重要的属性durable, 默认为true, 可以防止宕机后数据丢失。-->
    <!--在listener-container中, 有acknowledge属性, 默认为auto, 即消费者成功处理消息后必须有个应答, 如果消费者程序发生异常或者宕机, 消息会被重新放回队列-->
    <admin connection-factory="connectionFactory" />
    <queue id="userAlertEmailQueue" name="user.alerts.email" durable="true" />
    <queue id="userAlertCellphoneQueue" name="user.alerts.cellphone" />     <!--durable默认为true-->

    <!--标准的AMQP Exchange有4种: Direct, Topic, Headers, Fanout, 根据实际需要选择。-->
    <!--Direct: 如果消息的routing key与bingding的routing key直接匹配的话, 消息将会路由到该队列上。-->
    <!--Topic: 如果消息的routing key与bingding的routing key符合通配符匹配的话, 消息将会路由到该队列上。-->
    <!--Headers: 如果消息参数表中的头信息和值都与binding参数表中相匹配, 消息将会路由到该队列上。-->
    <!--Fanout: 不管消息的routing key和参数表的头信息/值是什么, 消息将会路由到该队列上。-->
    <direct-exchange name="user.alert.email.exchange" durable="true">
        <bindings>
            <binding queue="user.alerts.email" />     <!--默认的routing key与队列的名称相同-->
        </bindings>
    </direct-exchange>
    <direct-exchange name="user.alert.cellphone.exchange">
        <bindings>
            <binding queue="user.alerts.cellphone" />
        </bindings>
    </direct-exchange>

    <!--3、配置RabbitTemplate发送消息-->
    <template id="rabbitTemplate"
              connection-factory="connectionFactory" />

    <!--4、配置监听器容器和监听器来接收消息-->
    <beans:bean id="userListener" class="com.everSeeker.alerts.UserAlertHandler" />
    <listener-container connection-factory="connectionFactory" acknowledge="auto">
        <listener ref="userListener"
                  method="handleUserAlertToEmail"
                  queues="userAlertEmailQueue" />
        <listener ref="userListener"
                  method="handleUserAlertToCellphone"
                  queues="userAlertCellphoneQueue" />
    </listener-container>

</beans:beans>

如果配置connection-factory时,采用默认的guest/guest账号密码时,有可能会出现org.springframework.amqp.AmqpAuthenticationException:
com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED –
Login was refused using authentication mechanism PLAIN. For details see
the broker
logfile.的错误提示,解决办法是新建一个管理员权限的用户,并允许访问虚拟主机。步骤如下:

1、打开http://localhost:15672/
2、Admin ——> Users, 新建用户,administrator权限。
3、Virtual Hosts,设置新建用户允许访问。

3、生产者发送消息到exchange。

@Service("userAlertService")
public class UserAlertServiceImpl implements UserAlertService {
    private RabbitTemplate rabbit;

    @Autowired
    public UserAlertServiceImpl(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
    }

    public void sendUserAlertToEmail(User user) {
        //convertAndSend(String exchange, String routingKey, Object object), 将对象object封装成Message对象后, 发送给exchange
        rabbit.convertAndSend("user.alert.email.exchange", "user.alerts.email", user);
    }
}

4、配置消费者来接收消息。

public class UserAlertHandler {

    public void handleUserAlertToEmail(User user) {
        System.out.println(user);
}

 

三、通过javax.mail来发送邮件

1、引入依赖包。

<dependency>
    <groupId>javax.mail</groupId>
    <artifactId>mail</artifactId>
    <version>1.4.7</version>
</dependency>

2、配置邮件服务器信息。

@Bean
public MailSender mailSender(Environment env) {
    JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
    //如果为普通邮箱, 非ssl认证等, 比如163邮箱
    mailSender.setHost(env.getProperty("mailserver.host"));
    mailSender.setPort(Integer.parseInt(env.getProperty("mailserver.port")));
    mailSender.setUsername(env.getProperty("mailserver.username"));
    mailSender.setPassword(env.getProperty("mailserver.password"));
    mailSender.setDefaultEncoding("utf-8");

    //如果邮件服务器采用了ssl认证, 增加以下配置, 比如gmail邮箱, qq邮箱
    Properties props = new Properties();
    props.put("mail.smtp.auth", "true");
    props.put("mail.smtp.starttls.enable", "true");
    props.put("mail.smtp.socketFactory.class","javax.net.ssl.SSLSocketFactory");
    props.put("mail.smtp.socketFactory.port", "465");
    mailSender.setJavaMailProperties(props);

    return mailSender;
}

3、发送邮件。

@Component("userMailService")
public class UserMailServiceImpl implements UserMailService {
    private MailSender mailSender;

    @Autowired
    public UserMailServiceImpl(MailSender mailSender) {
        this.mailSender = mailSender;
    }

    public void sendSimpleUserMail(String to, User user) {
        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom("[email protected]");
        message.setTo(to);
        message.setSubject(user.getUsername() + "信息确认");
        message.setText(user.toString());
        mailSender.send(message);
    }
}

4、消费者调用发送邮件方法即可。

 

1、参考文献:Spring实战(第4版)。

2、完整代码在github,地址:

),
发送邮件,学习笔记rabbitmq
本节的内容是用户注册时,将邮箱地址先存入rabbitmq队列,之后返回给用户…

rabbitmq消息发送模型

图片 2图片 3

图片 4

namespace RabbitMQDemo
{
    public partial class HelloWorld : Form
    {
        string queueName1 = "hello_queue1";//消费者1
        string queueName2 = "hello_queue2";//消费者2
        Action<string> SetText;
        /// <summary>
        /// 单线程实例
        /// </summary>
        private static readonly HelloWorld _helloWorld;
        static HelloWorld()
        {
            _helloWorld = new HelloWorld();
        }
        /// <summary>
        /// 单例模式
        /// </summary>
        public static HelloWorld SingleForm
        { get { return _helloWorld; } }
        private HelloWorld()
        {
            CheckForIllegalCrossThreadCalls = false;
            InitializeComponent();
            ReseiveMsg(queueName1);
            ReseiveMsg(queueName2);
            SetText += OnSetText;
        }

        private void btnSendMsg_Click(object sender, EventArgs e)
        {
            SendMsg();
        }
        /// <summary>
        /// 发送消息
        /// </summary>
        private void SendMsg()
        {
            string message = txtPublisher.Text;
            if (message.Trim().Length <= 0)
            {
                MessageBox.Show("请输入要发送的消息");
            }
            string queueName = cbBoxQueues.SelectedValue.ToString();
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: queueName,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "",
                                     routingKey: queueName,
                                     basicProperties: null,
                                     body: body);
            }
        }

        /// <summary>
        /// 接收消息
        /// </summary>
        private void ReseiveMsg(string queueName)
        {
            //string queueName = cbBoxQueues.SelectedText;
            try
            {
                var factory = new ConnectionFactory() { HostName = "localhost" };

                //connection和channel不能使用using,否则会被dispose掉
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
                //声明队列 生产者和消费者都需要QueueDeclare
                channel.QueueDeclare(queue: queueName,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);

                    txtConsumer1.Invoke(SetText, message);
                };
                channel.BasicConsume(queue: queueName,
                                     noAck: true,
                                     consumer: consumer);
            }
            catch (Exception ex)
            {
                MessageBox.Show(ex.ToString());
            }
        }

        private void OnSetText(string txtContent)
        {
            string queueName = cbBoxQueues.SelectedValue.ToString();
            if (queueName == queueName1)
                txtConsumer1.Text += string.Format("{0}\r\n", txtPublisher.Text);
            if (queueName == queueName2)
                txtConsumer2.Text += string.Format("{0}\r\n", txtPublisher.Text);
        }

        private void HelloWorld_Load(object sender, EventArgs e)
        {
            List<DataSource> lst = new List<DataSource>();
            lst.Add(new DataSource("消费者1", "hello_queue1"));
            lst.Add(new DataSource("消费者2", "hello_queue2"));

            cbBoxQueues.DataSource = lst;
            cbBoxQueues.DisplayMember = "DisplayMember";
            cbBoxQueues.ValueMember = "DisplayValue";
        }

        private class DataSource
        {
            public DataSource(string displayMember,string displayValue)
            {
                DisplayMember = displayMember;
                DisplayValue = displayValue;
            }
            public string DisplayMember { get; set; }
            public string DisplayValue { get; set; }
        }
    }
}

要素:

View Code

  • 生产者
  • 消费者
  • 交换器:生产者将消息发送到交换器
  • 队列:交换器通过某种路由规则绑定到指定队列,将消息加入队列,消费者从队列消费消息

界面如下:

前提:

图片 5

引入rabbitmq的java客户端jar包

大致流程是

        <!-- import rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.6</version>
        </dependency>

生产者发送消息到队列,然后队列(rabbitmq)把消息发送给消费者(消费者向rabbitmq索取消息)

 

 两个消费者:

一、消息生产者

图片 6

1、代码:

图片 7

 1 package com.xxx.producer;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 
10 /**
11  * 消息生产者
12  */
13 public class HelloWorldProducer {
14     private static final String QUEUE_NAME    = "helloQueue";
15     private static final String EXCHANGE_NAME = "helloExchange";
16 
17     public static void main(String[] args) throws IOException, TimeoutException {
18         ConnectionFactory factory = new ConnectionFactory();// 建立连接工厂
19         factory.setHost("192.168.20.238");// 设置rabbitmq服务器地址
20         factory.setPort(5672);// 设置rabbitmq服务器端口
21         factory.setUsername("zhaojigang");
22         factory.setPassword("wangna");
23         factory.setVirtualHost("zhaojigangvhost");
24 
25         Connection connection = factory.newConnection();// 建立连接
26         Channel channel = connection.createChannel();// 建立信道
27 
28         /**
29          * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
30          * durable:队列是否持久化
31          * exclusive:当最后一个消费者取消订阅时,是否自动删除
32          * autoDelete:只有当前应用程序才能够消费队列消息(场景:限制一个队列只有一个消费者)
33          * arguments:other properties (construction arguments) for the queue
34          */
35         channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 创建队列(如果队列不存在,创建;如果存在,什么都不做)
36         /**
37          * exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
38          * exchange:交换器名字
39          * type:3种类型 direct/fanout/topic
40          */
41         channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
42 
43         for (int i = 0; i < 10; i++) {
44             String msg = "helloworld_" + i;// 创建消息
45             /**
46              * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
47              * exchange:交换器
48              * routingKey:路由键
49              * props:other properties for the message - routing headers etc
50              * body:消息体
51              */
52             channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, msg.getBytes());// 发布消息
53             System.out.println("发送消息:msg-->" + msg);
54         }
55 
56         channel.close();// 关闭信道
57         connection.close();// 关闭连接
58     }
59 }

图片 8

2、步骤:

 

  • 创建并设置连接工厂
    • host、port、username、password、vhost
    • 值得注意的是,一定要现在rabbitmq
      server上把username和password设置好,并且开启该用户在指定vhost上的权限,才可以设置连接工厂成功
  • 创建连接
  • 创建信道
  • 创建队列
  • 创建交换器
  • 创建(创建之后也可以配置消息)并发送消息
  • 关闭信道
  • 关闭连接

3、注意点:

  • queueDeclare方法:如果队列不存在,创建;如果存在,什么都不做

  • basicPublish:发布消息到指定的交换器,并制定路由规则(用于消费者部分的绑定操作)

 

二、消息消费者

1、代码:

 1 package com.xxx.consumer;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import com.rabbitmq.client.Channel;
 7 import com.rabbitmq.client.Connection;
 8 import com.rabbitmq.client.ConnectionFactory;
 9 import com.rabbitmq.client.Consumer;
10 import com.rabbitmq.client.DefaultConsumer;
11 import com.rabbitmq.client.Envelope;
12 import com.rabbitmq.client.AMQP.BasicProperties;
13 
14 /**
15  * 消息消费者
16  */
17 public class HelloWorldConsumer {
18     private final static String QUEUE_NAME = "helloQueue";
19     private static final String EXCHANGE_NAME = "helloExchange";
20 
21     public static void main(String[] args) throws IOException, TimeoutException {
22         ConnectionFactory factory = new ConnectionFactory();// 建立连接工厂
23         factory.setHost("192.168.20.238");// 设置rabbitmq服务器地址
24         factory.setPort(5672);// 设置rabbitmq服务器端口
25         factory.setUsername("zhaojigang");
26         factory.setPassword("wangna");
27         factory.setVirtualHost("zhaojigangvhost");
28 
29         Connection connection = factory.newConnection();// 建立连接
30         Channel channel = connection.createChannel();// 建立信道
31 
32         /**
33          * Note that we declare the queue here, as well. 
34          * Because we might start the receiver before the sender, 
35          * we want to make sure the queue exists before we try to consume messages from it.
36          */
37         channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 创建队列(如果队列不存在,创建;如果存在,什么都不做)
38         channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
39         /**
40          * queueBind(String queue, String exchange, String routingKey)
41          */
42         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_NAME);
43         Consumer consumer = new DefaultConsumer(channel){
44             @Override
45             public void handleDelivery(String consumerTag, 
46                                        Envelope envelope, 
47                                        BasicProperties properties,
48                                        byte[] body) throws IOException {
49                 String msg = new String(body,"UTF-8");
50                 System.out.println("接收消息:msg-->" + msg);
51             }
52         };54             /**
55              * basicConsume(String queue, boolean autoAck, Consumer callback)
56              * autoAck true if the server should consider messages acknowledged once delivered; 
57              * false if the server should expect explicit acknowledgements
          * 这里启动一个consume,该consume会不断的接收消息,如果此处用while(true)包起来的话,就会不断的启动consume
58              */
59           channel.basicConsume(QUEUE_NAME, true, consumer);61         
62 //        channel.close();// 关闭信道
63 //        connection.close();// 关闭连接
64     }
65 }

2、步骤:

  • 创建并设置连接工厂
    • host、port、username、password、vhost
  • 创建连接
  • 创建信道
  • 创建队列
  • 创建交换器
  • 通过路由规则绑定队列和交换器
  • 创建消息处理函数
  • 从队列获取消息并消费消息(根据消息处理函数)

 

三、测试

1、启动rabbitmq服务器

2、启动消费者进程

3、启动生产者进程

4、查看console即可或者查看rabbitmq的webUI

相关文章