celery distributed message queue

Quitters never win and winners never quit.

I previously used the rabbitmq + pika combination for a distributed message queue, but because I did not understand rabbitmq and pika well, I ran into many pitfalls along the way. Recently I decided to revisit distributed message queues; this time I abandoned pika and chose celery.
Before starting, I had some questions about pika and celery: what is the difference between the two, and what do they have in common? After a few days of research it finally became clear, so here is a record of using celery + rabbitmq.

Installing celery

pip install celery

Note: celery requires python 2.7 or above. It is recommended to install it in a virtual environment; for how to set one up, see [python virtual environment](https://thief.one/2017/08/24/2/).
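For example, a minimal setup might look like this (the environment name venv is arbitrary):

virtualenv venv
source venv/bin/activate
pip install celery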

How does Celery work?

Here I will simulate several roles to explain how celery + rabbitmq works. The analogy is borrowed from the internet.

Suppose company D holds its semi-annual work meeting, which must set the work plan for the second half of the year. The participants are the boss (who issues the tasks), the department head (celery, who assigns the tasks), the department employees (the workers), and the boss's secretary (the communication coordinator, rabbitmq).

What is the task content?

The first thing to determine at this meeting is the concrete work for the second half of the year, called the "task content" here. For example, the boss says the company will develop a big data platform in the second half of the year; the department heads applaud and agree, and the overall task is happily settled. Of course, developing the platform is the overall task of the project, and it can be subdivided into many small tasks, such as how to write the big data algorithms and how to design the interface.

Where are the workers?

Once the concrete tasks are determined, the boss hands the project to the department head (celery), who then decides who will complete each task; it can be assigned to one person or to several.

Who publishes the tasks?

Without doubt, the person who publishes the tasks is the boss (who issues the tasks); he specifies when the department head (celery) must complete them and asks for feedback. Note, however, that the boss only arranges the tasks and does not take part in assigning them; assigning the tasks is the job of, that's right, the department head, that is, celery.

How do bosses and employees communicate with each other?

At the beginning of the project, the boss passes tasks to the department head by telephone, the department head assigns them to employees through department meetings, and after a while returns the results to the boss. However, as more and more tasks are completed, the department head discovers a problem: there are too many tasks, each one also needs its result fed back, nothing can be kept in one's head, and things easily get mixed up, reducing efficiency. So a secretary (rabbitmq) is brought in: the boss writes each task on a note and hands it to the secretary, and the department head picks up the notes from the secretary and returns the results through her as well.

Demo code

celery_con.py

from celery import Celery
import time

app = Celery(backend='amqp', broker='amqp://guest:guest@127.0.0.1:5672')

Description: celery_con.py connects to rabbitmq; note that this is the rabbitmq connection used by celery. Mapped to the scenario, it is the channel through which the secretary and the department head, and the secretary and the boss, pass information.

task.py (task content)

import time
from celery_con import app

@app.task
def test(x, y):
    time.sleep(5)
    return x + y

@app.task
def scan(x, y):
    time.sleep(1)
    return x - y

Description: task.py defines the concrete tasks, that is, the "task content". Mapped to the scenario, this is "developing a big data platform": how to write the big data algorithms, how to design the interface, and so on.

Celery (department head)

celery -A task worker -c 2

Description: this command starts the workers that process the tasks. -A task points at the task.py module, and -c 2 starts 2 worker processes at the same time. Mapped to the scenario, this is the department head fetching the notes from the secretary in real time and assigning them to the employees.

run.py (boss)

from task import test, scan

res = test.delay(2, 2)
print(res.get())

Description: run.py sends a message to the rabbitmq queue. Mapped to the scenario, this is the boss writing the task on a note and handing it to the secretary.

run:

python run.py

The secretary here refers to rabbitmq.
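delay() returns an AsyncResult, and besides the blocking get() you can also poll the task state. A minimal sketch, reusing the test task from above:

from task import test

res = test.delay(2, 2)
print(res.ready())           # False while the worker is still running the task
print(res.get(timeout=10))   # block until the result arrives, or raise on timeout
print(res.status)            # e.g. 'SUCCESS' once the task has finished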

The difference between celery and pika

Simply put, pika is just a python client module for connecting to the rabbitmq service, and rabbitmq itself only stores messages; it has no task allocation or scheduling. Of course, tasks can also be distributed over a plain pika connection to rabbitmq, but that requires writing your own scheduling code with the pika module, which amounts to writing your own celery.
Celery is what assigns the tasks; it is mainly an asynchronous task queue. But celery has no storage of its own, so it needs a medium to store the messages, which is why it is so often used together with rabbitmq.
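To make the contrast concrete, here is a minimal sketch of publishing a raw message with pika (the queue name hello is just an example). Note that there is no task or worker concept, only bytes placed in a queue; consuming, retrying and scheduling would all have to be written by hand:

import pika

# connect to the local rabbitmq server
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
channel = connection.channel()

# declare a queue and push a plain message into it
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='a raw message')
connection.close()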

celery advanced usage

scan.s(2, 2) creates a signature: a task invocation whose arguments are stored so that it can be delivered later:

from task import scan

r = scan.s(2, 2)
res = r.delay()
print(res.get())
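Signatures can also be combined; for example, a chain passes each task's result on as the first argument of the next task. A small sketch using the scan task defined above:

from celery import chain
from task import scan

# scan(2, 2) runs first; its result (0) becomes the first argument
# of the next scan, so the chain computes (2 - 2) - 1 = -1
res = chain(scan.s(2, 2), scan.s(1)).delay()
print(res.get())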

Concurrent delivery tasks

For delivering many tasks concurrently you can also use a plain for loop. Concurrency here does not mean that all tasks execute at once: all tasks are sent to the queue, and how many run in parallel depends on the number of workers.

from celery import group
from task import scan

g = group(scan.s(i, i) for i in range(10)).delay()
print(g.get())
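The for-loop variant mentioned above simply collects one AsyncResult per call; a sketch:

from task import scan

results = [scan.delay(i, i) for i in range(10)]
print([r.get() for r in results])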

Specify the queue to deliver to

Sometimes we have several kinds of tasks, each handled by different workers, so we need separate queues to hold them. In that case the queue name must be specified both when the tasks are delivered and when they are consumed.

Configuring celery

celery_con.py

from celery import Celery, platforms

RABBITMQ_IP = "127.0.0.1"
RABBITMQ_PORT = "5672"
RABBITMQ_USER = ""
RABBITMQ_PASS = ""

app = Celery(
    backend='amqp',
    broker='amqp://{}:{}@{}:{}'.format(
        RABBITMQ_USER,
        RABBITMQ_PASS,
        RABBITMQ_IP,
        RABBITMQ_PORT,
    ),
)

# Allow celery to start with root privileges
platforms.C_FORCE_ROOT = True

app.conf.update(
    # route each task to its own queue
    CELERY_ROUTES={
        'worker.test1': {'queue': 'test1'},
        'worker.test2': {'queue': 'test2'},
        'worker.test3': {'queue': 'test3'},
    },
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_IGNORE_RESULT=True,
    CELERYD_PREFETCH_MULTIPLIER=10,
    CELERYD_MAX_TASKS_PER_CHILD=200,
)

Specify task content

task.py

import time
from celery_con import app

@app.task
def test(x, y):
    time.sleep(5)
    return x + y

@app.task
def scan(x, y):
    time.sleep(1)
    return x - y

Sending a task

push_task.py

from celery import group
from task import scan

# note: g.get() only works if results are kept
# (i.e. CELERY_IGNORE_RESULT is not set to True)
g = group(scan.s(i, i) for i in range(10)).apply_async(queue='test1')
print(g.get())

Note: When the task is delivered, the task will be stored in the test1 queue of rabbitmq.
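You can verify this on the rabbitmq side, for example with rabbitmqctl, which lists the queues and their message counts:

rabbitmqctl list_queues name messages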

Start a worker to process the tasks

celery_start_work.sh

celery -A task worker -Q test1

Description: the worker will fetch its tasks from the test1 queue of rabbitmq.

celery+rabbitmq optimization

Ignore results

Looking at the rabbitmq web management page, I found that every task celery executes generates a queue that stores the state of that task's execution, and these queues are very memory-hungry. A result queue only disappears after the client calls get() on the result.

@app.task(ignore_result=True)  # ignore the result so that no result queue is generated
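In context, a complete task definition might look like the following sketch (the task name notify is just an example, reusing the celery_con.py app from above):

from celery_con import app

@app.task(ignore_result=True)  # no result queue is created for this task
def notify(msg):
    # fire-and-forget: callers cannot fetch a return value
    print(msg)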

celery scheduled tasks (periodic tasks)

Normally we would use the crontab that comes with Linux for scheduled tasks, but celery ships its own periodic task facility, which can be used to create scheduled tasks instead.

Create celery_con.py

from celery import Celery
from celery.schedules import crontab  # scheduled task module

RABBITMQ_IP = ""
RABBITMQ_PORT = ""
RABBITMQ_USER = ""
RABBITMQ_PASS = ""

app = Celery(
    backend='amqp',
    broker='amqp://{}:{}@{}:{}'.format(
        RABBITMQ_USER,
        RABBITMQ_PASS,
        RABBITMQ_IP,
        RABBITMQ_PORT,
    ),
)

app.conf.update(
    beat_schedule={
        "crontab_1": {
            "task": "celery_work.run",          # the task to run: the run function in celery_work.py
            "schedule": crontab(minute='*/1'),  # execute once per minute
            "args": ("celery_crontab_test",),   # arguments passed to the task
        },
        # ......
    }
)

Note: the key point is setting beat_schedule (the table of periodic tasks) in app.conf.update. task is the name of the task to execute, schedule is the execution period, and args are the arguments passed to the task when it runs. For the available schedule options see:
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
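Besides crontab expressions, schedule also accepts a plain number of seconds; both entries below are only illustrative:

from celery.schedules import crontab

beat_schedule = {
    # run every Monday morning at 7:30
    "crontab_weekly": {
        "task": "celery_work.run",
        "schedule": crontab(hour=7, minute=30, day_of_week=1),
        "args": ("weekly_report",),
    },
    # run every 30 seconds
    "every_30s": {
        "task": "celery_work.run",
        "schedule": 30.0,
        "args": ("heartbeat",),
    },
}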

Create celery_work.py

from celery_con import app

@app.task
def run(msg):
    print(msg)

Description: import the celery configuration and use the decorator to register the run function as a celery task.

Executing celery scheduled tasks

celery -A celery_work worker -B

Explanation: -A names the module containing the tasks (the celery_work file), and -B additionally starts an embedded beat process that dispatches the periodic tasks. Only one beat process may run at a time; starting several will schedule the tasks multiple times.

When this runs, it prints celery_crontab_test once every minute.
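For production it is generally safer to run beat as its own process and start the workers separately, which makes the single-beat rule easy to enforce:

celery -A celery_work beat
celery -A celery_work worker -c 2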

celery+rabbitmq Priority Task

rabbitmq has supported queue priority since version 3.5, so be sure to upgrade rabbitmq to 3.5 or later, otherwise priorities cannot be used. Note that there are two kinds of priority here: the first is within a single queue, where different messages in the same queue can be given different priorities; the second is priority between different queues.

Message priority within the same queue

Corresponding requirement: while the workers are processing routine tasks, some urgent (burst) tasks come in. These urgent tasks are pushed to the same queue but ranked at its front (high priority), so that the workers execute them first.

First create a priority queue in the web interface


You can see that the hello queue has a Pri flag, indicating that it is a priority queue.

Create a celery configuration file: (config.py)

File content:

from celery import Celery
from kombu import Exchange, Queue

RABBITMQ_IP = ""
RABBITMQ_PORT = ""
RABBITMQ_USER = ""
RABBITMQ_PASS = ""

app = Celery(
    backend='amqp',
    broker='amqp://{}:{}@{}:{}'.format(
        RABBITMQ_USER,
        RABBITMQ_PASS,
        RABBITMQ_IP,
        RABBITMQ_PORT,
    ),
)

# Related configuration goes here
app.conf.update(
    CELERY_ACKS_LATE=True,
    CELERYD_PREFETCH_MULTIPLIER=1,
    CELERYD_MAX_TASKS_PER_CHILD=500,
    CELERY_ENABLE_REMOTE_CONTROL=False,
    CELERYD_TASK_TIME_LIMIT=60,
    CELERY_DEFAULT_QUEUE='hello',
    CELERY_QUEUES=(
        # queue named hello, with message priorities 0-10 enabled
        Queue('hello', Exchange('hello'), routing_key='hello',
              queue_arguments={'x-max-priority': 10}),
    ),
)

Create a simple test task: (task.py)

File content:

import time
from config import app

@app.task(ignore_result=True)
def run(task):
    print(task)
    time.sleep(1)

Create a push task script: (push_task.py)

File content:

from celery import group
from task import run

# priority sets the message priority; higher values are consumed first
group(run.s("111111111") for i in range(10)).apply_async(queue='hello', priority=1)
group(run.s("999999999") for i in range(10)).apply_async(queue='hello', priority=9)

# Alternatively, you can also push the tasks like this:
# for i in range(10):
#     run.apply_async(args=['111'], queue="hello", priority=1)
#     run.apply_async(args=['999'], queue="hello", priority=9)

Creating a worker with celery

celery -A task worker -Q hello


Note: as can be seen from the results, the worker executes the messages with priority 9 first.

Priority between different queues

Corresponding requirement: a worker fetches and executes tasks from multiple queues, but some queues should be processed with high priority and others with low priority.

  • I have not yet found a working implementation for this; the solutions found online did not pass my tests.

The above is my personal understanding of how to use celery and some of its principles. If there are any mistakes or omissions, please correct me, thank you!
