跳转至

Django-Celery任务队列

Celery是由 Python 语言开发的简单、灵活、可靠的分布式任务队列。
其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。
Celery 侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。

1. 简介说明

在一个应用服务中,对于时效性要求没那么高的业务场景,我们没必要等到所有任务执行完才返回结果。例如用户注册场景中,保存了用户账号密码之后,就可以立即返回,后续的账号激活邮件,可以用一种异步的形式去处理,这种异步操作可以用队列服务来实现。否则,如果需要用户等到邮件发送成功后,可能几秒过去了,该用户也就会流失了。

功能特点

  • 简单: 熟悉Celery的工作流程后配置使用比较简单
  • 快速: 一个单进程的Celery每分钟可处理上百万个任务
  • 灵活: 几乎Celery的各个组件都可以被扩展及自定制
  • 高可用: 当任务执行失败或发生连接中断时会自动尝试重新执行任务

应用场景

  • 重量级:在使用之前先评估是否需要使用Celery呢?如果只是轻量级使用而已,可以使用其他库来代替
  • 坑比较多:使用过Celery的同学都知道,如果随版本更新可能会有意外的收获,我们现在的项目都是固定Celery版本的
  • 文档质量差:阅读过官方文档的都知道,文档组织其实有点不好,而且配置参数居多,脑子痛

应用场景

  • 当用户在网站进行某个操作需要等待很长时间完成时,我们可以将这种操作交给Celery执行,然后直接返回给用户,等到Celery执行完成以后通知用户,可以大大提好网站的并发以及用户的体验感。

  • 比如在运维场景下需要批量在几百台机器执行某些命令或者任务,此时Celery就可以轻松搞定。

  • 像定时导数据报表、发送邮件通知等场景,虽然Linux的计划任务可以实现,但是非常不利于管理,而Celery可以提供管理接口和丰富的API接口。

1
2
3
4
5
6
7
8
# 最简单的Demo示例
from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def hello():
    return 'hello world'

2. 架构原理

熟悉其项目的组织架构以及工作原理

Celery 库是一个非常强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行(有坑)。我们通常,使用它来实现**异步任务**(async task)和**定时任务**(crontab)。

核心角色

角色 描述
Task 就是你要做的事情,例如一个注册流程里面有很多任务,给用户发验证邮件就是一个任务,这种耗时任务可以交给 Celery 去处理。还有一种任务是定时任务,比如每天定时统计网站的注册人数,这个也可以交给 Celery 周期性的处理。
Beat 是一个定时任务调度器,它会根据配置定时将任务发送给 Broker,等待 Worker 来消费。
Broker Celery 中它介于生产者和消费者之间经纪人,这个角色相当于数据结构中的队列。例如一个 Web 系统中,生产者是处理核心业务的 Web 程序,业务中可能会产生一些耗时的任务,比如短信。生产者会将任务发送给 Broker,就是把这个任务暂时放到队列中,等待消费者来处理。消费者是 Worker, 是专门用于执行任务的后台服务,将实时监控队列中是否有新的任务,如果有就拿出来进行处理。Celery 本身不提供队列服务,一般用 Redis 或者 RabbitMQ 来扮演 Broker 的角色。
Worker 就是那个一直在后台默默执行任务的人,也称为任务的消费者。它会实时地监控队列中有没有任务,如果有就立即取出来执行。
Backend 用于保存任务的执行结果,每个任务都有返回值,比如发送邮件的服务会告诉我们有没有发送成功,这个结果就是存在 Backend 中。当然我们并不总是要关心任务的执行结果,所以也可以不需要。

工作原理

  • 任务模块Task包含异步任务和定时任务
  • 任务执行单元Worker实时监视消息队列获取队列中的任务执行
  • Woker执行完任务后将结果保存在Backend

