celery 开源分布式任务队列使用入门
2017-02-23
什么是分布式任务队列
分布式任务队列可以分发任务到多线程,多进程,多机器上执行,例如:
- 例如数据库中有100万条 tag 数据记录,按照时间排序,要统计每小时重复的 tag 数量, 可以按照每天24小时,分解为24个任务,推入分布式任务队列,由24个 work 进程并行计算
- 大量的图片资源需要压缩,可以通过分布式任务队列在集群的多台机器上分发调度任务,提高效率
Celery 特点
- 分布式并发能力,可以运行在单台机器,多台机器机,或者跨数据中心的服务器上
- 高可用性,在连接出错的时候 worker 和 client 会自动重试,broker 支持高可用(RabbitMP)
- 速度快,官方公布,单一 celery 进程,每分钟可以处理百万级别的任务
- 低延迟,使用 RabbitMQ 毫秒级别的延迟
- 可扩展,Celery 中几乎每一个组件都可以水平扩展,定制性强
架构组成
- 客户端,提交任务到消息队列
- 消息队列(消息中间件 broker), 存储/分发任务给工作进程
- 任务执行单元(worker 进程),执行分派的任务,并返回结果
- 结果存储,保存在 RabbitMQ,Redis 等存储中
客户端
有多种语言客户端,也有与框架集成方案
消息队列
- RabbitMQ 消息中间件,不同应用程序之间通过消息来集成
- Redis 缓存,队列,发生故障时可能会丢失数据(低)
序列化方式
- pickle, json, yaml, msgpack
- zlib, bzip2 compression
安装
以下例子基于 Python3
安装 RabbitMQ, Redis
# Centos
yum install -y rabbitmq-server
yum install -y redis-server
# debian, ubuntu
apt-get install rabbitmq-server
apt-get install redis-server
安装 Celery
用标准的 Python 包管理工具 pip 安装
pip install celery
应用例子:简单计算
# tasks.py
from celery import Celery
# 参数一是模块名,broker 是消息中间件,backend 保存结果和消息状态
# app = Celery('tasks', broker='redis://localhost')
app = Celery('tasks', broker='amqp://localhost')
@app.task
def add(x, y):
return x + y
运行 worker 服务
# 默认启动了24个进程
celery -A tasks worker --loglevel=info
任务脚本
# run_task.py
from tasks import add
print(add(100, 28))
# 异步执行
result = add.delay(20, 40)
print(result.ready())
print(result.get())
运行任务
python run_task.py
应用例子:定时任务,1分钟后执行计算任务
from tasks import add
from datetime import datetime, timedelta
import time
minute = datetime.utcnow() + timedelta(minutes=1)
result = add.apply_async((14, 18), eta=minute)
while not result.ready():
print('not calc')
time.sleep(15)
print(result.get())
Celery 监控和管理
基本的命令行管理命令
# 查看 celery 状态
celery status
# **危险**, 从所有的任务队列删除消息
celery -A tasks purge
# 查看激活的任务
celery -A tasks inspect active
# 查看定时任务
celery -A tasks inspect scheduled
# 查看注册的任务
celery -A tasks inspect registered
# 查看任务统计
celery -A tasks inspect stats
web 实时图形监控,Flower
Flower 有丰富的监控展示
- 任务的进度和历史
- 显示任务的详情
- 图形和统计
- 查看 worker
- 关闭 worker
- 扩展worker等
# 安装
pip install flower
# 启动
celery -A proj flower --port=5555
监控图