惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

H
Help Net Security
The GitHub Blog
The GitHub Blog
F
Fortinet All Blogs
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
Simon Willison's Weblog
Simon Willison's Weblog
D
Darknet – Hacking Tools, Hacker News & Cyber Security
Cisco Talos Blog
Cisco Talos Blog
P
Privacy & Cybersecurity Law Blog
I
Intezer
Y
Y Combinator Blog
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
N
Netflix TechBlog - Medium
The Hacker News
The Hacker News
AWS News Blog
AWS News Blog
aimingoo的专栏
aimingoo的专栏
A
About on SuperTechFans
Exploit-DB.com RSS Feed
Exploit-DB.com RSS Feed
Stack Overflow Blog
Stack Overflow Blog
Hacker News: Ask HN
Hacker News: Ask HN
酷 壳 – CoolShell
酷 壳 – CoolShell
量子位
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
B
Blog
T
Tor Project blog
C
Cybersecurity and Infrastructure Security Agency CISA
云风的 BLOG
云风的 BLOG
博客园_首页
V2EX - 技术
V2EX - 技术
T
Threat Research - Cisco Blogs
腾讯CDC
宝玉的分享
宝玉的分享
博客园 - 叶小钗
罗磊的独立博客
S
Securelist
The Last Watchdog
The Last Watchdog
Google Online Security Blog
Google Online Security Blog
Scott Helme
Scott Helme
博客园 - 司徒正美
W
WeLiveSecurity
有赞技术团队
有赞技术团队
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
S
Secure Thoughts
NISL@THU
NISL@THU
N
News and Events Feed by Topic
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
雷峰网
雷峰网
大猫的无限游戏
大猫的无限游戏
K
Kaspersky official blog
IT之家
IT之家

极客兔兔