Celery 序列化

  • 在客户端和消费者之间传输数据需要序列化和反序列化
  • 选择 RabbitMQ 作为消息代理
  • RabbitMQ 的 Python 客户端选择 librabbitmq 这个 C 库
  • 选择 Msgpack 做序列化
  • 选择 Redis 做结果存储

json 、 yaml

3. 快速上手

安装 Celery 工具

1
2
3
# 依赖安装
# [一个坑] celery和redis版本有强依赖
pip install -U "celery[redis]"

目录结构

1
2
3
4
5
6
7
8
9
# [配置方式1] 将配置专门写到一个配置文件中,创建实例后读取
# [配置方式2] 不将配置写入文件,而是直接通过统一配置文件获取并初始实例

# 代码目录结构
learn_celery
├── __init__.py  # 定义模块并创建实例
├── config.py    # 配置文件
├── tasks.py     # 创建多个任务
└── user.py      # 用户注册流程

文件内容

1
2
3
4
5
# __init__.py => 定义模块并创建实例

from celery import Celery
app = Celery('learn_celery') # 创建Celery实例且名称为learn_celery
app.config_from_object('learn_celery.config') # 从配置文件config.py中加载配置参数
# config.py => 配置文件
# 配置中的CELERY_为特定前缀
# https://docs.celeryproject.org/en/latest/genindex.html

BROKER_URL = 'redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0' # Broker配置
CELERY_RESULT_BACKEND = 'redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0' # BACKEND配置
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
CELERY_IMPORTS = ('learn_celery.tasks',) # 指定导入的任务模块
# tasks.py => 创建多个任务

import time
from learn_celery import app

@app.task
def send_mail(mail):
    print(f'send mail to {mail}')
    time.sleep(5)
    return "success"

@app.task
def send_msg(phone):
    print(f'send message to {phone}')
    time.sleep(5)
    return "success"
# user.py => 用户注册流程

from time import time
from learn_celery.tasks import send_msg

def register():
    start_time = time()
    print("1.验证邮箱地址合法")
    print("2.验证电话号码合法")
    print("3.发送短信进行注册...")
    send_msg.delay('13888888888')
    print("4.短信验证码正确可以通知用户进行注册!")
    spend_time = time() - start_time
    print(f'spend time is {spend_time}s.')

if __name__ == '__main__':
    register()

运行方式

# docker-compose.yml
version: "2.4"

services:
  redis:
    restart: on-failure
    container_name: celery_redis
    image: redis:latest
    ports:
      - "6379:6379"
    command: ["redis-server", "--requirepass", "123456"]
    networks:
      - celery_redis

networks:
  celery_redis:
# 启动redis服务
docker-compose up -d

# 前台启动Worker进程
# worker: 代表启动是worker角色,当然也可以启动其他角色
# -A: 自行指定Celery实例; -l: 表示启动的日志级别; -c: 表示启动的进程数
# -n: 指定Worker; -Q: 指定队列启动消费者进程; -B: Beat和Worker进程一并启动
# http://docs.celeryproject.org/en/latest/userguide/workers.html
celery worker -A learn_celery.tasks -c 2 -l info

# 启动Beat进程完成定时任务
celery beat learn_celery.tasks -l info
# 虽然启动了worker,但是我们还需要通过异步方法
# 如delay或apply_async来将任务添加到worker中
# 添加任务并返回AsyncResult对象获取结果
In [38]: from learn_celery import tasks

In [39]: while True:
    ...:     t = tasks.send_mail.delay('[[emailprotected]](/cdn-cgi/l/email-protection)')
    ...:     time.sleep(1)
    ...:
Out[39]: <AsyncResult: c933fd2a-bfdc-4ca6-809e-3e6369ef61fa>
# AsyncResult除了get方法用于常用获取结果方法外还提以下常用方法或属性
# state: 返回任务状态
# task_id: 返回任务id
# result: 返回任务结果,同get()方法
# ready(): 判断任务是否以及有结果,有结果为True,否则False
# info(): 获取任务信息,默认为结果
# wait(t): 等待t秒后获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错
# successfu(): 判断任务是否成功,成功为True,否则为False

