惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

F
Full Disclosure
Recorded Future
Recorded Future
T
Tenable Blog
S
Securelist
C
CERT Recently Published Vulnerability Notes
T
Threatpost
S
Schneier on Security
A
Arctic Wolf
The Hacker News
The Hacker News
C
CXSECURITY Database RSS Feed - CXSecurity.com
Know Your Adversary
Know Your Adversary
P
Privacy International News Feed
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
The Register - Security
The Register - Security
Cisco Talos Blog
Cisco Talos Blog
AWS News Blog
AWS News Blog
K
Kaspersky official blog
T
True Tiger Recordings
T
Threat Research - Cisco Blogs
V
Vulnerabilities – Threatpost
P
Palo Alto Networks Blog
T
The Exploit Database - CXSecurity.com
小众软件
小众软件
B
Blog
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
Microsoft Azure Blog
Microsoft Azure Blog
Cyberwarzone
Cyberwarzone
C
Cybersecurity and Infrastructure Security Agency CISA
T
Tor Project blog
Spread Privacy
Spread Privacy
Malwarebytes
Malwarebytes
P
Proofpoint News Feed
F
Fox-IT International blog
F
Fortinet All Blogs
P
Privacy & Cybersecurity Law Blog
G
GRAHAM CLULEY
量子位
Latest news
Latest news
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
博客园 - 叶小钗
Project Zero
Project Zero
T
Tailwind CSS Blog
N
Netflix TechBlog - Medium
Martin Fowler
Martin Fowler
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
IntelliJ IDEA : IntelliJ IDEA – the Leading IDE for Professional Development in Java and Kotlin | The JetBrains Blog
I
Intezer
博客园_首页
腾讯CDC
H
Hackread – Cybersecurity News, Data Breaches, AI and More
D
Darknet – Hacking Tools, Hacker News & Cyber Security

郑文峰的博客

使用dify对接飞书多维表格 使用n8n对接飞书多维表格 服务启动时出现 OOM Bug 通缉令 一次服务升级时pg表DDL执行超时失败 Go语言高效IO缓冲技术详解 Go语言延迟初始化(Lazy Initialization)最佳实践 Go语言字符串拼接性能对比与优化指南 Go语言结构体内存对齐完全指南 Go语言空结构体:零内存消耗的高效编程 Go语言堆栈分配与逃逸分析深度解析 Go语言原子操作完全指南 Go语言内存预分配完全指南 Go语言不可变数据共享:无锁并发编程实践 Go语言零拷贝技术完全指南 Go语言遍历性能深度解析:从原理到优化实践 Go语言Interface Boxing原理与性能优化指南 Go协程池深度解析:原理、实现与最佳实践 使用etcd分布式锁导致的协程泄露与死锁问题 基于pre-commit的Python代码规范落地实践 初识 MCP Server pulsar阻塞导致logstash无法接入日志 django-prometheus使用及源码分析 kube-proxy源码分析 kubernetes service如何通过iptables转发 tcp缓存引起的日志丢失 理解calico容器网络通信方案原理 理解flannel的三种容器网络方案原理 理解Linux IPIP隧道 理解VXLAN网络 理解Linux TunTap设备 快速了解iptables kafka中listener和advertised.listeners的作用 django rest_framework 分页 django后端服务、logstash和flink接入VictoriaMetrics指标监控 python中import原理 docker容器单机网络 手动实现docker容器bridge网络模型 mysql之MVCC原理 mysql之日志 使用java开发logstash的filter插件 使用python实现单例模式的三种方式 redis之缓存 redis之分片集群 redis之哨兵机制 redis之主从库同步 redis之持久化 redis之五种基本数据类型 go中如何处理error pod中将代码与运行环境分离 友链 ddt源码分析 python装饰器的使用方法 读书笔记:如何阅读一本书 使用ddt实现unittest的参数化测试 分布式锁 使用kubeadm安装k8s 优化gin表单的错误提示信息 gin中validator模块的源码分析 go简单使用grpc python简单使用grpc k8s之PV、PVC和StorageClass k8s之StatefulSet k8s之DaemonSet k8s之Job和CronJob k8s之ConfigMap和Secret k8s之Service k8s之Pod k8s之Deployment 容器的本质 docker容器 python迭代器与生成器 python元编程 python垃圾回收机制 python上下文管理器 django rest_framework使用jwt django rest_framework异常处理 django rest_framework 自定义文档 django压缩文件下载 django rest_framework使用pytest单元测试 django restframework choice 自定义输出数据 django Filtering 使用 django viewset 和 Router 配合使用时报的错 django model的序列化 django中使用AbStractUser django.core.exceptions.ImproperlyConfigured Application labels aren't unique, duplicates users django 中 media配置 django 外键引用自身和on_delete参数 django 警告 while time zone support is active Flask使用flask_socketio实现websocket flask结合mongo tornado 文件上传 tornado 使用jwt完成用户异步认证 tornado 用户密码 bcrypt加密 tornado 结合wtforms使用表单操作 tornado finish和write区别 tornado 使用peewee-async 完成异步orm数据库操作 pyspark streaming简介 和 消费 kafka示例 使用hue创建ozzie的pyspark action workflow count的性能优化
django-apschedule定时任务异常停止
2023-10-30 · via 郑文峰的博客

