惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

N
News | PayPal Newsroom
Security Archives - TechRepublic
Security Archives - TechRepublic
Hacker News: Ask HN
Hacker News: Ask HN
H
Hacker News: Front Page
Apple Machine Learning Research
Apple Machine Learning Research
TaoSecurity Blog
TaoSecurity Blog
Help Net Security
Help Net Security
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
V
V2EX
Hugging Face - Blog
Hugging Face - Blog
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
人人都是产品经理
人人都是产品经理
博客园 - 三生石上(FineUI控件)
Security Latest
Security Latest
Cloudbric
Cloudbric
WordPress大学
WordPress大学
S
SegmentFault 最新的问题
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
www.infosecurity-magazine.com
www.infosecurity-magazine.com
Know Your Adversary
Know Your Adversary
A
Arctic Wolf
L
LangChain Blog
Application and Cybersecurity Blog
Application and Cybersecurity Blog
The GitHub Blog
The GitHub Blog
P
Proofpoint News Feed
W
WeLiveSecurity
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
M
MIT News - Artificial intelligence
Google DeepMind News
Google DeepMind News
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
The Cloudflare Blog
小众软件
小众软件
NISL@THU
NISL@THU
云风的 BLOG
云风的 BLOG
P
Privacy & Cybersecurity Law Blog
S
Security @ Cisco Blogs
博客园 - 【当耐特】
I
InfoQ
Vercel News
Vercel News
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
P
Proofpoint News Feed
O
OpenAI News
Google DeepMind News
Google DeepMind News
N
News and Events Feed by Topic
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
K
Kaspersky official blog
T
Threat Research - Cisco Blogs
量子位
宝玉的分享
宝玉的分享

博客园 - qy1141

Mysql插入Emoji表情出错 spring+mybatis事务不起作用的原因 安卓开发随记 springmvc + spring + ibatis + mysql Eclipse中配置svn J2EE环境配置与工具使用 SqlServer数据库空间使用情况常用命令 数据库备份与还原 关于数据库优化杂技 windows2008吃尽内存的解决办法 asp.net中http提交数据所遇到的那些坑 重新开博 WCF Security系列(2)--服务器端的安全 WCF Security系列(1)--Security概述 如果为网站生成自签名SSL证书 关于证书 转:最真实的2006年应届毕业生真实薪水 转:如何修复Team Foundation Server Workgroup Edition 不小心删除了所有Team Foundation Licensed Users组内用户问题 转 :TFS(Team Foundation Server)使用经验
在C#中使用消息队列RabbitMQ
qy1141 · 2014-10-27 · via 博客园 - qy1141

2014-10-27 14:41  qy1141  阅读(12681)  评论()    收藏  举报

1、什么是RabbitMQ。详见 http://www.rabbitmq.com/

    作用就是提高系统的并发性,将一些不需要及时响应客户端且占用较多资源的操作,放入队列,再由另外一个线程,去异步处理这些队列,可极大的提高系统的并发能力。

2、安装

    RabbitMQ服务:http://www.rabbitmq.com/download.html
    (安装完RabbitMQ服务后,会在Windows服务中看到。如果没有Erlang运行环境,在安装过程中会提醒先安装Erlang环境。http://www.erlang.org/downloads)

    .net客户端类库:http://www.rabbitmq.com/dotnet.html

3、插件

     RabbitMQ提供了很多好用的插件,最常用的就是web管理工具,启动此插件。

     CMD中运行命令:rabbitmq-plugins enable rabbitmq_management

     注:rabbitmq-plugins 所在路径为:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin

     web管理工具的地址是:http://localhost:15672,初始用户名:guest 初始密码:guest

4、配置

    配置文件地址为:C:\Documents and Settings\Administrator\Application Data\RabbitMQ\rabbitmq.config,默认没有rabbit.config文件,需要手工新建(默认会有rabbitmq.config.example 作为参考)。基于安全,做了两个配置,如下:

[
{rabbit,
[
{loopback_users, [<<"guest">>]},
{tcp_listeners, [{"127.0.0.1", 1234},
{"10.121.1.48", 8009}]}

]}
].

loopback_users:设置只能在与RabbitMq服务同一台机器上访问服务的用户。

tcp_listeners:设置RabbitMQ监听的IP地址与端口。只监听局域网内网iP、修改默认端口,防止被入侵攻击。

设置完后,别忘记了以下操作,否则配置不起作用。

  • 停止RabbitMQ服务;
  • 重新安装服务使配置生效:rabbitmq-service.bat install

        此命令要切换到路径:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.4.0\sbin

  • 启动RabbitMQ服务;

5、Demo练习。

消息生产者:

class Program
    {
        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = Constants.MqHost;
                factory.Port = Constants.MqPort;
                factory.UserName = Constants.MqUserName;
                factory.Password = Constants.MqPwd;
                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);
                        while (true)
                        {
                            string customStr = Console.ReadLine();
                            RequestMsg requestMsg = new RequestMsg();
                            requestMsg.Name = string.Format("Name_{0}", customStr);
                            requestMsg.Code = string.Format("Code_{0}", customStr);
                            string jsonStr = JsonConvert.SerializeObject(requestMsg);
                            byte[] bytes = Encoding.UTF8.GetBytes(jsonStr);
                            
                            //设置消息持久化
                            IBasicProperties properties = channel.CreateBasicProperties();
                            properties.DeliveryMode = 2;
                            channel.BasicPublish("", "MyFirstQueue", properties, bytes);

                            //channel.BasicPublish("", "MyFirstQueue", null, bytes);

                            Console.WriteLine("消息已发送:" + requestMsg.ToString());
                        }
                    }
                }
            }
            catch (Exception e1)
            {
                Console.WriteLine(e1.ToString());
            }
            Console.ReadLine();
        }
    }
class Program
    {
        static void Main(string[] args)
        {
            try
            {
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = Constants.MqHost;
                factory.Port = Constants.MqPort;
                factory.UserName = Constants.MqUserName;
                factory.Password = Constants.MqPwd;
                using (IConnection conn = factory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        //在MQ上定义一个持久化队列,如果名称相同不会重复创建
                        channel.QueueDeclare("MyFirstQueue", true, false, false, null);

                        //输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息
                        channel.BasicQos(0, 1, false);
                        
                        Console.WriteLine("Listening...");

                        //在队列上定义一个消费者
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        //消费队列,并设置应答模式为程序主动应答
                        channel.BasicConsume("MyFirstQueue", false, consumer);

                        while (true)
                        {
                            //阻塞函数,获取队列中的消息
                            BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] bytes = ea.Body;
                            string str = Encoding.UTF8.GetString(bytes);
                            RequestMsg msg = JsonConvert.DeserializeObject<RequestMsg>(str);
                            Console.WriteLine("HandleMsg:" + msg.ToString());
                            //回复确认
                            channel.BasicAck(ea.DeliveryTag, false);
                        }
                    }
                }
            }
            catch (Exception e1)
            {
                Console.WriteLine(e1.ToString());
            }
            Console.ReadLine();
        }
    }