惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

T
The Exploit Database - CXSecurity.com
A
Arctic Wolf
K
Kaspersky official blog
T
Threat Research - Cisco Blogs
PCI Perspectives
PCI Perspectives
www.infosecurity-magazine.com
www.infosecurity-magazine.com
P
Privacy International News Feed
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
U
Unit 42
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
Simon Willison's Weblog
Simon Willison's Weblog
P
Privacy & Cybersecurity Law Blog
O
OpenAI News
量子位
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
C
Cisco Blogs
AWS News Blog
AWS News Blog
Vercel News
Vercel News
Microsoft Security Blog
Microsoft Security Blog
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
美团技术团队
T
Threatpost
S
Schneier on Security
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
C
Cyber Attacks, Cyber Crime and Cyber Security
Last Week in AI
Last Week in AI
C
CERT Recently Published Vulnerability Notes
Blog — PlanetScale
Blog — PlanetScale
C
Cybersecurity and Infrastructure Security Agency CISA
F
Full Disclosure
博客园_首页
N
Netflix TechBlog - Medium
Security Latest
Security Latest
有赞技术团队
有赞技术团队
Google DeepMind News
Google DeepMind News
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
The Register - Security
The Register - Security
Application and Cybersecurity Blog
Application and Cybersecurity Blog
Recent Announcements
Recent Announcements
博客园 - Franky
P
Palo Alto Networks Blog
Project Zero
Project Zero
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
H
Help Net Security
Hacker News: Ask HN
Hacker News: Ask HN
Cisco Talos Blog
Cisco Talos Blog
H
Heimdal Security Blog
The Hacker News
The Hacker News
博客园 - 【当耐特】
GbyAI
GbyAI

博客园 - 雨V幕

使用chromedp 来做人工模拟操作爬取数据方法 遍历redis按照前缀给未设置过期时间的数据添加过期时间 使用trace进行排查网络瓶颈 使用vscode 调试 Python 使用power shell 拆分 csv文件 将大文件拆分成小文件。 使用postman 添加预处理验签。 go 使用pprof 进行问题排查 Mysql无主键删除重复数据的快速方法 解决mysql 事务死锁的方法 go在处理批量下载时候出现fatal error: runtime: out of memory AnalyticDB 创建db go 序列化反序列化之后时区信息丢失 clickhouse 进行建表期间的一些优化 kraots2.0 在windows 环境搭建开发环境 Sql Server使用函数获取拼音码 关于async 和await关键字 使用kubespray 一键部署 containerd 的安装和熟悉 VMware 配置双网卡实现上网和固定ip
使用rabbitmq 进行任务调度
雨V幕 · 2025-10-21 · via 博客园 - 雨V幕
package mq