python -m learn_celery.user
1.验证邮箱地址合法
2.验证电话号码合法
3.发送短信进行注册...
4.短信验证码正确可以通知用户进行注册!
spend time is 0.16619586944580078s.

使用 Flower 监控队列任务

# 可视化只要一步而已
celery flower --port=5551 --broker=redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0

4.高级用法

Celery 通过消息机制进行通信,通常使用中间人 Broker 为客户端和 Worker 调节。
启动一个任务,客户端向消息队列发送一条消息,然后中间人 Broker 将消息传递给一个 Worker,最后由 Worker 进行执行分配的任务。
可以有多个 Broker 和 Worker,用来提高 Celery 的高可用性以及横向扩展能力。
Celery 是用 Python 编写的,但协议可以用任何语言实现,如 Node.js 语言的 node-celery 项目。

4.1 中间人

4.2简单使用

创建独立的模块和配置文件 - 安装环境依赖和启动基础服务 - 编写 app 程序 - 启动 Worker 服务并调用任务

1
2
3
4
5
6
7
8
# 使用docker运行MQ服务
docker run -d -p 5462:5462 rabbitmq

# 使用docker运行Redis服务
docker run -d -p 6379:6379 redis

# 使用pip安装依赖
pip install celery
1
2
3
4
5
6
7
8
# 最简单的Demo示例(tasks.py)
from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
1
2
3
4
5
6
7
8
# 启动celery进程
celery worker -A hello --loglevel=info

# 调用任务
# 需要我们创建的实例任务并通过delay()进行调用
python
>>> from hello.tasks import add
>>> add.delay(4, 4)

4.3 后端存储

如果需要保存任务状态,Celery 需要在某处存储任务的状态信息,其中内置了一些后端存储方案。
通过配置文件中的 backend 参数,来指定后端存储方案。在已经配置后端存储的情况下,重新执行任务调用,可以保存对应的消息状态。
- ORM(SQLAlchemy/Django) - Memcached - Redis - RPC(RabbitMQ/AMQP) - 自定义的后端结果存储中间件

1
2
3
4
5
# RPC
app = Celery('hello', backend='rpc://', broker='pyamqp://')

# Redis
app = Celery('hello', backend='redis://localhost', broker='pyamqp://')
# 调用任务
# 需要我们创建的实例任务并通过delay()进行调用
python
>>> from hello.tasks import add
>>> result = add.delay(4, 4)
>>> result.ready()  # 检测是否已经处理完毕
False
>>> result.get(timeout=1)  # 设置超时时间
8
>>> result.get(propagate=False)  # 是否再次引发异常
>>> result.traceback  # 任务出现异常进行回溯

4.3 相关配置

大多数情况下,使用默认的配置就可以满足,当然也可以根据我们的实际需求按需配置。针对大型的项目,建议使用专用配置模块进行配置,将所有的配置项集中化配置。

# [配置项] 通过upate进行配置
# 程序启动时导入的模块列表,便于Worker执行相应的任务
from celery import Celery

app = Celery('hello',
             backend='redis://localhost',
             broker='pyamqp://',
             include=['hello.tasks'])

app.conf.update(
    task_serializer='json',  # 设置任务输出格式
    accept_content=['json'],  # 忽略其他内容
    result_serializer='json',  # 设置结果输出格式
    timezone='Europe/Oslo',  # 时区
    enable_utc=True,  # 启用UTC时钟
    task_routes={  # 设置任务专用队列
        'hello.tasks.add': 'low-priority',
    },
    task_annotations = {  # 任务进行限速
        'hello.tasks.add': {'rate_limit': '10/m'}
    }
)

if __name__ == '__main__':
    app.start()

# $ celery worker -A hello -Q hello
# >>> from hello.tasks import add
# >>> add.apply_async((2, 2), queue='hello')
# [配置模块] 加载配置模块
app.config_from_object('celery_config')

