惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

Simon Willison's Weblog
Simon Willison's Weblog
P
Privacy International News Feed
www.infosecurity-magazine.com
www.infosecurity-magazine.com
T
Troy Hunt's Blog
Hacker News - Newest:
Hacker News - Newest: "LLM"
Attack and Defense Labs
Attack and Defense Labs
S
Secure Thoughts
V2EX - 技术
V2EX - 技术
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
O
OpenAI News
Cloudbric
Cloudbric
Google Online Security Blog
Google Online Security Blog
Schneier on Security
Schneier on Security
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
Help Net Security
Help Net Security
Cyberwarzone
Cyberwarzone
G
GRAHAM CLULEY
L
Lohrmann on Cybersecurity
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
Spread Privacy
Spread Privacy
NISL@THU
NISL@THU
N
News and Events Feed by Topic
T
Tenable Blog
S
Security @ Cisco Blogs
N
News and Events Feed by Topic
The Hacker News
The Hacker News
C
CXSECURITY Database RSS Feed - CXSecurity.com
宝玉的分享
宝玉的分享
月光博客
月光博客
酷 壳 – CoolShell
酷 壳 – CoolShell
美团技术团队
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Google DeepMind News
Google DeepMind News
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
T
Tailwind CSS Blog
V
Visual Studio Blog
P
Proofpoint News Feed
Webroot Blog
Webroot Blog
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
博客园 - 三生石上(FineUI控件)
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
Jina AI
Jina AI
雷峰网
雷峰网
T
The Blog of Author Tim Ferriss
Hugging Face - Blog
Hugging Face - Blog
腾讯CDC
L
LangChain Blog
The Register - Security
The Register - Security
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
博客园 - 聂微东

博客园 - 小水坝

使用双分派解决领域实体和外部机制通信问题 读《软件需求最佳实践》有感 【thrift】vc中使用thrift中文字符串乱码问题解决 __declspec(dllimport)的小秘密(转) 跨线程send message 【boost】使用serialization库序列化子类 【boost】ptree 读写中文的问题 动态创建TeeChart的简便方法 【MFC】动态创建CMFCToolbar图标不显示问题 【boost】使用装饰者模式改造boost::thread_group 【VC】VC工具栏图标合并工具(非tbcreator和visual toolbar) 【boost】使用lambda表达式和generate_n生成顺序序列 【boost】BOOST_LOCAL_FUNCTION体验 【boost】MFC dll中使用boost thread的问题 【转帖】C++编译原理 资料 IE6,7下password框长度变短问题 dwz局部表格分页 dwz中combox的value问题 dwz中使用flot,js报表等js插件
搞定thrift双向消息
小水坝 · 2014-11-26 · via 博客园 - 小水坝

thrift作为脱胎于facebook的rpc框架,各方面都非常优秀。清晰的分层设计,多语言的支持,以及不输protocolbuffer的效率(compact下优于protocolbuffer),都让thrift拥有越来越多的使用者。

作为一个RPC框架,thrift支持的是open->client--rpc-->server->close的短连接模式。在实际应用中,却经常会有客户端建立连接后,等待服务端数据的长连接模式,也可以称为双向连接。通常的方案有三种,可参考http://dongxicheng.org/search-engine/thrift-bidirectional-async-rpc/,文中提到第三种方法会修改源码,而实际操作过程中发现这其实是作者小小的理解错误,实现thrift双向通信并没有这么复杂,经过一番实验,发现只需要如下理解和实现即可轻松实现一个thrift的双向连接。

  1. 双向连接的service必须为oneway,否则会因为recv函数抛出remote close异常
  2. 客户端重用建立client的protocol,开线程使用processor.Process(protocol,protocol)监听服务端callback的消息。
  3. 服务端使用ProcessorFactory,使用TConnectionInfo中的transport作为向客户端发送消息的client的transport

搞定以上三步,即可实现一个thrift双向连接,这里附上实验代码,客户端使用C#(sorry for my pool C#),服务端使用C++

thrift

service HandshakeService{
    oneway void HandShake();
}

service CallbackService{
    oneway void Push(1: string msg); 
}

client

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Thrift.Collections;
using Thrift.Protocol;
using Thrift.Server;
using Thrift.Transport;
using System.Threading;
using Thrift;
using System.IO;