Go sync.Cond | Go 语言高性能编程 Go 死码消除与调试(debug)模式 | Go 语言高性能编程 Go sync.Once | Go 语言高性能编程 Go 逃逸分析 | Go 语言高性能编程 2020 年终总结 | 极客兔兔 Go struct 内存对齐 | Go 语言高性能编程 Go 空结构体 struct{} 的使用 | Go 语言高性能编程 控制协程(goroutine)的并发数量 | Go 语言高性能编程 | 极客兔兔 如何退出协程 goroutine (其他场景) | Go 语言高性能编程 如何退出协程 goroutine (超时场景) | Go 语言高性能编程 Go 语言陷阱 - 数组和切片 | Go 语言高性能编程 减小 Go 代码编译后的二进制体积 | Go 语言高性能编程 Go Reflect 提高反射性能 | Go 语言高性能编程 读写锁和互斥锁的性能比较 | Go 语言高性能编程 | 极客兔兔 for 和 range 的性能比较 | Go 语言高性能编程 切片(slice)性能及陷阱 | Go 语言高性能编程 | 极客兔兔 字符串拼接性能及原理 | Go 语言高性能编程 | 极客兔兔 pprof 性能分析 | Go 语言高性能编程 benchmark 基准测试 | Go 语言高性能编程 Go 语言高性能编程 | 极客兔兔 Go 接口型函数的使用场景 | 极客兔兔 Python 简明教程 | 快速入门 | 极客兔兔 Go 语言笔试面试题(代码输出) | 极客面试 | 极客兔兔 动手写RPC框架 - GeeRPC第七天 服务发现与注册中心(registry) | 极客兔兔 动手写RPC框架 - GeeRPC第六天 负载均衡(load balance) 动手写RPC框架 - GeeRPC第五天 支持HTTP协议 | 极客兔兔 动手写RPC框架 - GeeRPC第四天 超时处理(timeout) | 极客兔兔 动手写RPC框架 - GeeRPC第三天 服务注册(service register) 动手写RPC框架 - GeeRPC第二天 支持并发与异步的客户端 | 极客兔兔 7天用Go从零实现RPC框架GeeRPC | 极客兔兔 Go 语言笔试面试题(并发编程) | 极客面试 | 极客兔兔 Go 语言笔试面试题(基础语法) | 极客面试 | 极客兔兔 Go 语言笔试面试题汇总 | 极客面试 | 极客兔兔 Go Context 并发编程简明教程 | 快速入门 Go Mmap 文件内存映射简明教程 | 快速入门 动手写ORM框架 - GeeORM第七天 数据库迁移(Migrate) | 极客兔兔 动手写ORM框架 - GeeORM第六天 支持事务(Transaction) | 极客兔兔 动手写ORM框架 - GeeORM第五天 实现钩子(Hooks) | 极客兔兔 动手写ORM框架 - GeeORM第四天 链式操作与更新删除 | 极客兔兔 动手写ORM框架 - GeeORM第三天 记录新增和查询 | 极客兔兔 动手写ORM框架 - GeeORM第二天 对象表结构映射 | 极客兔兔 动手写ORM框架 - GeeORM第一天 database/sql 基础 SQLite 常用命令 | 速查表(Cheat Sheet) 7天用Go从零实现ORM框架GeeORM | 极客兔兔 动手写分布式缓存 - GeeCache第七天 使用 Protobuf 通信 动手写分布式缓存 - GeeCache第六天 防止缓存击穿 | 极客兔兔 动手写分布式缓存 - GeeCache第五天 分布式节点 | 极客兔兔 动手写分布式缓存 - GeeCache第四天 一致性哈希(hash) | 极客兔兔 Go Mock (gomock)简明教程 | 快速入门 动手写分布式缓存 - GeeCache第三天 HTTP 服务端 动手写分布式缓存 - GeeCache第二天 单机并发缓存 | 极客兔兔 Go Test 单元测试简明教程 | 快速入门 7天用Go从零实现分布式缓存GeeCache | 极客兔兔 Go WebAssembly (Wasm) 简明教程 | 快速入门 Go RPC & TLS 鉴权简明教程 | 快速入门 Go Protobuf 简明教程 | 快速入门 Go语言动手写Web框架 - Gee第七天 错误恢复(Panic Recover) WSL, Git, Mircosoft Terminal 等常用工具配置 Rust 简明教程 | 快速入门 | 极客兔兔 Go语言动手写Web框架 - Gee第六天 模板(HTML Template) 百宝箱 - 值得收藏的工具网站 | 极客兔兔 Go语言动手写Web框架 - Gee第五天 中间件Middleware | 极客兔兔 Go语言动手写Web框架 - Gee第四天 分组控制Group | 极客兔兔 Go语言动手写Web框架 - Gee第三天 前缀树路由Router | 极客兔兔 博客折腾记(七) - Gitalk Plus | 极客兔兔 Go语言动手写Web框架 - Gee第二天 上下文Context | 极客兔兔 Go2 新特性简明教程 | 快速入门 | 极客兔兔 博客折腾记(六) - 不要为了流量忘记了初心 | 极客兔兔 Go语言动手写Web框架 - Gee第一天 http.Handler | 极客兔兔 7天用Go从零实现Web框架Gee教程 | 极客兔兔 Go Gin 简明教程 | 快速入门 Go 语言简明教程 | 快速入门 | 极客兔兔 机器学习笔试面试题 11-20 | 极客面试 | 极客兔兔 机器学习笔试面试题 1-10 | 极客面试 | 极客兔兔 机器学习笔试面试题汇总 | 极客面试 | 极客兔兔 TensorFlow 2 中文文档 - RNN LSTM 文本分类 TensorFlow 2 中文文档 - TFHub 迁移学习 TensorFlow 2 中文文档 - 卷积神经网络分类 CIFAR-10 TensorFlow 2 中文文档 - 保存与加载模型 TensorFlow 2 中文文档 - 过拟合与欠拟合 TensorFlow 2 中文文档 - 回归预测燃油效率 TensorFlow 2 中文文档 - 特征工程结构化数据分类 TensorFlow 2 中文文档 - IMDB 文本分类 TensorFlow 2 中文文档 - MNIST 图像分类 TensorFlow 2 / 2.0 中文文档 TensorFlow 2.0 (九) - 强化学习 70行代码实战 Policy Gradient 博客折腾记(五) - 友链这件事,没那么简单 | 极客兔兔 博客折腾记(四) - 原创资格是争取来的 | 极客兔兔 TensorFlow 2.0 (八) - 强化学习 DQN 玩转 gym Mountain Car TensorFlow 2.0 (七) - 强化学习 Q-Learning 玩转 OpenAI gym 博客折腾记(三) - 主题设计、彩蛋与阅读量翻倍 | 极客兔兔 TensorFlow 2.0 (六) - 监督学习玩转 OpenAI gym game 博客折腾记(二) - 对搜索引擎的理解 | 极客兔兔 博客折腾记(一) - 极致性能的尝试 | 极客兔兔 Pandas 数据处理(三) - Cheat Sheet 中文版 TensorFlow 2.0 (五) - mnist手写数字识别(CNN卷积神经网络) TensorFlow入门(四) - mnist手写数字识别(制作h5py训练集) | 极客兔兔 TensorFlow入门(三) - mnist手写数字识别(可视化训练) | 极客兔兔 Pandas 数据处理(二) - 筛选数据 | 极客兔兔 Pandas 数据处理(一) - DataFrame 与 Series
动手写RPC框架 - GeeRPC第一天 服务端与消息编码 | 极客兔兔
2020-10-06 · via 极客兔兔