import (
    "context"
    "fmt"
    "log"
    "sync"
    "sync/atomic"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

type Client struct {
    url      string
    isAliyun bool

    conn *amqp.Connection
    ch   *amqp.Channel // admin/publisher channel

    mu sync.Mutex

    exchanges []exchangeDecl
    consumers []consumerDecl

    // reconnect 控制改用原子标志 + 通道用于通知
    reconnecting int32
    closed       chan struct{}
    wg           sync.WaitGroup
}

type exchangeDecl struct {
    Exchange   string
    Queue      string
    RoutingKey string
}

type consumerDecl struct {
    Queue   string
    Handler func(amqp.Delivery)
}

// ------------------ 辅助函数 (用于 Ack/Nack 和 Panic 保护) ------------------

// safeAck 尝试 Ack,如果失败,返回错误(指示 Channel 可能已关闭)
func safeAck(msg amqp.Delivery) error {
    // 在某些情况下,msg.Ack 会在 channel 已关闭时返回一个错误
    if err := msg.Ack(false); err != nil {
        // 记录错误,但不panic
        log.Printf("[RabbitMQ] safeAck 失败: %v", err)
        return err // 返回错误,让调用者决定是否退出
    }
    return nil
}

// safeNack 尝试 Nack,如果失败,返回错误(指示 Channel 可能已关闭)
func safeNack(msg amqp.Delivery, requeue bool) error {
    // 在某些情况下,msg.Nack 会在 channel 已关闭时返回一个错误
    if err := msg.Nack(false, requeue); err != nil {
        log.Printf("[RabbitMQ] safeNack 失败: %v", err)
        return err // 返回错误,让调用者决定是否退出
    }
    return nil
}

// safeHandle 用于捕获 handler 的 panic 并把错误返回
func safeHandle(handler func(amqp.Delivery), msg amqp.Delivery) (err error) {
    defer func() {
        if r := recover(); r != nil {
            err = fmt.Errorf("handler panic: %v", r)
        }
    }()
    handler(msg)
    return nil
}

// ------------------ 初始化 ------------------

func New(url string, isAliyun bool) *Client {
    return &Client{
        url:       url,
        isAliyun:  isAliyun,
        closed:    make(chan struct{}),
        exchanges: make([]exchangeDecl, 0),
        consumers: make([]consumerDecl, 0),
    }
}

func (c *Client) Connect() error {
    c.mu.Lock()
    defer c.mu.Unlock()

    var conn *amqp.Connection
    var err error

    // **健壮性增强 1:配置心跳 (Heartbeat)**
    // 设置心跳为 20 秒,以确保在网络中间件的空闲超时(通常 > 60s)之前发送活动信号。
    //config := amqp.Config{
    //    Heartbeat: 20 * time.Second,
    //}

    // 尝试连接
    for i := 0; i < 5; i++ {
        // 使用 DialConfig
        conn, err = amqp.Dial(c.url)
        time.Sleep(2 * time.Second)
        if err == nil {
            break
        }
        log.Printf("[RabbitMQ] connect failed (%d/5): %v\n", i+1, err)
        time.Sleep(2 * time.Second)
    }
    if err != nil {
        return fmt.Errorf("RabbitMQ 连接失败: %v", err)
    }

    ch, err := conn.Channel()
    if err != nil {
        _ = conn.Close()
        return fmt.Errorf("RabbitMQ 打开通道失败: %v", err)
    }

    c.conn = conn
    c.ch = ch

    log.Println("[RabbitMQ] 连接成功:", c.url)

    // 启动 watcher(只启动一次)
    go c.watchConnection()

    // 恢复之前声明(若有)
    for _, e := range c.exchanges {
        if err := c.declareExchangeQueueBind(e); err != nil {
            log.Printf("[RabbitMQ] 恢复 Exchange/Queue 失败: %v", err)
        }
    }

    // 恢复之前注册的消费者
    for _, cs := range c.consumers {
        go func(q string, h func(amqp.Delivery)) {
            if err := c.startConsume(q, h); err != nil {
                log.Printf("[RabbitMQ] 恢复 Consumer %s 初次失败: %v", q, err)
            }
        }(cs.Queue, cs.Handler)
    }

    return nil
}

// ------------------ Watcher / Reconnect ------------------

// watchConnection 只监听连接级别关闭,触发全局重连流程。
func (c *Client) watchConnection() {
    for {
        c.mu.Lock()
        conn := c.conn
        ch := c.ch
        c.mu.Unlock()

        if conn == nil {
            select {
            case <-time.After(2 * time.Second):
            case <-c.closed:
                return
            }
            continue
        }

        connClose := conn.NotifyClose(make(chan *amqp.Error, 1))
        chClose := ch.NotifyClose(make(chan *amqp.Error, 1))

        select {
        case err := <-connClose:
            if err != nil {
                log.Printf("[RabbitMQ] 连接关闭: %v", err)
                c.handleReconnect()
            }
        case err := <-chClose:
            if err != nil {
                log.Printf("[RabbitMQ] admin 通道关闭: %v", err)
                c.handleReconnect()
            }
        case <-c.closed:
            return
        }
    }
}

func (c *Client) handleReconnect() {
    if !atomic.CompareAndSwapInt32(&c.reconnecting, 0, 1) {
        log.Println("[RabbitMQ] 重连已在进行中,忽略本次触发")
        return
    }

    go func() {
        defer atomic.StoreInt32(&c.reconnecting, 0)

        // 先安全关闭旧 conn/ch
        c.mu.Lock()
        if c.ch != nil {
            _ = c.ch.Close()
            c.ch = nil
        }
        if c.conn != nil {
            _ = c.conn.Close()
            c.conn = nil
        }
        c.mu.Unlock()

        // 指数退避重连
        backoff := time.Second * 2
        for {
            select {
            case <-c.closed:
                return
            default:
            }

            log.Println("[RabbitMQ] 尝试重新连接中...")
            if err := c.Connect(); err != nil {
                log.Printf("[RabbitMQ] 重连失败: %v,%s 后重试", err, backoff)
                time.Sleep(backoff)
                // 指数回退到最大 30s
                if backoff < 30*time.Second {
                    backoff *= 2
                }
                continue
            }
            log.Println("[RabbitMQ] 重连成功并恢复所有消费者")
            return
        }
    }()
}

// ------------------ 声明 Exchange/Queue ------------------

// Declare 会在 admin channel 上声明并保存声明记录(用于重连恢复)
func (c *Client) Declare(exchange, queue, routingKey string) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.ch == nil || c.ch.IsClosed() {
        return fmt.Errorf("声明失败: admin channel 未打开")
    }

    kind := "x-delayed-message"
    args := amqp.Table{"x-delayed-type": "direct"}
    if c.isAliyun {
        kind = "direct"
        args = nil
    }

    if err := c.ch.ExchangeDeclare(
        exchange,
        kind,
        true, false, false, false, // durable: true
        args,
    ); err != nil {
        return fmt.Errorf("声明 Exchange 失败: %v", err)
    }

    if _, err := c.ch.QueueDeclare(
        queue,
        true, false, false, false, // durable: true
        nil,
    ); err != nil {
        return fmt.Errorf("声明 Queue 失败: %v", err)
    }

    if err := c.ch.QueueBind(queue, routingKey, exchange, false, nil); err != nil {
        return fmt.Errorf("绑定失败: %v", err)
    }

    // 保存声明用于重连恢复
    found := false
    for _, e := range c.exchanges {
        if e.Exchange == exchange && e.Queue == queue && e.RoutingKey == routingKey {
            found = true
            break
        }
    }
    if !found {
        c.exchanges = append(c.exchanges, exchangeDecl{
            Exchange:   exchange,
            Queue:      queue,
            RoutingKey: routingKey,
        })
    }

    return nil
}

