惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
PCI Perspectives
PCI Perspectives
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
Google Online Security Blog
Google Online Security Blog
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
The GitHub Blog
The GitHub Blog
S
Secure Thoughts
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
WordPress大学
WordPress大学
SecWiki News
SecWiki News
B
Blog
小众软件
小众软件
Hacker News - Newest:
Hacker News - Newest: "LLM"
Webroot Blog
Webroot Blog
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
L
LINUX DO - 热门话题
Recent Commits to openclaw:main
Recent Commits to openclaw:main
酷 壳 – CoolShell
酷 壳 – CoolShell
IT之家
IT之家
The Cloudflare Blog
Google DeepMind News
Google DeepMind News
Know Your Adversary
Know Your Adversary
Y
Y Combinator Blog
F
Fortinet All Blogs
W
WeLiveSecurity
博客园 - Franky
MongoDB | Blog
MongoDB | Blog
Last Week in AI
Last Week in AI
The Last Watchdog
The Last Watchdog
S
Schneier on Security
爱范儿
爱范儿
V
V2EX - 技术
L
LINUX DO - 最新话题
月光博客
月光博客
博客园 - 【当耐特】
Latest news
Latest news
阮一峰的网络日志
阮一峰的网络日志
博客园 - 司徒正美
U
Unit 42
Schneier on Security
Schneier on Security
E
Exploit-DB.com RSS Feed
J
Java Code Geeks
Cyberwarzone
Cyberwarzone
T
The Blog of Author Tim Ferriss
TaoSecurity Blog
TaoSecurity Blog
博客园 - 叶小钗
T
Troy Hunt's Blog
大猫的无限游戏
大猫的无限游戏
AI
AI
Security Latest
Security Latest

又见苍岚

COLMAP PatchMatch Stereo 算法详解 事件驱动的状态机框架:从理论到工程实践 Git 在国内网络环境下无法 Push 的排查与修复 —— 配置 Clash 代理 分段五次多项式插值原理详解 路径插值方法深度对比研究 Claude Code 使用指南 OpenClaw 记忆管理与技能创建指南 CBS(Conflict-Based Search)算法详解 A* 算法及其变种详解 OpenClaw 配置多 Agents Windows Powershell 无法加载文件,因为在此系统上禁止运行脚本问题的解决方案 MaxClaw 安装流程 大模型 AI 名词介绍 AList 网盘聚合工具简介 Protobuf 简介与测试 Claude Code 简介以及 GLM 4.7 模型接入 Github 歌词下载工具 163MusicLyrics Python __getattr__ 懒加载 Python TypedDict 机器人仿真平台 Gazebo 安装记录 机器人仿真平台 Gazebo 简介 多机器人路径规划问题(Multi-Agent Path Finding, MAPF)简介 Python exifread 读取修改过的 jpeg 信息错误问题修复 3D 坐标系变换的理解 3D 旋转矩阵基本概念 MongoDB Compass 介绍 Python 环境管理工具 uv Flutter 开发指南 Snipaste 安装下载与黑屏问题解决方案 全局路径规划算法记录 2025 Python 版本性能测试 Flutter Hello World Flutter 安装环境配置 Ubuntu VMware 硬盘扩容后 SMBus Host controller not enabled 报错问题解决 Python NetworkX 教程 Docker GPU 报错 - Failed to initialize NVML Unknown Error 解决方案 Python matplotlib 图表绘制 cuda-toolkit 安装替代 Cuda 与 Cudnn Jinja2 Python 利用 docxtpl 和 Jinja2 生成基于模板的 Word 文档 Docker 实现 CPU 核心隔离 LoFTR 基于 Transformer 的特征提取匹配算法 OmniGlue 特征匹配 SuperGlue 使用图神经网络学习特征匹配 Ubuntu 下将 xlsx 文件按照 sheet 转换为 图片 Python 使用 SQLAlchemy Python FastAPI 教程 openwrt 软路由配置安装 Nav2 地图文件(PGM/YAML)规范标准 3D OBJ 模型转换为 glb 瓦片格式 Python 源码 Redis 数据库介绍 Ubuntu 22.04 内核自动升级导致 MongoDB 7.0.12 错误记录 ubuntu 20.04 安装 ROS Noetic ubuntu 18.04 安装 ROS Melodic VMware Workstation Pro 个人免费版下载、安装、使用指南 Hybrid A-star 路径规划 Reeds-Shepp 曲线 Dubins 曲线 Linux kvm 虚拟机网络不通的问题解决方法 Ubuntu 自动内存清理 BiliBili 缓存视频转 mp4 Python 求解线性规划 3D Gaussian Splatting 官方源码实践记录 ImageMagick 教程 Ubuntu 22.04 安装 Colmap 对数几率 odds Ubuntu nmcli 网络管理工具使用指南 SuperPoint 自监督深度学习特征点提取 SyncTV Music Tag Web 在线音乐信息整理工具 ncm 格式转 mp3 MusicBrainz 音乐元数据百科数据库 Ubuntu 网络流量监控工具 私人云音乐平台 Navidrome 入门 手眼标定 四元数(Quaternions) OHTTPS 实现免费自动 https 证书申请、更新、部署 ubuntu 22.04 安装 CloudCompare 单机 KVM 虚拟机冷迁移 Ubuntu 22.04 使用 mdadm 实现软 raid 小鱼 一键安装 ROS-humble Fluid -46- 基于 Simpletex API 构建公式识别页面 公式识别 API 简介 -- Simpletex 使用 Python web 部署库 waitress 3D Gaussian Splatting for Real-Time Radiance Field Rendering Ubuntu Swap 简介与空间扩展 Ubuntu 24.04 安装 forticlient Clash Verge 使用 MongoDB 7.0.17 集群 Docker 构建源码 Error code - 2013. Lost connection to MySQL server during query 问题解决 Python 日志记录库 loguru 使用指北 Python 实现 Web 日志查看服务 MySQL LOAD DATA LOCAL INFILE 极速数据加载 Image size exceeds limit of 89478485 pixels 解决方案 Docker 使用 NVIDIA GPU 驱动错误解决 阿里云 docker 镜像仓库 Ubuntu中没有wired connected的解决方案 MinIO 简介 subconverter 代理订阅格式转换 修复 node –openssl-legacy-provider is not allowed in NODE_OPTIONS 错误
Python 异步执行 Asyncio
Yiwei Zhang · 2023-02-13 · via 又见苍岚