# celery_config.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

task_routes = {
    'hello.tasks.add': 'low-priority',
}

task_annotations = {
    'hello.tasks.add': {'rate_limit': '10/m'}
}

4.4 远程控制

对应运行中的队列任务,我们可以通过对应的命令进行监控。

# inspect
celery -A proj inspect --help

# control
celery -A proj control --help

# event
celery -A proj events --help

# status
celery -A proj status --help

3定时任务

  • Celery的提供的定时任务主要靠schedules来完成
  • 通过beat组件周期性的将任务发送给woker进行执行
# period_task.py
# 新建period_task.py文件并添加任务到配置文件中

from learn_celery import app
from celery.schedules import crontab

@app.task
def add(x, y):
    print(x+y)
    return x+y

@app.task
def say_hello(name):
    return f'hello {name}'

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每10秒执行add任务
    sender.add_periodic_task(10.0, add.s(1,3), name='1+3=')
    sender.add_periodic_task(
        # 每周一下午四点五十六执行say_hello任务
        crontab(hour=16, minute=56, day_of_week=1),
        say_hello.s('escape'), name='say_hello'
    )
# config.py

BROKER_URL = 'redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0' # Broker配置
CELERY_RESULT_BACKEND = 'redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0' # BACKEND配置
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
CELERY_IMPORTS = (              # 指定导入的任务模块
    'learn_celery.tasks',       # 普通任务
    'learn_celery.period_task', # 定时任务
)
1
2
3
4
5
6
7
8
# 启动worker和beat之后观察worker日志
# 会发现定时任务会自动往队列中添加,之后被对应worker消费掉

# 启动worker
celery worker -A learn_celery.tasks -l debug

# 启动beat
celery beat -A learn_celery.period_task -l debug
# config.py
# 还可以通过配置文件方式指定定时和计划任务

from learn_celery import app
from celery.schedules import crontab

BROKER_URL = 'redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0' # Broker配置
CELERY_RESULT_BACKEND = 'redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0' # BACKEND配置
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
CELERY_IMPORTS = (              # 指定导入的任务模块
    'learn_celery.tasks',       # 普通任务
    'learn_celery.period_task', # 定时任务
)

app.conf.beat_schedule = {
    'period_add_task': { # 添加计划任务
        'task': 'learn_celery.period_task.add', # 任务路径
        'schedule': crontab(hour=18, minute=16, day_of_week=1), # 执行周期
        'args': (3, 4), # 传入参数
    },
    'say_hello': { # 添加计划任务
        'task': 'learn_celery.period_task.say_hello', # 任务路径
        'schedule': 10.0, # 执行周期
        'args': ('escape',) # 传入参数
    },
}
# period_task.py
# 此时的period_task.py只需要注册到woker中就行了

from project import app

@app.task
def add(x,y):
    print(x+y)
    return x+y

@app.task
def say_hello(name):
    return f'hello {name}'

4. 进阶 - 任务绑定

  • Celery可通过任务绑定到实例获取到任务的上下文
  • 这样我们可以在任务运行时候获取到任务的状态,记录相关日志等
# period_task.py
# 通过bind参数将任务绑定,self指任务的上下文
# 通过self获取任务状态,同时在任务出错时进行任务重试

from learn_celery import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind=True) # 绑定任务
def add(self, x, y):
    logger.info(self.request.__dict__) # 打印日志
    try:
        a=[]
        a[10] == 1
    except Exception as e:
        # 出错每5秒尝试一次,总共尝试3次
        raise self.retry(exc=e, countdown=5, max_retries=3)
    return x+y

5. 进阶 - 内置钩子函数

  • Celery在执行任务时候,提供了钩子方法用于在任务执行完成时候进行对应的操作
  • 在Task源码中提供了很多状态钩子函数如:
  • on_success(成功后执行)
  • on_failure(失败时候执行)
  • on_retry(任务重试时候执行)
  • after_return(任务返回时候执行)
  • 在进程中使用是我们只需要重写这些方法,完成相应的操作即可