namespace ThriftBidirection
{
    class Program
    {
        class CallbackServiceImply : CallbackService.Iface
        {
            int msgCount = 0;
            public void Push(string msg)
            {
                Console.WriteLine("receive msg {0}: {1}", msgCount++, msg);
            }
        }
        //服务处理线程
        static void ProcessThread(TProtocol protocol)
        {
            TProcessor processor = new CallbackService.Processor(new CallbackServiceImply());
            while (true)
            {
                try
                {
                    //////////////////////////////////////////////////////////////////////////
                    ///模仿server行为,同时重用client端protocol
                    ///相当于同时重用一个连接
                    while (processor.Process(protocol, protocol)) { };
                    ///connection lost, return
                    return;
                }
                catch (IOException) //not fatal error, resume
                {
                    continue;
                }
                catch (TException) //fatal error
                {
                    return;
                }
            }
        }
        //服务器状态监听线程
        static void MonitorThread(TTransport trans, Action<string> callback)
        {
            while (true)
            {
                try
                {
                    if (!trans.Peek())
                    {
                        callback("连接中断");
                    }
                    Thread.Sleep(3000);
                }
                catch (Thrift.TException ex)
                {
                    callback(ex.Message);
                    return;
                }
            }
        }

        static void Main(string[] args)
        {
            TTransport transport = new TBufferedTransport(new TSocket("localhost", 5555));
            TProtocol protocol = new TBinaryProtocol(transport);
            HandshakeService.Client client = new HandshakeService.Client(protocol);
            Action<TProtocol> processAction = new Action<TProtocol>(ProcessThread);
            Action<TTransport, Action<string>> monitorAction = new Action<TTransport, Action<string>>(MonitorThread);

            transport.Open();
            processAction.BeginInvoke(protocol, (result) =>
            {
                 processAction.EndInvoke(result);
            }, null);
            monitorAction.BeginInvoke(transport, (msg) =>
            {
                Console.WriteLine("连接中断: {0}", msg);
            }, (result) =>
            {

            }, null);

            for (int i = 0; i < 100; ++i)
            {
                client.HandShake();
                Thread.Sleep(10);
            }
            Console.Read();
            transport.Close();
        }
    }
}

server

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "HandshakeService.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <boost/make_shared.hpp>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/concurrency/PlatformThreadFactory.h>
#include "CallbackService.h"

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace apache::thrift::concurrency;

using boost::make_shared;
using boost::shared_ptr;

class HandshakeServiceHandler : virtual public HandshakeServiceIf {
 public:
  HandshakeServiceHandler(const boost::shared_ptr<TTransport> &trans) 
      : m_client(make_shared<TBinaryProtocol>(trans))
  {
      boost::once_flag flag = BOOST_ONCE_INIT;
      m_flag = flag;
  }

  virtual ~HandshakeServiceHandler()
  {
        m_thread->interrupt();
        m_thread->join();
  }

  void CallbackThread()
  {
      while(true)
      {
          try
          {
              m_client.Push("server push msg");
          }
          catch (TException)
          {
              return;
          }
          boost::this_thread::sleep_for(boost::chrono::milliseconds(20));
      }
  }

  void HandShake() {
    // Your implementation goes here
    printf("HandShake\n");
    boost::call_once(boost::bind(&HandshakeServiceHandler::_StartThread, this), m_flag);
  }

  void _StartThread()
  {
    m_thread.reset(new boost::thread(boost::bind(&HandshakeServiceHandler::CallbackThread, this)));
  }

boost::shared_ptr<TTransport> m_trans;
CallbackServiceClient m_client;
shared_ptr<boost::thread> m_thread;
boost::once_flag m_flag;
};

class ProcessorFactoryImply : public TProcessorFactory
{
    virtual boost::shared_ptr<TProcessor> getProcessor(
        const TConnectionInfo& connInfo)
    {
        return make_shared<HandshakeServiceProcessor>(make_shared<HandshakeServiceHandler>(connInfo.transport));
    }
};


int main(int argc, char **argv) {
  int port = 5555;
  shared_ptr<TProcessorFactory> processorFactory(new ProcessorFactoryImply());
  shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
  shared_ptr<ThreadManager> threadMgr = ThreadManager::newSimpleThreadManager(30);
  boost::shared_ptr<PlatformThreadFactory> threadFactory =
      boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory());

  threadMgr->threadFactory(threadFactory);
  threadMgr->start();
  TThreadPoolServer server(processorFactory,serverTransport, transportFactory, protocolFactory, threadMgr);
  server.serve();
  return 0;
}

一个简单的thrift双向通信就实现了。