func (c *Client) declareExchangeQueueBind(e exchangeDecl) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.ch == nil || c.ch.IsClosed() {
        return fmt.Errorf("admin channel 未打开")
    }

    kind := "x-delayed-message"
    args := amqp.Table{"x-delayed-type": "direct"}
    if c.isAliyun {
        kind = "direct"
        args = nil
    }

    if err := c.ch.ExchangeDeclare(e.Exchange, kind, true, false, false, false, args); err != nil {
        return err
    }
    if _, err := c.ch.QueueDeclare(e.Queue, true, false, false, false, nil); err != nil {
        return err
    }
    if err := c.ch.QueueBind(e.Queue, e.RoutingKey, e.Exchange, false, nil); err != nil {
        return err
    }
    return nil
}

// ------------------ 发布消息 ------------------

func (c *Client) Publish(exchange, routingKey string, body []byte, correlationId string, delayMs int64) error {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    headers := amqp.Table{}
    if delayMs > 0 {
        headers["x-delay"] = int32(delayMs)
    }

    c.mu.Lock()
    ch := c.ch
    c.mu.Unlock()

    if ch == nil || ch.IsClosed() {
        return fmt.Errorf("Publish 失败: admin channel 已关闭")
    }

    // 生产健壮性:此处建议为 ch 开启 Publisher Confirms,并在 PublishWithContext 后等待确认。
    // 为了保持代码简洁,这里暂时沿用原逻辑。
    return ch.PublishWithContext(ctx,
        exchange,
        routingKey,
        false, false,
        amqp.Publishing{
            ContentType:   "application/json",
            Body:          body,
            Headers:       headers,
            CorrelationId: correlationId,
            DeliveryMode:  amqp.Persistent, // 建议设置为持久化
        },
    )
}

// ------------------ 消费者 ------------------

// Consume 注册消费者(保存 handler),并尝试启动消费者。
func (c *Client) Consume(queue string, handler func(amqp.Delivery)) error {
    // 保存 consumer 定义(避免重复)
    c.mu.Lock()
    found := false
    for _, cc := range c.consumers {
        if cc.Queue == queue {
            found = true
            break
        }
    }
    if !found {
        c.consumers = append(c.consumers, consumerDecl{Queue: queue, Handler: handler})
    }
    c.mu.Unlock()

    // 尝试启动消费者
    return c.startConsume(queue, handler)
}

