惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
人人都是产品经理
人人都是产品经理
Cisco Talos Blog
Cisco Talos Blog
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
V
V2EX
博客园 - 三生石上(FineUI控件)
Martin Fowler
Martin Fowler
WordPress大学
WordPress大学
D
Docker
S
SegmentFault 最新的问题
博客园 - 聂微东
美团技术团队
Apple Machine Learning Research
Apple Machine Learning Research
月光博客
月光博客
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Last Week in AI
Last Week in AI
M
MIT News - Artificial intelligence
F
Fortinet All Blogs
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
The GitHub Blog
The GitHub Blog
GbyAI
GbyAI
L
LangChain Blog
Vercel News
Vercel News
博客园 - 叶小钗
MongoDB | Blog
MongoDB | Blog
Stack Overflow Blog
Stack Overflow Blog
H
Help Net Security
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
The Cloudflare Blog
Engineering at Meta
Engineering at Meta
T
Threat Research - Cisco Blogs
T
Threatpost
Scott Helme
Scott Helme
T
Tailwind CSS Blog
Latest news
Latest news
Stack Overflow Blog
Stack Overflow Blog
Blog — PlanetScale
Blog — PlanetScale
The Register - Security
The Register - Security
罗磊的独立博客
P
Proofpoint News Feed
腾讯CDC
S
Schneier on Security
雷峰网
雷峰网
A
About on SuperTechFans
T
Tenable Blog
F
Full Disclosure
Cyberwarzone
Cyberwarzone
博客园_首页
有赞技术团队
有赞技术团队
K
Kaspersky official blog

文章列表

不改一行插件代码,实现消息优先级与阻断 洛玖定时任务系统 在 butterfly 主题中添加首页点集动画(基于p2line项目) 你好,2026 stm32f4xx-ads1256驱动 stm32f4xx-ad9854并行驱动 主动式网站状态监测实现及其应用 右键菜单加入用Trae打开文件和文件夹 三角洲行动ID映射表 洛玖SDK说明 为网页文章开头添加原文连接 Hexo-Butterfly主题在主页添加GitHub贡献日历 Proteus中555定时器仿真问题 洛玖开发日记 STS3032舵机获取力矩输出 kotlin网页前后端那些事 mspm0g3507-ad9850 奇怪的bug Paddle模型转PaddleLite 人工智能考核 构建一个yolov3网络
装饰器
洛屿 · 2025-03-06 · via

Python

Timeout装饰器

timeout装饰器实现了超时回调的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import asyncio

class Timeout:
def __init__(self, wait, on_timeout):
self.wait = wait
self.on_timeout = on_timeout
self.timer = None

def __call__(self, fn):
def wrapped(*args, **kwargs):
# 执行原函数
result = fn(*args, **kwargs)
# 取消之前的定时器
if self.timer is not None:
self.timer.cancel()
# 创建新的定时器
self.timer = threading.Timer(self.wait, self._on_timer)
self.timer.start()
return result
return wrapped

def _on_timer(self):
asyncio.run(self.on_timeout())

应用样例(active_message):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
async def call_back():
global message_package
message, group_id, user_id = message_package['message'], message_package['group_id'], message_package['user_id']
message_package = {
"message": "",
"group_id": "",
"user_id": ""
}
await message_reply(message, group_id, user_id)

@Timeout(wait=10, on_timeout=call_back)
async def active_message(message, group_id, user_id):
global message_package
message_package['message'] += f"{message}\n"
message_package['group_id'] = str(group_id)
message_package['user_id'] = str(user_id)

async def group_handle(message, group_id, user_id):
if message == "开启对话":
return await start_conversation(group_id, user_id)
elif message == "停止对话":
return await stop_conversation(group_id, user_id)
elif message == "遗忘对话":
return await forget_conversation(group_id, user_id)
elif message == "重启对话":
return await restart_conversation(group_id, user_id)
elif user_id in active_conversations:
await active_message(message, group_id, user_id)
else:
return "对话未开启,请输入'开启对话'以开始聊天。"

