惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

GbyAI
GbyAI
博客园_首页
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
阮一峰的网络日志
阮一峰的网络日志
酷 壳 – CoolShell
酷 壳 – CoolShell
博客园 - 司徒正美
V
V2EX
Cloudbric
Cloudbric
Hugging Face - Blog
Hugging Face - Blog
腾讯CDC
量子位
博客园 - 三生石上(FineUI控件)
博客园 - 叶小钗
K
Kaspersky official blog
博客园 - 【当耐特】
T
Tenable Blog
L
Lohrmann on Cybersecurity
The Cloudflare Blog
S
Schneier on Security
A
Arctic Wolf
Latest news
Latest news
C
Cyber Attacks, Cyber Crime and Cyber Security
罗磊的独立博客
T
The Exploit Database - CXSecurity.com
Cisco Talos Blog
Cisco Talos Blog
小众软件
小众软件
P
Privacy & Cybersecurity Law Blog
WordPress大学
WordPress大学
Simon Willison's Weblog
Simon Willison's Weblog
雷峰网
雷峰网
NISL@THU
NISL@THU
人人都是产品经理
人人都是产品经理
月光博客
月光博客
J
Java Code Geeks
V
Visual Studio Blog
S
Security Affairs
博客园 - Franky
T
Tailwind CSS Blog
Apple Machine Learning Research
Apple Machine Learning Research
H
Heimdal Security Blog
有赞技术团队
有赞技术团队
V2EX - 技术
V2EX - 技术
AWS News Blog
AWS News Blog
G
GRAHAM CLULEY
T
Troy Hunt's Blog
SecWiki News
SecWiki News
Spread Privacy
Spread Privacy
宝玉的分享
宝玉的分享
www.infosecurity-magazine.com
www.infosecurity-magazine.com
博客园 - 聂微东

Blog Feed

Golang pprof 案例实战 - 字节星球 字节星球终于全栈上新! - 字节星球 欢迎试用 TodoList - 字节星球 记一次“说走就走”的成都行 - 字节星球 Docker Desktop修改默认存储路径 - 字节星球 流程中心使用指南 - 字节星球 微分方程小手册 - 字节星球 近期小记 - 字节星球 字节星球(肥柴之家)搬家了! - 字节星球 如何顺利注册ChatGPT? - 字节星球 披着CLion的外衣实则在讲CMake - 字节星球 守望之墓/电子骨灰盒 - 字节星球 Python笔记 第三章 - 字节星球 WePlanet现已发布! - 字节星球 简单选择排序和堆排序 - 字节星球 希尔排序 - 字节星球 插入排序 - 字节星球 快速排序 - 字节星球 Python 笔记 第二章 - 字节星球 Python笔记 第一章 - 字节星球 Web使用HarmonyOS字体的压缩方案 - 字节星球 字节星球关于在评论区等位置展示IP属地的公告 - 字节星球 MATLAB简明教程#1 - 字节星球 解决Qt5无法连接MySQL数据库的问题 - 字节星球 时隔多年,终于摆脱了控制台 - 字节星球 论内卷 - 字节星球 堆排序 - 字节星球 连通块中点的数量 - 字节星球 合并集合(并查集) - 字节星球 Trie字符串统计 - 字节星球 KMP字符串 - 字节星球
Golang 手写一个 Channel - 字节星球
2026-05-29 · via Blog Feed

2026/5/29 21:16:53Henry40 阅读3 点赞7 评论

对于 Golang 来说,其实永远绕不过这两个东西,一个是 GMP 模型,这个已经倒背如流了,相信每一个 Gopher 都已经深挖过 GMP,这在我的内部 Wiki 中也有非常详细的源码解读,此外就是 Channel。至于内存管理,如 GC 等,其实和语言关联性不强,算是比较通用的解决方案,在 Golang 中我觉得没必要深究。

GMP 和 Channel 算是组成了 Golang 的核心特性:高并发能力 + CSP

Do not communicate by sharing memory; instead, share memory by communicating.

sync.Cond

一个条件变量,用于让 goroutine 在某个条件不满足时睡眠,等条件满足后再被唤醒。

它通常和 sync.Mutex 配合使用:cond := *sync.NewCond(&mutex)

  • Wait 会原子地释放锁并挂起当前 goroutine,等被 SignalBroadcast 唤醒后,会在返回前重新加锁;
  • Broadcast 会唤醒所有等待者,Signal 只唤醒一个等待者。

GO

mu.Lock()
for conditions {
	cond.Wait()
}
// ...
mu.Unlock()

注意,一般要用 for,不要用 if

因为被唤醒后,只代表条件可能变了,不代表条件一定满足。所以醒来以后必须重新检查条件。Go 官方文档也建议把 Wait 放在循环里检查条件。

方法 作用
Wait() 当前 goroutine 进入等待,并临时释放锁
Signal() 唤醒一个正在等待的 goroutine
Broadcast() 唤醒所有正在等待的 goroutine

例如:

内部大概做了如下几件事:

  1. 释放锁
  2. 当前 goroutine 睡眠
  3. 被 Signal 或 Broadcast 唤醒
  4. 重新抢锁
  5. Wait 返回

所以调用 Wait() 前,必须已经持有锁:

GO

mu.Lock()
cond.Wait()
mu.Unlock()

实现 Channel

Model

GO

// unbufferedSlot 无缓冲 channel
type unbufferedSlot[T any] struct {
	// 泛型
	value T
	// 用于记录是否被消费,发送者需要等待无缓冲 channel 被完全消费
	consumed bool
}