func (c *Client) startConsume(queue string, handler func(amqp.Delivery)) error {
    // 创建独立 channel
    c.mu.Lock()
    conn := c.conn
    c.mu.Unlock()

    if conn == nil {
        return fmt.Errorf("startConsume 失败: connection 未就绪")
    }

    ch, err := conn.Channel()
    if err != nil {
        return fmt.Errorf("创建 consumer channel 失败: %v", err)
    }

    // 设置 QoS,避免一次性推太多未 ack 消息(可根据业务调整)
    if err := ch.Qos(3, 0, false); err != nil {
        log.Printf("[RabbitMQ] 设置 Qos 失败: %v", err)
    }

    consumerTag := fmt.Sprintf("%s-%d", queue, time.Now().UnixNano())

    msgs, err := ch.Consume(queue, consumerTag, false, false, false, false, nil)
    if err != nil {
        _ = ch.Close()
        return fmt.Errorf("启动消费者失败: %v", err)
    }

    // 监听该 channel 的 NotifyClose,以便自动重连此消费者
    closeCh := ch.NotifyClose(make(chan *amqp.Error, 1))

    c.wg.Add(1)
    go func() {
        defer func() {
            // 一定关闭 channel 并释放 wg
            _ = ch.Close()
            c.wg.Done()
            log.Printf("[RabbitMQ] 消费者 %s 已退出 goroutine", queue) // 此日志是预期行为
        }()

        for {
            select {
            case msg, ok := <-msgs:
                if !ok {
                    // msgs channel 被关闭,可能是由于底层 Channel 已经关闭
                    log.Printf("[RabbitMQ] 消费者 %s: msgs channel closed, will attempt restart", queue)
                    go c.retryStartConsume(queue, handler)
                    return
                }

                // 标记是否需要主动退出
                shouldExit := false

                // 处理消息(包含 panic 保护)
                func(m amqp.Delivery) {
                    // defer 用于捕获 panic 并尝试 Nack
                    defer func() {
                        if r := recover(); r != nil {
                            log.Printf("[RabbitMQ] Handler panic: %v, msg: %s", r, string(m.Body))
                            // 尽量 Nack,如果 Nack 失败,则设置 shouldExit
                            if safeNack(m, true) != nil {
                                shouldExit = true
                            }
                        }
                    }()

                    if err := safeHandle(handler, m); err != nil {
                        // Handler 返回错误
                        var nackErr error
                        if m.Redelivered {
                            nackErr = safeNack(m, false) // 拒绝,不重入 (可能进死信)
                        } else {
                            nackErr = safeNack(m, true) // Nack 并重入
                        }

                        if nackErr != nil {
                            // Nack 失败,说明 Channel 可能已关闭
                            shouldExit = true
                        }
                        return
                    }

                    // 成功处理
                    if err := safeAck(m); err != nil {
                        // **健壮性增强 2:Ack 失败,主动退出**
                        // Ack 失败,说明 Channel 可能已关闭
                        shouldExit = true
                    }
                }(msg)

                // **健壮性增强 3:如果 Ack/Nack 失败,立即退出 Goroutine**
                if shouldExit {
                    log.Printf("[RabbitMQ] 消费者 %s: Ack/Nack 失败,主动触发重启", queue)
                    go c.retryStartConsume(queue, handler)
                    return
                }

            case err := <-closeCh:
                // Channel 收到关闭通知(err != nil 是外部故障)
                if err != nil {
                    log.Printf("[RabbitMQ] consumer channel closed for queue %s: %v", queue, err)
                    go c.retryStartConsume(queue, handler)
                    return
                }
                // err == nil 是本地关闭,仍然触发重试/恢复
                log.Printf("[RabbitMQ] consumer channel closed (nil) for queue %s, attempting restart", queue)
                go c.retryStartConsume(queue, handler)
                return

            case <-c.closed:
                return
            }
        }
    }()

    return nil
}

// retryStartConsume 会循环尝试重启特定 queue 的消费者,直到成功或 client 被关闭
func (c *Client) retryStartConsume(queue string, handler func(amqp.Delivery)) {
    backoff := time.Second * 2
    for {
        select {
        case <-c.closed:
            return
        default:
        }

        // 如果全局在重连中,等待重连完成
        if atomic.LoadInt32(&c.reconnecting) == 1 {
            time.Sleep(2 * time.Second)
            continue
        }

        if err := c.startConsume(queue, handler); err != nil {
            log.Printf("[RabbitMQ] 重启消费者 %s 失败: %v, %s 后重试", queue, err, backoff)
            time.Sleep(backoff)
            if backoff < 30*time.Second {
                backoff *= 2
            }
            continue
        }
        log.Printf("[RabbitMQ] 消费者 %s 重启成功", queue)
        return
    }
}