源代码/数据集已上传到 Github - 7days-golang

golang RPC framework

本文是7天用Go从零实现RPC框架GeeRPC的第一篇。

  • 使用 encoding/gob 实现消息的编解码(序列化与反序列化)
  • 实现一个简易的服务端,仅接受消息,不处理,代码约 200 行

消息的序列化与反序列化

一个典型的 RPC 调用如下:

1
err = client.Call("Arith.Multiply", args, &reply)

客户端发送的请求包括服务名 Arith,方法名 Multiply,参数 args 三个,服务端的响应包括错误 error,返回值 reply 2 个。我们将请求和响应中的参数和返回值抽象为 body,剩余的信息放在 header 中,那么就可以抽象出数据结构 Header:

day1-codec/codec/codec.go

1
2
3
4
5
6
7
8
9
package codec

import "io"

type Header struct {
ServiceMethod string
Seq uint64
Error string
}
  • ServiceMethod 是服务名和方法名,通常与 Go 语言中的结构体和方法相映射。
  • Seq 是请求的序号,也可以认为是某个请求的 ID,用来区分不同的请求。
  • Error 是错误信息,客户端置为空,服务端如果如果发生错误,将错误信息置于 Error 中。

我们将和消息编解码相关的代码都放到 codec 子目录中,在此之前,还需要在根目录下使用 go mod init geerpc 初始化项目,方便后续子 package 之间的引用。

进一步,抽象出对消息体进行编解码的接口 Codec,抽象出接口是为了实现不同的 Codec 实例:

1
2
3
4
5
6
type Codec interface {
io.Closer
ReadHeader(*Header) error
ReadBody(interface{}) error
Write(*Header, interface{}) error
}

紧接着,抽象出 Codec 的构造函数,客户端和服务端可以通过 Codec 的 Type 得到构造函数,从而创建 Codec 实例。这部分代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type NewCodecFunc func(io.ReadWriteCloser) Codec

type Type string

const (
GobType Type = "application/gob"
JsonType Type = "application/json"
)

var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
NewCodecFuncMap = make(map[Type]NewCodecFunc)
NewCodecFuncMap[GobType] = NewGobCodec
}

我们定义了 2 种 Codec,GobJson,但是实际代码中只实现了 Gob 一种,事实上,2 者的实现非常接近,甚至只需要把 gob 换成 json 即可。

首先定义 GobCodec 结构体,这个结构体由四部分构成,conn 是由构建函数传入,通常是通过 TCP 或者 Unix 建立 socket 时得到的链接实例,dec 和 enc 对应 gob 的 Decoder 和 Encoder,buf 是为了防止阻塞而创建的带缓冲的 Writer,一般这么做能提升性能。