// MyChan 自实现 channel
type MyChan[T any] struct {
	// mu 用于并发控制
	mu sync.Mutex

	// sync.Cond 用于阻塞 goroutine
	cond sync.Cond

	// capacity 表示缓冲区大小
	capacity int

	// buf 用于有缓冲的 channel
	buf []T

	// closed 记录 channel 是否已关闭
	closed bool

	// slot 用于无缓冲的 channel
	slot *unbufferedSlot[T]
}

Common

GO

// NewMyChan 新建一个 channel
func NewMyChan[T any](capacity int) *MyChan[T] {
	if capacity <= 0 {
		capacity = 0
	}

	ch := &MyChan[T]{
		capacity: capacity,
	}

	// NewCond 参数为 Locker Interface
	// 而 Mutex 在指针类型上实现了 Locker,所以这里需要 &ch.mu
	ch.cond = *sync.NewCond(&ch.mu)

	return ch
}

// Close 关闭 Channel
func(ch *MyChan[T]) Close() {
	ch.mu.Lock()
	defer ch.mu.Unlock()

	// 和原版 Channel 保持一致
	if ch.closed {
		panic("close of closed channel")
	}

	ch.closed = true

	// 无缓冲 Channel 如果还有值等待接收
	// 则唤醒发送者并抛出 panic
	if ch.capacity == 0 && ch.slot != nil && !ch.slot.consumed {
		ch.slot = nil
	}

	// 唤醒阻塞中的发送者/接收者
	// 发送者醒来后发现 closed=true,然后 panic
	// 接收者醒来后如果没有数据,就返回零值和 false
	ch.cond.Broadcast()
}

这里需要注意的是 sync.NewCond 接受已实现 Locker 接口类型的参数。

查看源码可知 sync.Mutex 中,Lock/Unlock 方法使用的是指针接收器,所以 *sync.Mutex 才算实现了 Locker 接口。

方法 T 是否实现接口 *T 是否实现接口
func (t T) M()
func (t *T) M()

此外,Golang 不建议在同个类型的方法实现中同时使用值接收器和指针接收器。

一个类型只要有任何方法需要用指针接收器,通常这个类型的其他方法也都建议用指针接收器。

这样可以避免接口方法集不一致,也能避免复制带来的语义和并发问题。

Send

GO

// Send 向 Channel 发送数据
func (ch *MyChan[T]) Send(val T) {
	ch.mu.Lock()
	defer ch.mu.Unlock()

	// 首先判断 Channel 是否有缓冲
	if ch.capacity == 0 {
		ch.sendUnbuffered(val)
		return
	}

	// 有缓冲
	// 首先要判断缓冲区是否还有空间,没有空间则阻塞等待
	for len(ch.buf) >= ch.capacity && !ch.closed {
		ch.cond.Wait()
	}

	// buffer 有空间了,但是 channel 被关闭了
	// 和原版保持行为一致 panic
	if ch.closed {
		panic("send on closed channel")
	}

	ch.buf = append(ch.buf, val)

	// 现在 buffer 缓冲区有数据了,应该唤醒可能阻塞的接收方 goroutines
	ch.cond.Broadcast()
}

// sendUnbuffered 发送到无缓冲 Channel
func (ch *MyChan[T]) sendUnbuffered(val T) {
	// 由于是无缓冲的,所有如果有数据待接收
	// 将会阻塞发送方
	for ch.slot != nil && !ch.closed {
		ch.cond.Wait()
	}

	// 和原版 channel 一样,如果向一个 closed channel 发送
	// 直接抛出 panic
	if ch.closed {
		panic("send on closed channel")
	}

	s := &unbufferedSlot[T]{
		value: val,
		consumed: false,
	}
	ch.slot = s

	// 现在 channel 有数据了,唤醒等待的接收方 goroutine
	ch.cond.Broadcast()

	// 等待值被接收,否则阻塞
	for !s.consumed && !ch.closed {
		ch.cond.Wait()
	}

	// 判断是否因关闭而跳出阻塞
	if !s.consumed && ch.closed {
		panic("send on closed channel")
	}
}

Receive

GO

func(ch *MyChan[T]) Recv() (T, bool) {
	ch.mu.Lock()
	defer ch.mu.Unlock()

	if ch.capacity == 0 {
		return ch.recvUnbuffered()
	}

	// 有缓冲
	// buffer 为空且 chan 没有被关闭,阻塞等待
	for len(ch.buf) == 0 != !ch.closed {
		ch.cond.Wait()
	}

	// 特殊处理 closed channel
	var zeroVal T
	if len(ch.buf) == 0 && ch.closed {
		return zeroVal, false
	}

	// 有数据,按照 FIFO 出队
	val := ch.buf[0]

	// 截断掉第一个已经出队的元素
	ch.buf[0] = zeroVal
	ch.buf = ch.buf[1:]

	// 出队后,buffer 可能有空余空间
	// 唤醒阻塞的发送方
	ch.cond.Broadcast()

	return val, true
}

// recvUnbuffered 从无缓冲的 Channel 接收值
func(ch *MyChan[T]) recvUnbuffered() (T, bool) {
	// 等待接收数据
	for ch.slot == nil && !ch.closed {
		ch.cond.Wait()
	}

	// 如果没有数据,且已被关闭,那么返回零值和 ok: false
	var zeroVal T
	if ch.slot == nil && ch.closed {
		return zeroVal, false
	}

	// slot 不为空
	s := ch.slot
	val := s.value

	s.value = zeroVal
	s.consumed = true

	// 接收完之后注意清理掉 slot
	ch.slot = nil

	// 值已经接收完成,唤醒可能被阻塞的发送方 goroutines
	ch.cond.Broadcast()

	// 注意,如果 channel closed 但 buffer 还有数据
	// ok 依然为 true,直到 buffer 数据接收完毕,返回零值和 false
	return val, true
}

YuelaiGroup | 字节星球 2026-05-29