Record装饰器

record装饰器实现了记录消息到db数据库的功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import sqlite3
import os
import stat
import platform
from config import get_value
value = get_value()

def Record(func):
# __group_message_record__
async def __group_message_record__(message_objects):
if message_objects['message_type'] == 'group':
message_time = message_objects['time']
message = message_objects['message']
group_id = message_objects['group_id']
user_id = message_objects['user_id']

# 群文件夹
data_path = value.data_path + '/{group_id}/'.format(group_id=group_id)
group_path = data_path
if not os.path.exists(data_path):
os.makedirs(data_path)
if platform.system() != 'Windows':
os.chmod(data_path, stat.S_IRWXO)

data_path = group_path + '/chat_record.db'
record_file = data_path
if not os.path.isfile(data_path):
conn = sqlite3.connect(data_path)
cursor = conn.cursor()

cursor.execute('''
CREATE TABLE IF NOT EXISTS record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
qq_id TEXT NOT NULL,
content TEXT NOT NULL,
time TEXT NOT NULL
)
''')

conn.commit()
conn.close()
else:
conn = sqlite3.connect(record_file)
cursor = conn.cursor()

cursor.execute('''
INSERT INTO record (qq_id, content, time)
VALUES (?, ?, ?)
''', (user_id, message, message_time))

conn.commit()
conn.close()
if message_objects['message_type'] == 'private':
print("私聊消息")
await func(message_objects)
return __group_message_record__

message_objects消息格式样例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{
"self_id": "BOT_ID",
"user_id": "USER_ID",
"time": 1730539514,
"message_id": 1756106583,
"message_seq": 1756106583,
"real_id": 1756106583,
"message_type": "group",
"sender": {
"user_id": "USER_ID",
"nickname": "用户A",
"card": "洛",
"role": "owner"
},
"raw_message": "[CQ:at,qq=USER_ID] 你好世界[CQ:face,id=63]",
"font": 14,
"sub_type": "normal",
"message": "[CQ:at,qq=USER_ID] 你好世界[CQ:face,id=63]",
"message_format": "string",
"post_type": "message",
"group_id": GROUP_ID
}

应用样例:

1
2
3
4
5
6
7
8
9
10
@Record
async def message_handle(message_objects):
if message_objects['message_type'] == 'group':
message = message_objects['message']
group_id = message_objects['group_id']
user_id = message_objects['user_id']

await plugin_manager.handle_group_message(message, group_id, user_id)
if message_objects['message_type'] == 'private':
pass

Driver装饰器

driver装饰器用于插件启动前,停止前函数调用声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from typing import Callable, Coroutine, Any, List

class Driver:
def __init__(self):
self._startup_callbacks: List[Callable[[], Coroutine[Any, Any, None]]] = []
self._shutdown_callbacks: List[Callable[[], Coroutine[Any, Any, None]]] = []

def on_startup(self, func: Callable[[], Coroutine[Any, Any, None]]):
self._startup_callbacks.append(func)
return func

def on_shutdown(self, func: Callable[[], Coroutine[Any, Any, None]]):
self._shutdown_callbacks.append(func)
return func

async def run_startup(self):
for callback in self._startup_callbacks:
await callback()

async def run_shutdown(self):
for callback in self._shutdown_callbacks:
await callback()

driver = Driver()

调用样例:

1
2
3
4
5
6
7
8
9
10
11
12
from luo9 import get_driver

driver = get_driver()

@driver.on_startup
async def _():
print("插件启动前处理")

@driver.on_shutdown
async def _():
print("插件停止前处理")

被on_startup标记的函数会在bot启动前(插件载入阶段)进行调用

被on_shutdown标记的函数会在bot停止前(插件停止阶段)进行调用

版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 洛屿的小站

打赏

  • wechat

    wechat

  • alipay

    alipay