异步和高并发在一些服务器场景需求很多,从2013年起由 Python 之父 Guido 亲自操刀主持了Tulip(asyncio)项目的开发,使得 Python 具备了优雅的异步编程库。

简介

  • 之前介绍了使用 Python Threading 实现异步编程,使用的是多线程的思路,语法不够优雅而且对于简单的工作来说还是太重了。

  • 比较合适的程序运行单位叫做协程,协程能够在IO等待时间就去切换执行其他任务,当IO操作结束后再自动回调,那么就会大大节省资源并提供性能。

协程

协程(Coroutine),也可以被称为微线程,是一种用户态内的上下文切换技术。简而言之,其实就是通过一个线程实现代码块相互切换执行。

  • Asyncio 并不能带来真正的并行(parallelism)。当然,因为 GIL(全局解释器锁)的存在,Python 的多线程也不能带来真正的并行。可交给 asyncio 执行的任务,称为协程(coroutine)。一个协程可以放弃执行,把机会让给其它协程(即 yield from 或 await)。

  • 在Python中有多种方式可以实现协程,例如:

  1. greenlet,是一个第三方模块,用于实现协程代码(Gevent协程就是基于greenlet实现);
  2. yield,生成器,借助生成器的特点也可以实现协程代码;
  3. asyncio,在 Python3.4 中引入的模块用于编写协程代码;
  4. async & awiat,在 Python3.5 中引入的两个关键字,结合 asyncio 模块可以更方便的编写协程代码。

目前python异步相关的主流技术是通过包含关键字async&await的async模块实现,因此我们重点关注 asyncio 模块。

asyncio

asyncio 是 Python 3.4 版本引入的标准库,直接内置了对异步IO的支持。

代码示例
  • 示例 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

@asyncio.coroutine
def func1():
print(1)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(2)

@asyncio.coroutine
def func2():
print(3)
yield from asyncio.sleep(2) # 遇到IO耗时操作,自动化切换到tasks中的其他任务
print(4)