// ------------------ 关闭 ------------------

func (c *Client) Close() {
    close(c.closed)

    c.mu.Lock()
    if c.ch != nil {
        _ = c.ch.Close()
        c.ch = nil
    }
    if c.conn != nil {
        _ = c.conn.Close()
        c.conn = nil
    }
    c.mu.Unlock()

    // 等待所有消费者 goroutine 退出
    c.wg.Wait()
    log.Println("[RabbitMQ] 客户端已关闭")
}
type MQBaseConfig struct {
    URL        string
    UserName   string
    Password   string
    IsAliyun   bool
    Ak         string
    Sk         string
    EndPoint   string
    InstanceId string
    VHost      string
}

var mqSetting *MQBaseConfig

func init() {
    var ctx = gctx.New()
    mqSetting = &MQBaseConfig{
        URL:        g.Cfg().MustGet(ctx, "rabbitMq.url").String(),
        UserName:   g.Cfg().MustGet(ctx, "rabbitMq.userName").String(),
        Password:   g.Cfg().MustGet(ctx, "rabbitMq.password").String(),
        IsAliyun:   g.Cfg().MustGet(ctx, "rabbitMq.isAliyun").Bool(),
        Ak:         g.Cfg().MustGet(ctx, "rabbitMq.AK").String(),
        Sk:         g.Cfg().MustGet(ctx, "rabbitMq.SK").String(),
        EndPoint:   g.Cfg().MustGet(ctx, "rabbitMq.endpoint").String(),
        InstanceId: g.Cfg().MustGet(ctx, "rabbitMq.instanceId").String(),
        VHost:      g.Cfg().MustGet(ctx, "rabbitMq.vHost").String(),
    }

    connStr := buildConnURL(mqSetting)

    //client := New("amqp://user:pass@localhost:5672/", mqSetting.IsAliyun)

    client := mq.New(connStr, mqSetting.IsAliyun)

    if err := client.Connect(); err != nil {
        log.Fatalf("连接失败: %v", err)
    }

    // 声明 Exchange / Queue
    client.Declare(
        rabbitmqlib.AD_PLAN_EXCHANGE,
        rabbitmqlib.AD_PLAN_QUEUE,
        rabbitmqlib.AD_PLAN_KEY,
    )

    // ---------------------------
    //   创建 KS 任务延迟队列
    // ---------------------------
    client.Declare(
        rabbitmqlib.KS_TASK_EXCHANGE,
        rabbitmqlib.KS_TASK_QUEUE,
        rabbitmqlib.KS_TASK_KEY,
    )

    client.Declare(
        rabbitmqlib.KS_UPLOAD_EXCHANGE,
        rabbitmqlib.KS_UPLOAD_QUEUE,
        rabbitmqlib.KS_UPLOAD_KEY,
    )

    client.Declare(
        rabbitmqlib.KS_PLAN_EXCHANGE,
        rabbitmqlib.KS_PLAN_QUEUE,
        rabbitmqlib.KS_PLAN_KEY,
    )

    log.Println("[RabbitMQ] initialization success")
    service.RegisterRabbitMqClient(client)

}

func buildConnURL(cfg *MQBaseConfig) string {
    if cfg.IsAliyun {
        // 阿里云 RabbitMQ 使用 AK/SK 登录
        // 阿里云官方格式: amqp://<AccessKey>:<SecretKey>@<InstanceId>.mq-amqp.<region>.aliyuncs.com:5672/<VHost>
        return fmt.Sprintf(
            "amqp://%s:%s@%s/%s",
            cfg.Ak, cfg.Sk, cfg.EndPoint, cfg.VHost,
        )
    }
    // 自建 RabbitMQ
    return fmt.Sprintf(
        "amqp://%s:%s@%s/%s",
        cfg.UserName, cfg.Password, cfg.URL, cfg.VHost,
    )
}
rabbitMq:
  url: "1.1.1.1:5672"
  userName: "uname"
  password: "pwd"
  isAliyun: true
  SK: "sk"
  AK: "uk"
  endpoint: "aliurl"
  instanceId: ""
  vHost: "vhost"