day1-codec/codec/gob.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package codec

import (
"bufio"
"encoding/gob"
"io"
"log"
)

type GobCodec struct {
conn io.ReadWriteCloser
buf *bufio.Writer
dec *gob.Decoder
enc *gob.Encoder
}

var _ Codec = (*GobCodec)(nil)

func NewGobCodec(conn io.ReadWriteCloser) Codec {
buf := bufio.NewWriter(conn)
return &GobCodec{
conn: conn,
buf: buf,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
}
}

接着实现 ReadHeaderReadBodyWriteClose 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (c *GobCodec) ReadHeader(h *Header) error {
return c.dec.Decode(h)
}

func (c *GobCodec) ReadBody(body interface{}) error {
return c.dec.Decode(body)
}

func (c *GobCodec) Write(h *Header, body interface{}) (err error) {
defer func() {
_ = c.buf.Flush()
if err != nil {
_ = c.Close()
}
}()
if err := c.enc.Encode(h); err != nil {
log.Println("rpc codec: gob error encoding header:", err)
return err
}
if err := c.enc.Encode(body); err != nil {
log.Println("rpc codec: gob error encoding body:", err)
return err
}
return nil
}

func (c *GobCodec) Close() error {
return c.conn.Close()
}

通信过程

客户端与服务端的通信需要协商一些内容,例如 HTTP 报文,分为 header 和 body 2 部分,body 的格式和长度通过 header 中的 Content-TypeContent-Length 指定,服务端通过解析 header 就能够知道如何从 body 中读取需要的信息。对于 RPC 协议来说,这部分协商是需要自主设计的。为了提升性能,一般在报文的最开始会规划固定的字节,来协商相关的信息。比如第1个字节用来表示序列化方式,第2个字节表示压缩方式,第3-6字节表示 header 的长度,7-10 字节表示 body 的长度。

对于 GeeRPC 来说,目前需要协商的唯一一项内容是消息的编解码方式。我们将这部分信息,放到结构体 Option 中承载。目前,已经进入到服务端的实现阶段了。

day1-codec/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
package geerpc

const MagicNumber = 0x3bef5c

type Option struct {
MagicNumber int
CodecType codec.Type
}

var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
}

一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC 客户端固定采用 JSON 编码 Option,后续的 header 和 body 的编码方式由 Option 中的 CodeType 指定,服务端首先使用 JSON 解码 Option,然后通过 Option 的 CodeType 解码剩余的内容。即报文将以这样的形式发送:

1
2
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|

在一次连接中,Option 固定在报文的最开始,Header 和 Body 可以有多个,即报文可能是这样的。

1
| Option | Header1 | Body1 | Header2 | Body2 | ...

服务端的实现

通信过程已经定义清楚了,那么服务端的实现就比较直接了。

day1-codec/server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

type Server struct{}


func NewServer() *Server {
return &Server{}
}


var DefaultServer = NewServer()



func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Println("rpc server: accept error:", err)
return
}
go server.ServeConn(conn)
}
}



func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
  • 首先定义了结构体 Server,没有任何的成员字段。
  • 实现了 Accept 方式,net.Listener 作为参数,for 循环等待 socket 连接建立,并开启子协程处理,处理过程交给了 ServerConn 方法。
  • DefaultServer 是一个默认的 Server 实例,主要为了用户使用方便。

如果想启动服务,过程是非常简单的,传入 listener 即可,tcp 协议和 unix 协议都支持。

1
2
lis, _ := net.Listen("tcp", ":9999")
geerpc.Accept(lis)

ServeConn 的实现就和之前讨论的通信过程紧密相关了,首先使用 json.NewDecoder 反序列化得到 Option 实例,检查 MagicNumber 和 CodeType 的值是否正确。然后根据 CodeType 得到对应的消息编解码器,接下来的处理交给 serverCodec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43