tasks = [
asyncio.ensure_future( func1() ),
asyncio.ensure_future( func2() )
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

-->
1
3
2
4

  • 示例 2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio

async def func1():
print(1)
await asyncio.sleep(2) # 耗时操作
print(2)

async def func2():
print(3)
await asyncio.sleep(2) # 耗时操作
print(4)

tasks = [
asyncio.ensure_future(func1()),
asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

-->
1
3
2
4

  • 示例 3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# aiohttp 为支持异步编程的http请求库
import aiohttp
import asyncio

async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
content = await response.content.read()
file_name = url.rsplit('_')[-1]
with open(file_name, mode='wb') as file_object:
file_object.write(content)

async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://www.1.jpg',
'https://www.2.jpg',
'https://www.3.jpg'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)

if __name__ == '__main__':
asyncio.run(main())

输出:一次发送三个下载请求,同时下载,假如每次下载花费1s,完成任务仅需要1s 左右

async 关键字

async & await关键字在 Python 3.5 版本中正式引入,代替了asyncio.coroutine 装饰器,基于他编写的协程代码其实就是上一示例的加强版,让代码可以更加简便可读。

  • 协程函数:定义函数时候由async关键字装饰的函数 async def 函数名 (定义协程)
  • 协程对象:执行协程函数得到的协程对象。
1
2
3
4
5
# 协程函数
async def func():
pass
# 协程对象
result = func()

注意:执行协程函数只会创建协程对象,函数内部代码不会执行。如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理。

1
2
3
4
5
6
7
8
9
10
11
import asyncio 
async def func():
print("执行协程函数内部代码!")
result = func()

# 调用方法1:
# loop = asyncio.get_event_loop()
# loop.run_until_complete( result )

# 调用方法2:
asyncio.run( result )

  • 协程能做的事情
    • 等待一个 future 结束
    • 等待另一个协程(产生一个结果,或引发一个异常)
    • 产生一个结果给正在等它的协程
    • 引发一个异常给正在等它的协程

await 关键字

await + 可等待的对象(协程对象、Future、Task对象 -> IO等待),遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio

async def others():
print("start")
await asyncio.sleep(2)
print('end')
return '返回值'

async def func():
print("执行协程函数内部代码")
# await等待对象的值得到结果之后再继续向下走
response = await others()
print("IO请求结束,结果为:", response)

asyncio.run( func() )

task 对象

Task 对象的作用是在事件循环中添加多个任务,用于并发调度协程,通过asyncio.create_task(协程对象)的方式创建Task对象,这样可以让协程加入事件循环中等待被调度执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
async def module_a():
print("start module_a")
await asyncio.sleep(2) # 模拟 module_a 的io操作
print('end module_a')
return 'module_a 完成'

async def module_b():
print("start module_b")
await asyncio.sleep(1) # 模拟 module_a 的io操作
print('end module_b')
return 'module_b 完成'

task_list = [
module_a(),
module_b(),
]

done,pending = asyncio.run( asyncio.wait(task_list) )
print(done)

future 回调

假如协程是一个 IO 的读操作,等它读完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往 future 添加回调来实现。

1
2
3
4
5
6
7
def done_callback(futu):
print('Done')

futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)

loop.run_until_complete(futu)

多个协程

实际项目中,往往有多个协程,同时在一个 loop 里运行。为了把多个协程交给 loop,需要借助 asyncio.gather 函数。

1
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

官方文档:

1
2
gather(*coros_or_futures, loop=None, return_exceptions=False)
Return 各协程协成的合并结果

或者先把协程存在列表里:

1
2
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))

也可以传 futures 给它:

1
2
3
4
5
6
7
8
9
futus = [asyncio.ensure_future(do_some_work(1)),
asyncio.ensure_future(do_some_work(3))]

loop.run_until_complete(asyncio.gather(*futus))

如果进程中已经有了loop, 则可以直接等待这几个异步的结果
results = await asyncio.gather(*futus)
返回结果results是一个列表,每个元素是每个异步的返回内容
print(str(results))

gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。

Close Loop

以上示例都没有调用 loop.close,好像也没有什么问题。所以到底要不要调 loop.close 呢?
简单来说,loop 只要不关闭,就还可以再运行。:

1
2
3
loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

但是如果关闭了,就不能再运行了:

1
2
3
loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3)) # 此处异常

建议调用 loop.close,以彻底清理 loop 对象防止误用。

多进程+asyncio

由于python本身只能单线程,所以所谓的线程是通过线程锁实现的。现在必须要通过多进程实现更多的并发。

现在demo实现了使用多进程,每个进程都有一个asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio
import threading
import multiprocessing
from multiprocessing import Queue ,Pool,Process
#import aiohttp
import os

async def hello(name):
print('hello {} {}**********{}'.format(name,os.getpid(),threading.current_thread()))
#await asyncio.sleep(int(name))
await asyncio.sleep(1)
print('end:{} {}'.format(name,os.getpid()))

def process_start(*namelist):
tasks=[]
loop=asyncio.get_event_loop()
for name in namelist:
tasks.append(asyncio.ensure_future(hello(name)))
loop.run_until_complete(asyncio.wait(tasks))

def task_start(namelist):
i=0
lst=[]
flag=10
while namelist:
i+=1
l=namelist.pop()
lst.append(l)
if i==flag:
p=Process(target=process_start,args=lst)
p.start()
#p.join()
lst=[]
i=0
if namelist!=[]:
p=Process(target=process_start,args=lst)
p.start()
#p.join()

if __name__=='__main__':
# 测试使用多个进程来实现函数
namelist=list('0123456789'*10)
print(namelist)
task_start(namelist)

# 测试使用一个异步io来实现全部函数
# loop=asyncio.get_event_loop()
# tasks=[]
# namelist=list('0123456789'*10)
# for i in namelist:
# tasks.append(asyncio.ensure_future(hello(i)))
# loop.run_until_complete(asyncio.wait(tasks))

gather 起聚合的作用,把多个 futures 包装成单个 future,因为 loop.run_until_complete 只接受单个 future。

参考资料

文章链接:
https://www.zywvvd.com/notes/coding/python/asyncio/asyncio/