惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

爱范儿
爱范儿
博客园_首页
W
WeLiveSecurity
S
Secure Thoughts
S
Security @ Cisco Blogs
Recent Commits to openclaw:main
Recent Commits to openclaw:main
Hugging Face - Blog
Hugging Face - Blog
www.infosecurity-magazine.com
www.infosecurity-magazine.com
H
Hacker News: Front Page
Project Zero
Project Zero
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
U
Unit 42
N
News and Events Feed by Topic
N
News and Events Feed by Topic
Hacker News - Newest:
Hacker News - Newest: "LLM"
Forbes - Security
Forbes - Security
T
Tor Project blog
I
Intezer
B
Blog
F
Full Disclosure
Security Archives - TechRepublic
Security Archives - TechRepublic
F
Fortinet All Blogs
Schneier on Security
Schneier on Security
T
Threat Research - Cisco Blogs
AI
AI
Google DeepMind News
Google DeepMind News
L
LINUX DO - 最新话题
Cloudbric
Cloudbric
L
Lohrmann on Cybersecurity
WordPress大学
WordPress大学
博客园 - 聂微东
雷峰网
雷峰网
P
Privacy International News Feed
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
PCI Perspectives
PCI Perspectives
Y
Y Combinator Blog
Spread Privacy
Spread Privacy
Simon Willison's Weblog
Simon Willison's Weblog
罗磊的独立博客
Vercel News
Vercel News
A
Arctic Wolf
The Register - Security
The Register - Security
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
Microsoft Azure Blog
Microsoft Azure Blog
H
Heimdal Security Blog
Know Your Adversary
Know Your Adversary
P
Proofpoint News Feed
C
Cybersecurity and Infrastructure Security Agency CISA
P
Proofpoint News Feed

WebSocket

如果有多个 websocket 业务需求,一般情况下后端是建立一个还是多个 websocket 更好? - V2EX webSocket 在项目运动过程中 可以时不时重新连接吗? - V2EX websocket 下载文件的原理是啥? - V2EX websocket 如何投递消息到 http - V2EX 麻烦问个 Java websocket 的性能优化问题(比较基础) - V2EX 请教一下 websocket 实现发图片的方案和保存聊天记录的方案; - V2EX 问些 websocket 的问题 - V2EX websocket 的 ping/pong 标准是需要服务器主动发 ping? - V2EX [有偿] 原生 app 开发即时通讯 关于 websocket 长链接的问题 - V2EX WebSocket 频繁爆出异常 - V2EX 问一个 websockets 的问题 - V2EX Java 中 websocket 中 session 长久保持打开状态 - V2EX 使用 gevent 作为 Websocket 服务端,浏览器上用 setInterval 定时重复建立 Websocket,结果导致 gevent 不会自动释放之前废弃的连接,内存快要溢出了。怎么自动回收废弃连接呢 - V2EX websocket 在 springsecurity 下结合 jwt 的问题 - V2EX sync-player:使用 websocket 实现异地同步播放视频 - V2EX Websocket 直播间聊天室教程 - GoEasy 快速实现聊天室 - V2EX Uniapp 使用 GoEasy 实现 websocket 实时通讯 - V2EX 搭建 websocket 消息推送服务,必须要考虑的几个问题 - V2EX 最近在看 websocket 的聊天室功能,有个小疑问 - V2EX
请教, websockets 模块起服务, websockets.serve 的方法问题。 - V2EX
qazwsxkevin · 2023-10-23 · via WebSocket
import datetime
import random
import string
import asyncio
import time

import websockets

from multiprocessing import Manager
from concurrent import futures

# 忽略警告
import warnings
warnings.filterwarnings("ignore")

strLen = 30

def putmsg(que, gvar):
    cc = 0
    while True:
        cc += 1

        if gvar['flag'] == True:
            break

        ranStr = ''
        for s in range(strLen):
            ranStr = ranStr + random.choice(string.ascii_letters + string.digits)

        # slTime = random.uniform(0.01,0.2)

        logStr = str(cc) + ' ' + "{:.2f}".format(slTime) + ' ' + str(datetime.datetime.now().replace(microsecond=0)) + ' ' + ranStr
        # print(logStr)
		
        # test
        # print(cc, '#', que.qsize())

        que.put(logStr)

        # time.sleep(slTime)
        time.sleep(1.5)

def wsock(queu, gvar):
    loop = asyncio.get_event_loop()
    async def stoploop():
        loop.stop()

    # Maintain a list of connected clients
    connected_clients = set()

    async def register(websocket):
        # Add a new client to the list
        connected_clients.add(websocket)
        print('connected_clients.add(websocket)')

    async def unregister(websocket):
        # Remove a client from the list
        connected_clients.remove(websocket)
        print('connected_clients.remove(websocket)')

    async def broadcast(message):
        # Send a message to all connected clients
        if connected_clients:
            await asyncio.gather(*(client.send(message) for client in connected_clients))

    async def echo(websocket, que=queu):
        await register(websocket)

        while True:
            if queu.qsize():
                msgStr = queu.get()
                if connected_clients:
                    try:
                        await asyncio.gather(*(client.send(msgStr) for client in connected_clients))
                    except Exception as e:
                        print(e) # echo:received 1001 (going away); then sent 1001 (going away)
                        break
            else:
                asyncio.sleep(0.3)


        async for message in websocket:
            # Broadcast the received message to all clients
            if message == 'stop':
                gvar['flag'] = True
                await stoploop()
            await broadcast(message)

        await unregister(websocket)


    start_server = websockets.serve(echo, "172.17.0.2", 25299)

    asyncio.set_event_loop(loop)
    # loop.create_task(start_server)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

if __name__ == '__main__':
    # 队列
    msgQue = Manager().Queue()
    # 全局变量
    glovar = Manager().dict()

    # 启停开关
    glovar['flag'] = False

    # 处理进程
    proc = futures.ProcessPoolExecutor(max_workers=2)

    wsockRet = proc.submit(wsock, msgQue, glovar)
    putmsgRet = proc.submit(putmsg, msgQue, glovar)

问题是 websocket.serve 使用 echo 方法作为 handle ,
只有在 websocket 接口有事件的时候,才会调用 echo 进行处理,(被动式)

echo 的被动方法,队列里的日志越来越多,
想有一个永久循环,如果有 client(s),send 取出的队列内容,没有 client ,取出就 pass 了,
websocket.serve 被动调用不适合这个场合,看官方也没有更好的提示,
请教大家这里怎么换个方式实现呢?