惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

P
Privacy & Cybersecurity Law Blog
Vercel News
Vercel News
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
N
Netflix TechBlog - Medium
罗磊的独立博客
F
Fortinet All Blogs
T
Threatpost
Y
Y Combinator Blog
博客园_首页
美团技术团队
Security Latest
Security Latest
博客园 - 三生石上(FineUI控件)
T
Tailwind CSS Blog
V
V2EX - 技术
The Cloudflare Blog
L
LINUX DO - 热门话题
博客园 - 司徒正美
Jina AI
Jina AI
P
Proofpoint News Feed
宝玉的分享
宝玉的分享
C
CXSECURITY Database RSS Feed - CXSecurity.com
C
Cybersecurity and Infrastructure Security Agency CISA
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
WordPress大学
WordPress大学
The Hacker News
The Hacker News
P
Privacy International News Feed
T
The Exploit Database - CXSecurity.com
Scott Helme
Scott Helme
有赞技术团队
有赞技术团队
V
V2EX
Stack Overflow Blog
Stack Overflow Blog
M
MIT News - Artificial intelligence
Latest news
Latest news
NISL@THU
NISL@THU
Google DeepMind News
Google DeepMind News
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Cisco Blogs
雷峰网
雷峰网
Application and Cybersecurity Blog
Application and Cybersecurity Blog
B
Blog RSS Feed
W
WeLiveSecurity
D
DataBreaches.Net
G
Google Developers Blog
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
G
GRAHAM CLULEY
Spread Privacy
Spread Privacy
Know Your Adversary
Know Your Adversary
TaoSecurity Blog
TaoSecurity Blog
S
Securelist
Help Net Security
Help Net Security

博客园 - 水木

为什么要监控sql语句,以及如何监控,都有哪几种方式可以监控。 关于hexo与github使用过程中的问题与笔记 - 水木 - 博客园 IIS错误代码500.21 ,Nhibernate更新报错,委托的使用。action传参数 mysql 使用的三个小技巧 利用反向代理服务器,加快国内对国外主机的访问 - 水木 2019年七月第三周总结 ManualResetEven使用的最清楚说明 如何画数据流图 如何画好ER图 UML图中时序图的基本用法 系统架构设计上需要注意的 weblogic介绍 - 水木 - 博客园 Tuxedo 介绍 winform如何不卡界面 银行基金代销系统调研 如何在wcf中用net tcp协议进行通讯 20190710用控制台启动一个wcf服务 wcf必知必会以及与Webapi的区别 - 水木 2019年7月第一周总结-RabbitMQ总结
RabbitMQ入门学习系列(七) 远程调用RPC
水木 · 2019-07-08 · via 博客园 - 水木

快速阅读

生产者和消费者启动以后,都有一个接收事件,消费者是接收事件是处理调用方法以后等待生产者的返回,生产者的接收事件是处理接收生产者发送的消息,进行处理。消费者发送的时候要在回调队列中加入一个标识,标明是哪个方法进行的调用 。生产者接收到消费以后,如果发现有消息标识 ,把消息标识继续返回去,这样消费者可以保证接收到的消息是哪个方法调用的

关于RPC调用的建议

  1. 明确哪个函数是调用本地的,哪个函数是调用远程的
  2. 组合之间的依赖要清晰明了
  3. 应该能处理当远程服务挂掉的时的错误

消费者代码

主方法通过实例化rpcclient,然后调用rpcclient里面的方法,获得结果以后关闭

rpcclient的逻辑如下

  1. 声明连接和信道
  2. 创建一个回调的队列 。
  3. 定义一个消费者的事件,绑定信道。
  4. 为信道创建一个消息头,在里面标识消息id,和回调队列的名称
  5. 消费者接收事件处理,当收到消息以后,判断消息头,如果是发送的消息id,则加入到返回的消息集合中。
  6. 从消息集合中取值
static void Main(string[] args)
{
    //TopicMessageTest();
    RpcClient rpc = new RpcClient();
    Console.WriteLine("开始启动");
    var response = rpc.Call("30");
    Console.WriteLine("the result is :"+response);
    rpc.Close();
    Console.WriteLine("调用结束");
    Console.ReadLine();
}
 public class RpcClient
 {

     private readonly IConnection connection;
     private readonly IModel channel;
     private readonly string replyQueueName;
     private readonly EventingBasicConsumer consumer;
     private readonly IBasicProperties props;

     private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); //线程安全的集合
     public RpcClient()
     {
         var factory = new ConnectionFactory() { HostName="localhost"}; //创建一个实例

         connection = factory.CreateConnection(); //创建连接

         channel = connection.CreateModel(); //创建信道

         replyQueueName = channel.QueueDeclare().QueueName; //创建队列

         consumer = new EventingBasicConsumer(channel);//通过指定的model初台化消费者

         props = channel.CreateBasicProperties();

         var relationId = Guid.NewGuid().ToString();
         props.CorrelationId = relationId;//应用相关标识 
         props.ReplyTo = replyQueueName;  //回复队列指定 

         consumer.Received += (sender,e)=>
         {
             var body = e.Body;
             var response = Encoding.UTF8.GetString(body);
             if (e.BasicProperties.CorrelationId == relationId)
             {
                 respQueue.Add(response);
             }
         };



     }

     public string Call(string message)
     {
         var messageBytes = Encoding.UTF8.GetBytes(message);
         channel.BasicPublish(exchange: "", routingKey: "rpcqueue", basicProperties: props, body: messageBytes);
         channel.BasicConsume(consumer: consumer, queue: replyQueueName, autoAck: true);
         return respQueue.Take();
     }
        

生产者代码

  1. 创建链接和信道
  2. 声明一个队列,指定队列名称。
  3. 配置Qos,每次取几条消息
  4. 创建消费者在接收事件中对消费者发送的消息进行处理。
  5. 事件处理中,body表示接收到的消息 ,basicProperties是消息头,对消息进行处理以后,再把消息以及消息的队列发送给消费者
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
    {

        channel.QueueDeclare(queue: "rpcqueue",durable:false,exclusive:false,autoDelete:false,arguments:null);
        channel.BasicQos(0, 1, false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: "rpcqueue", autoAck: false,consumer:consumer);
        Console.WriteLine("Waiting rpc requesting");

        consumer.Received += (sender, e) =>
        {
            string response = null;
            var body = e.Body;
            var props = e.BasicProperties;
            var replyProps = channel.CreateBasicProperties();
            replyProps.CorrelationId = props.CorrelationId;

            var message = Encoding.UTF8.GetString(body);
            int n = int.Parse(message);
            Console.WriteLine("request message is :" + message);
            response = fib(n).ToString();

            var responseBytes = Encoding.UTF8.GetBytes(response);
            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
            channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
        };
        Console.WriteLine("over");
        Console.ReadLine();

    }


    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}


private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}

测试结果

友情提示

​ 我对我的文章负责,发现好多网上的文章 没有实践,都发出来的,让人走很多弯路,如果你在我的文章中遇到无法实现,或者无法走通的问题。可以直接在公众号《爱码农爱生活 》留言。必定会再次复查原因。让每一篇 文章的流程都能顺利实现。