# period_task.py
# 在以下示例中,我们继续修改period_task.py文件
# 分别定义三个任务来演示任务失败、重试、任务成功后执行的操作

from learn_celery import app
from celery.utils.log import get_task_logger
from celery import Task

logger = get_task_logger(__name__)

class demotask(Task):
    # 任务成功执行
    def on_success(self, retval, task_id, *args, **kwargs):
        logger.info(f'task id:{task_id}, arg:{args}, successful!')

    # 任务失败执行
    def on_failure(self, exc, task_id, *args, **kwargs, einfo):
        logger.info(f'task id:{task_id}, arg:{args}, failed! erros:{exc}')

    # 任务重试执行
    def on_retry(self, exc, task_id, *args, **kwargs, einfo):
        logger.info(f'task id:{task_id}, arg:{args}, retry! einfo:{exc}')

@app.task(base=demotask, bind=True)
def add(self, x, y):
    try:
        a=[]
        a[10] == 1
    except Exception as e:
        # 出错每5秒尝试一次,总共尝试1次
        raise self.retry(exc=e, countdown=5, max_retries=1)
    return x+y

@app.task(base=demotask)
def say_hello(name):
    a=[]
    a[10] == 1
    return f'hello {name}'

@app.task(base=demotask)
def sum(a, b):
    return f'a+b={a+b}'
# config.py

from learn_celery import app
from celery.schedules import crontab

BROKER_URL = 'redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0' # Broker配置
CELERY_RESULT_BACKEND = 'redis://:[[emailprotected]](/cdn-cgi/l/email-protection):6379/0' # BACKEND配置
CELERY_RESULT_SERIALIZER = 'json' # 结果序列化方案
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间
CELERY_TIMEZONE='Asia/Shanghai' # 时区配置
CELERY_IMPORTS = (              # 指定导入的任务模块
    'learn_celery.tasks',       # 普通任务
    'learn_celery.period_task', # 定时任务
)

app.conf.beat_schedule = {
    'add': { # 每10秒执行
        'task': 'learn_celery.period_task.add',
        'schedule': 10.0,
        'args': (10, 12),
    },
    'say_hello': { # 每10秒执行
        'task': 'learn_celery.period_task.say_hello',
        'schedule': 10.0,
        'args': ('escape',),
    },
    'sum': { # 每10秒执行
        'task': 'learn_celery.period_task.sum',
        'schedule': 10.0,
        'args': (1, 3),
    },
}

6. 进阶 - 任务编排

在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成

Celery同样也能实现这样的任务,完成这类型的任务通过以下模块完成 - group: 并行调度任务 - chain: 链式任务调度 - chord: 类似group但分header和body2个部分;header可以是一个group任务,执行完成后调用body的任务 - map: 映射调度,通过输入多个入参来多次调度同一个任务 - starmap: 类似map,入参类似*args - chunks: 将任务按照一定数量进行分组

# tasks.py

from learn_celery import app

@app.task
def add(x, y):
    return x+y

@app.task
def mul(x, y):
    return x*y

@app.task
def sum(data_list):
    res=0
    for i in data_list:
        res+=i
    return res
# consumer.py
# group: 组任务
# 组内每个任务并行执行

from celery import group
from learn_celery.tasks import add

# 任务 [1+2, 1+2]
res = group(add.s(1,2), add.s(1,2))()
while True:
    if res.ready():
        print(f'res:{res.get()}')
        break
# consumer.py
# chain: 链式任务
# 链式任务中,默认上一个任务的返回结果作为参数传递给子任务

from celery import group
from learn_celery.tasks import add, mul, sum