# 背景

在django项目中使用django-apschedule来实现定时任务,使用的是BackgroundScheduler调度类,该调度的实现是通过后台线程的方式执行定时任务。其中任务都是持久化到数据库中的。

在项目的运行过程中,因为数据库的异常,导致定时任务线程异常终止,即使数据库后续恢复正常,但也不再继续执行。我多次尝试复现未果,在开启定时任务期间,手动将数据库连接断开,定时任务执行失败,然后再将数据库建立连接,定时任务竟然重新恢复了,这让我一时摸不着头脑。

具体的错误日志如下,通过分析,是update_job连接数据库异常,没有任何捕获机制,然后层层网上抛,最终导致线程停止,可以很肯定的是,绝对是因为数据库连接失败导致的定时任务失败,那为什么无法复现呢?

Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/python3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/blocking.py", line 32, in _main_loop
    wait_seconds = self._process_jobs()
  File "/usr/local/python3/lib/python3.7/site-packages/apscheduler/schedulers/base.py", line 1009, in _process_jobs
    jobstore.update_job(job)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/util.py", line 105, in func_wrapper
    result = func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django_apscheduler/jobstores.py", line 249, in update_job
    with transaction.atomic():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/transaction.py", line 189, in __enter__
    if not connection.get_autocommit():
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 389, in get_autocommit
    self.ensure_connection()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
     return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/utils.py", line 90, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 219, in ensure_connection
    self.connect()
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/base/base.py", line 200, in connect
    self.connection = self.get_new_connection(conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/django/utils/asyncio.py", line 33, in inner
    return func(*args, **kwargs)
  File "/usr/local/python3/lib/python3.7/site-packages/django/db/backends/postgresql/base.py", line 187, in get_new_connection
    connection = Database.connect(**conn_params)
  File "/usr/local/python3/lib/python3.7/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
django.db.utils.OperationalError: connection to server at "xxxx.postgresql.svc.cluster.local" (xx.xx.xx.xx), port xxxx failed: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 源码分析原因

可以先看下BackgroundScheduler的实现方式,在start方法中创建了个子线程。

class BackgroundScheduler(BlockingScheduler):

    _thread = None

    def start(self, *args, **kwargs):
        if self._event is None or self._event.is_set():
            self._event = Event()

        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        self._thread.daemon = self._daemon
        self._thread.start()

    def shutdown(self, *args, **kwargs):
        super(BackgroundScheduler, self).shutdown(*args, **kwargs)
        self._thread.join()
        del self._thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

其中_main_loopBlockingScheduler中实现,是一个死循环,执行_process_jobs方法

class BlockingScheduler(BaseScheduler):
    
    ...

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()
    
    ...

1
2
3
4
5
6
7
8
9
10
11
12

再看_process_jobs中的内容,在BaseScheduler实现的,主要流程如下,先找到所有要执行的job,然后进行遍历运行并更新Job的状态。之前的错误日志,也就是这里的update_job抛出异常,而这里并没有捕获异常,最终层层往上抛,update_job -> _process_jobs -> _main_loop,最终线程异常终止。

def _process_jobs(self):
    for jobstore_alias, jobstore in six.iteritems(self._jobstores):
        try:
            due_jobs = jobstore.get_due_jobs(now)
        except Exception as e:
            ...
            continue

        ...
                
        for job in due_jobs:
      
            ...
            
            try:
                executor.submit_job(job, run_times)
            except BaseException:
                ...

            ...
            jobstore.update_job(job)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

那为什么复现不了呢?这个是因为,关闭数据库连接时,程序不一定可以正好运行在update_job,可以看到前面的get_due_jobs进行了异常捕获,如果这里抛出数据库连接异常是可以捕获到的,然后跳过后面的操作,等待下一次定时任务的执行,如果还是失败,则再次等待,所以这里的异常不会抛到最上层导致线程停止。

但如果某个时机,上面连接数据库都成功了,到update_job这里异常抛出,则会导致整个线程停止,定时任务不再执行。

那如何解决该问题呢?

# 搭建demo

首先我们搭建一个demo出来,模拟复现该问题。

  1. 创建django项目

django-admin startproject apschedule_demo

python manage.py startapp demo

python manage.py makemigrations

python manage.py migrate

1
2
3
4
5
6
7
8

  1. 在settings.py中配置到好数据库信息
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "apschedule_demo",
        "HOST": "xxxx",
        "PORT": 5432,
        "USER": "xxx",
        "PASSWORD": "xxx"
    }
}