func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() { _ = conn.Close() }()
var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpc server: options error: ", err)
return
}
if opt.MagicNumber != MagicNumber {
log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
return
}
f := codec.NewCodecFuncMap[opt.CodecType]
if f == nil {
log.Printf("rpc server: invalid codec type %s", opt.CodecType)
return
}
server.serveCodec(f(conn))
}


var invalidRequest = struct{}{}

func (server *Server) serveCodec(cc codec.Codec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
req, err := server.readRequest(cc)
if err != nil {
if req == nil {
break
}
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
continue
}
wg.Add(1)
go server.handleRequest(cc, req, sending, wg)
}
wg.Wait()
_ = cc.Close()
}

serveCodec 的过程非常简单。主要包含三个阶段

  • 读取请求 readRequest
  • 处理请求 handleRequest
  • 回复请求 sendResponse

之前提到过,在一次连接中,允许接收多个请求,即多个 request header 和 request body,因此这里使用了 for 无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要注意的点有三个:

  • handleRequest 使用了协程并发执行请求。
  • 处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。在这里使用锁(sending)保证。
  • 尽力而为,只有在 header 解析失败时,才终止循环。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

type request struct {
h *codec.Header
argv, replyv reflect.Value
}

func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
var h codec.Header
if err := cc.ReadHeader(&h); err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println("rpc server: read header error:", err)
}
return nil, err
}
return &h, nil
}

func (server *Server) readRequest(cc codec.Codec) (*request, error) {
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}
req := &request{h: h}


req.argv = reflect.New(reflect.TypeOf(""))
if err = cc.ReadBody(req.argv.Interface()); err != nil {
log.Println("rpc server: read argv err:", err)
}
return req, nil
}

func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {
sending.Lock()
defer sending.Unlock()
if err := cc.Write(h, body); err != nil {
log.Println("rpc server: write response error:", err)
}
}

func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {


defer wg.Done()
log.Println(req.h, req.argv.Elem())
req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}

目前还不能判断 body 的类型,因此在 readRequest 和 handleRequest 中,day1 将 body 作为字符串处理。接收到请求,打印 header,并回复 geerpc resp ${req.h.Seq}。这一部分后续再实现。

main 函数(一个简易的客户端)

day1 的内容就到此为止了,在这里我们已经实现了一个消息的编解码器 GobCodec,并且客户端与服务端实现了简单的协议交换(protocol exchange),即允许客户端使用不同的编码方式。同时实现了服务端的雏形,建立连接,读取、处理并回复客户端的请求。

接下来,我们就在 main 函数中看看如何使用刚实现的 GeeRPC 吧。

day1-codec/main/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"encoding/json"
"fmt"
"geerpc"
"geerpc/codec"
"log"
"net"
"time"
)

func startServer(addr chan string) {

l, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}

func main() {
addr := make(chan string)
go startServer(addr)


conn, _ := net.Dial("tcp", <-addr)
defer func() { _ = conn.Close() }()

time.Sleep(time.Second)

_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
cc := codec.NewGobCodec(conn)

for i := 0; i < 5; i++ {
h := &codec.Header{
ServiceMethod: "Foo.Sum",
Seq: uint64(i),
}
_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))
_ = cc.ReadHeader(h)
var reply string
_ = cc.ReadBody(&reply)
log.Println("reply:", reply)
}
}
  • startServer 中使用了信道 addr,确保服务端端口监听成功,客户端再发起请求。
  • 客户端首先发送 Option 进行协议交换,接下来发送消息头 h := &codec.Header{},和消息体 geerpc req ${h.Seq}
  • 最后解析服务端的响应 reply,并打印出来。

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
start rpc server on [::]:63662
&{Foo.Sum 0 } geerpc req 0
reply: geerpc resp 0
&{Foo.Sum 1 } geerpc req 1
reply: geerpc resp 1
&{Foo.Sum 2 } geerpc req 2
reply: geerpc resp 2
&{Foo.Sum 3 } geerpc req 3
reply: geerpc resp 3
&{Foo.Sum 4 } geerpc req 4
reply: geerpc resp 4

附 推荐阅读


last updated at 2026-02-23