# 任务((1+2)+3)*3
res = chain(add.s(1,2), add.s(3), mul.s(3))()
while True:
    if res.ready():
        print(f'res:{res.get()}')
        break
# consumer.py
# chord:任务分割
# 分为header和body两部分
# hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body

from celery import group
from learn_celery.tasks import add, mul, sum

# 任务(1+2)+(3*4)
res = chord(header=[add.s(1,2), mul.s(3,4)], body=sum.s())()
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break
# consumer.py
# chunks:任务分组
# 按照任务的个数分组

from learn_celery.tasks import add, mul, sum

# 4代表每组的任务的个数
res = add.chunks(zip(range(5), range(5)), 4)()
while True:
    if res.ready():
        print('res:{}'.format(res.get()))
        break

7. 异步调用原理

主要解释 delay 和 apply_async 的使用方法和区别

  • 对于delay和apply_async都可以用来进行任务的调度
  • 本质上是delay对apply_async进行了再一次封装或者可以说是快捷方式
  • 两者都返回AsyncResult对象,以下是两个方法源码
# --------------------------------------------
# delay源码
# --------------------------------------------
def delay(self, *args, **kwargs):
        """Star argument version of :meth:`apply_async`.

        Does not support the extra options enabled by :meth:`apply_async`.

        Arguments:
            *args (Any): Positional arguments passed on to the task.
            **kwargs (Any): Keyword arguments passed on to the task.
        Returns:
            celery.result.AsyncResult: Future promise.
        """
        return self.apply_async(args, kwargs)
# --------------------------------------------
# apply_async源码
# --------------------------------------------
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                    link=None, link_error=None, shadow=None, **options):
        if self.typing:
            try:
                check_arguments = self.__header__
            except AttributeError:  # pragma: no cover
                pass
            else:
                check_arguments(*(args or ()), **(kwargs or {}))

        app = self._get_app()
        if app.conf.task_always_eager:
            with denied_join_result():
                return self.apply(args, kwargs, task_id=task_id or uuid(),
                                  link=link, link_error=link_error, **options)

        if self.__v2_compat__:
            shadow = shadow or self.shadow_name(self(), args, kwargs, options)
        else:
            shadow = shadow or self.shadow_name(args, kwargs, options)

        preopts = self._get_exec_options()
        options = dict(preopts, **options) if options else preopts

        options.setdefault('ignore_result', self.ignore_result)

        return app.send_task(
            self.name, args, kwargs, task_id=task_id, producer=producer,
            link=link, link_error=link_error, result_cls=self.AsyncResult,
            shadow=shadow, task_type=self,
            **options
        )

8. 使用注意事项

1
2
3
4
# -------------------------------------------------------------------
# 1.在celery中worker启动时,如果是root用户则需要设置环境变量
# -------------------------------------------------------------------
export C_FORCE_ROOT='true'
1
2
3
4
5
6
7
# -------------------------------------------------------------------
# 2.使用RabbitMQ或Redis作为Broker的话,生产环境永远不要使用关系型数据库
# 3.Celery4.x开始不再支持Windows平台,如果需要在Windows开发请使用3.x的版本
pip install celer[redis]==4.3.0
pip install celery[librabbitmq]
pip install celery[librabbitmq,redis,auth,msgpack]
# -------------------------------------------------------------------
1
2
3
4
5
# 4.良好的配置操作
# 禁用预取任务是为了防止Broker分配任务不均导致多次重复执行
# 设置执行多少次任务之后进行销毁是为了防止卡死的出现和发生
worker_prefetch_multiplier=0   # 禁用预取任务
worker_max_tasks_per_child=50  # 每个worker执行50次任务即销毁
# -------------------------------------------------------------------
# 5.不要使用复杂对象作为任务函数的参数
# -------------------------------------------------------------------

# Good
@app.task
def my_task(user_id):
    user = User.objects.get(id=user_id)
    print(user.name)
    ...

# Bad
@app.task
def my_task(user):
    print(user.name)
    ...