1
2
3
4
5
6
7
8
9
10
11

  1. 根据django-apschedule官方 (opens new window)提供的文档搭建demo

在settings.py中添加该APP

INSTALLED_APPS = (
    # ...
    "django_apscheduler",
)

1
2
3
4

创建目录demo/management/commands,并在其下面创建runapscheduler.py文件,代码内容如下:

import logging

from django.conf import settings

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore

logger = logging.getLogger(__name__)


def my_job():
  # Your job processing logic here...
  print("job..")


class Command(BaseCommand):
  help = "Runs APScheduler."

  def handle(self, *args, **options):
    scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
    scheduler.add_jobstore(DjangoJobStore(), "default")

    scheduler.add_job(
      my_job,
      trigger=CronTrigger(second="*/3"),  # Every 3 seconds
      id="my_job",  # The `id` assigned to each job MUST be unique
      max_instances=1,
      replace_existing=True,
    )
    logger.info("Added job 'my_job'.")

    try:
      logger.info("Starting scheduler...")
      scheduler.start()

    # 因为上面是非阻塞开启定时任务,所以这里需要阻塞,不让主线程结束。
    while True:
            time.sleep(10)
    except KeyboardInterrupt:
      logger.info("Stopping scheduler...")
      scheduler.shutdown()
      logger.info("Scheduler shut down successfully!")

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

可以通过python manage.py runapscheduler执行上面的命令运行定时任务,该脚本创建了一个每3秒执行一次的任务。

  1. 复现

我们将断点打在jobstore.update_job(job)上,然后使用debug模式进行调试,当程序运行到断点上时,将数据库关闭,然后程序继续运行,则会报错,并抛出异常,线程停止了运行。至此,我们复现了该问题。

# 线程重启

我一开始想,我可以判断该线程是否异常,如果异常则将线程重启就好了

    while True:
        if not scheduler._thread.is_alive():
            scheduler._thread.start()

        time.sleep(10)

1
2
3
4
5

但事与愿违,抛出了异常,异常信息如下:

RuntimeError: threads can only be started once

1

通过查看官方文档可以知道,线程的start方法只能调用一次。

# listener

apschedule中提供了监听器机制,也就是在定时任务的成功、失败等状态都可以通过提前注册的listener方法来进行回调。但通过分析源码,其并不能捕获到定时任务线程的异常。

下面是简化了代码的listeners的原理流程:

  1. 外部通过add_listener方法注册回调方法
  2. 在定时任务线程主流程_process_jobs中发生的各个事件添加到events中
  3. 遍历events事件,然后通过与注册的回调方法mask进行匹配,匹配上则调用回调方法
class BaseScheduler:
    def __init__(...):
        self._listeners = []

    def add_listener(self, callback, mask=EVENT_ALL):
        self._listeners.append((callback, mask))

    def _process_jobs(self):

        events = []
        
        ...

        events.append(event)
 
        ...


        for event in events:
            self._dispatch_event(event)


    def _dispatch_event(self, event):
        for cb, mask in listeners:
            if event.code & mask:
                try:
                    cb(event)
                except BaseException:
                    self._logger.exception('Error notifying listener')

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

如果线程本身挂了,回调方法是不可执行的。

# 捕获线程中函数的异常

如果update_job抛出异常导致线程停止,那我捕获它的异常,然后再continue,等待下次定时任务运行再重试不就好了,但是这就需要改动源码,能不能改源码就尽量不改。所以这边我采用了继承BackgroundScheduler类,然后再重写_process_jobs方法来解决。

在重写的_process_jobs方法中,对父类的_process_jobs()进行异常的捕获,然后再不断的进行重试,这样即使update_job抛出异常了,也可以不断的进行尝试恢复,直至成功。

class DemoBackgroundScheduler(BackgroundScheduler):
    def _process_jobs(self):
        while True:
            try:
                return super()._process_jobs()
            except BaseException:
                time.sleep(5)

class Command(BaseCommand):
    help = "Runs APScheduler."

    def handle(self, *args, **options):
        scheduler = DemoBackgroundScheduler(timezone=settings.TIME_ZONE)
        ...

1
2
3
4
5
6
7
8
9
10
11
12
13
14

然后再次尝试复现该问题,可以发现在断开数据库后,它能够一直进行重试,线程没有停止,当数据库恢复运行后,job执行成功,不再抛出异常。

# 相关链接