Python 3.5 Coroutine Learning Study

I had studied Python coroutines before, but never in depth. My day-to-day work is still based on Python 2, but a recent project needs Python 3 coroutines, so I took the time to learn Python 3's coroutine syntax properly.
This article mainly introduces the async/await coroutine syntax of Python 3.5, because this syntax looks awkward at first and is not easy to understand. If you are not familiar with the basics of Python coroutines, it is recommended to read this article first: [Python coroutine](https://thief.one/2017/02/20/Python%E5%8D%8F%E7%A8%8B/).

Coroutine function (asynchronous function)

The functions we use most often are synchronous: calls are executed one after another, in order. So what is an asynchronous function? How do you create one, and how do you switch back and forth between asynchronous functions? Read on.

Creating a coroutine function

Let’s look at the ordinary function:

def test1():
    print("1")
    print("2")

def test2():
    print("3")
    print("4")

a = test1()
b = test2()
print(a, type(a))
print(b, type(b))

Run the above code to get the result:

1
2
3
4
None <class 'NoneType'>
None <class 'NoneType'>

Description: The program executes test1 and test2 in sequence. Each call enters the function body immediately and runs its contents.

Then use the async keyword to turn the ordinary function into a coroutine function, that is, an asynchronous function:

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

print(test1())
print(test2())

Run the above code to get the result:

<coroutine object test1 at 0x109f4c620>
asyncio_python3_test.py:16: RuntimeWarning: coroutine 'test1' was never awaited
  print(test1())
<coroutine object test2 at 0x109f4c620>
asyncio_python3_test.py:17: RuntimeWarning: coroutine 'test2' was never awaited
  print(test2())

Description: Ignore the warnings in the output. You can see that calling test1 and test2 does not enter the function body or execute its contents. Instead, each call returns a coroutine object.
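To make this concrete, here is a minimal sketch that uses the standard inspect module to confirm that calling a coroutine function only creates a coroutine object. The function name greet and the list ran are made up for illustration. Closing the unused coroutine explicitly is one way to discard it without leaving it un-awaited.

```python
import inspect

ran = []  # records whether the function body was ever entered

async def greet():  # hypothetical example function
    ran.append("body entered")
    return "hello"

coro = greet()  # creates a coroutine object; the body has not run yet

is_coroutine_function = inspect.iscoroutinefunction(greet)
is_coroutine_object = inspect.iscoroutine(coro)
body_ran = bool(ran)

coro.close()  # discard the coroutine explicitly instead of leaving it un-awaited

print(is_coroutine_function, is_coroutine_object, body_ran)  # prints: True True False
```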

In addition to functions, class methods can also be turned into coroutine methods using the async keyword:

class test:
    async def run(self):
        print("1")

Executing a coroutine function

Earlier we created a coroutine function, and calling it returned a coroutine object. So how do we enter the function body and execute its contents? As with a generator, you can call the send method to run it. Modify the previous code:

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

a.send(None)
b.send(None)

Running the above code results in the following:

1
2
Traceback (most recent call last):
  File "asyncio_python3_test.py", line 19, in <module>
    a.send(None)
StopIteration
sys:1: RuntimeWarning: coroutine 'test2' was never awaited

Description: The program first executes the test1 coroutine function. When test1 finishes, a StopIteration exception is raised; this is how a coroutine signals that it has finished executing. We can use try/except to catch it and detect that the coroutine has completed.

async def test1():
    print("1")
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

try:
    a.send(None)  # calling send() runs the coroutine function
except StopIteration as e:
    # A finished coroutine raises StopIteration to mark the end of its
    # execution; the return value is stored in e.value.
    print(e.value)

try:
    b.send(None)  # calling send() runs the coroutine function
except StopIteration as e:
    # Same as above: the return value is stored in e.value.
    print(e.value)

Running the above code results in the following (each None is the e.value of a coroutine that has no return statement):

1
2
None
3
4
None

Description: The program executes test1 first, and runs test2 only after test1 has finished. Judging by the execution order, the coroutine functions so far behave no differently from ordinary functions; nothing asynchronous has happened yet. So how can coroutine functions be interleaved?
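As a side note on the send-based execution above, the try/except pattern can be wrapped in a small helper. This is only a sketch: the names drive and add are hypothetical, and the helper handles only coroutines that never suspend at an await.

```python
async def add(x, y):  # hypothetical coroutine function with a return value
    return x + y

def drive(coro):
    """Run a coroutine that never suspends and return its return value."""
    try:
        coro.send(None)
    except StopIteration as exc:
        return exc.value  # the value passed to `return` inside the coroutine
    # send() returned normally, so the coroutine is suspended at an await.
    coro.close()
    raise RuntimeError("coroutine suspended; it needs an event loop to finish")

result = drive(add(1, 2))
print(result)  # prints: 3
```

The RuntimeError branch is a design choice for this sketch: a coroutine that really suspends needs something to resume it later, which is what the event loop introduced further on provides.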

Interleaving coroutine functions (await)

From the examples above, we know that a coroutine function is defined with the async keyword and can be run with the send method. So how do we switch between two coroutine functions? This is where the await keyword comes in. Modify the code:

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(1)  # asyncio.sleep(1) returns a coroutine object
    print("2")

async def test2():
    print("3")
    print("4")

a = test1()
b = test2()

try:
    # Calling send() runs the coroutine function. This is shown as run on
    # Python 3.5; newer versions may raise RuntimeError here because
    # asyncio.sleep() expects a running event loop.
    a.send(None)
except StopIteration:
    # A finished coroutine raises StopIteration to mark the end of its execution.
    pass

try:
    b.send(None)  # calling send() runs the coroutine function
except StopIteration:
    pass

Running the above code gets the following result:

1
3
4

Description: The program first executes the test1 coroutine function. When it reaches await, test1 is suspended, and the program then runs the test2 coroutine function to completion. From the result we can see that test1 never finishes before the program exits (print("2") is never executed). So how can test1 be run to completion? The event loop facilities that come with asyncio can drive coroutine functions for us.
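The suspend-and-resume behaviour can be reproduced without asyncio at all. The sketch below is an illustration under assumptions: Pause is a hypothetical awaitable that suspends exactly once, step is a hypothetical helper around send, and the events list replaces print so the order can be inspected. Calling send a second time resumes test1 after its await.

```python
class Pause:
    """Hypothetical awaitable: suspends the awaiting coroutine exactly once."""

    def __await__(self):
        yield  # hand control back to whoever called send()

events = []

async def test1():
    events.append("1")
    await Pause()
    events.append("2")

async def test2():
    events.append("3")
    events.append("4")

def step(coro):
    """Advance coro with one send(); return True if it finished."""
    try:
        coro.send(None)
    except StopIteration:
        return True
    return False

a, b = test1(), test2()
step(a)  # runs test1 up to the await and records "1"
step(b)  # runs test2 to completion and records "3", "4"
step(a)  # resumes test1 after the await and records "2"

print(events)  # prints: ['1', '3', '4', '2']
```

Deciding by hand when to call step again is exactly the bookkeeping that an event loop automates.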

await and blocking

Use async to define a coroutine, and use await to suspend at time-consuming operations. Just like yield in a generator, await makes the function give up control. When a coroutine reaches await, the event loop suspends it and runs other coroutines until they are in turn suspended or finished, and then moves on to the next one. The purpose of coroutines is to make time-consuming operations asynchronous.

Note: await must be followed by an Awaitable object, that is, an object that implements the corresponding protocol. The source of the Awaitable abstract class shows that any class implementing the __await__ method produces Awaitable instances. The Coroutine class also inherits from Awaitable.
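A minimal sketch of that protocol follows. The class Ready is hypothetical; it implements __await__ by delegating to asyncio.sleep(0), and the checks use the abstract classes from collections.abc. asyncio.run is assumed to be available, which requires Python 3.7 or later.

```python
import asyncio
from collections.abc import Awaitable, Coroutine

class Ready:
    """Hypothetical awaitable that yields to the event loop once, then returns a value."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # __await__ must return an iterator; delegating to asyncio.sleep(0)
        # gives the event loop one chance to run other work first.
        yield from asyncio.sleep(0).__await__()
        return self.value

async def main():
    return await Ready(42)

is_awaitable = isinstance(Ready(1), Awaitable)             # has __await__
coroutine_is_awaitable = issubclass(Coroutine, Awaitable)  # Coroutine inherits Awaitable
value = asyncio.run(main())

print(is_awaitable, coroutine_is_awaitable, value)  # prints: True True 42
```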

Running coroutine functions automatically in a loop

From the previous sections we know that a coroutine function can be run with the send method, but once a coroutine is suspended and control switches elsewhere, it is never resumed, and driving everything by hand with send is not practical. So how can all coroutine functions be run automatically over the life of the program, implicitly as with multithreading and multiprocessing, rather than by explicit send calls?

Event Loop Method

This problem is solved by the event loop, obtained with the asyncio.get_event_loop method. Modify the code above as follows:

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
loop.run_until_complete(test1())

Running the above code results in the following:

1
3
4
2

Description: The asyncio.get_event_loop method gets an event loop (creating one if necessary); run_until_complete then registers the coroutine with the event loop and runs the loop until the coroutine finishes.
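On Python 3.7 and later, asyncio.run offers a shorter way to do the same thing: it creates an event loop, runs the coroutine to completion, and closes the loop. The sketch below is a variation on the example above, not the article's original code; the order list is an assumption added so the execution order can be checked.

```python
import asyncio

order = []  # records the execution order instead of printing

async def test2():
    order.append("3")
    order.append("4")

async def test1():
    order.append("1")
    await test2()  # test1 waits here until test2 has finished
    order.append("2")

asyncio.run(test1())  # Python 3.7+: create a loop, run test1, close the loop

print(order)  # prints: ['1', '3', '4', '2']
```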

Tasks

Since a coroutine object cannot run by itself, run_until_complete actually wraps the coroutine in a task object when registering it with the event loop. Task is a subclass of Future; it stores the state of the running coroutine and is used to retrieve the coroutine's result later. We can also wrap the coroutine object in a task manually. Modify the code above as follows:

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
task = loop.create_task(test1())
loop.run_until_complete(task)

Description: As mentioned earlier, the task object stores the state of the running coroutine and gives access to the coroutine function's return value. How do we get it? There are two ways: bind a callback function, or read the result directly after the task has run. It is worth mentioning that if you run the coroutine with the send method, the return value can be obtained by catching the StopIteration exception and reading its value attribute.

Reading the task result directly

When the coroutine function finishes, we want its return value. The first way is to wait until the task is done and then call the task's result method.

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
loop.run_until_complete(task)
print(task.result())

Running the above code results in the following:

1
3
4
2
stop
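The requirement to wait until the task is done can be seen in a small sketch. It assumes Python 3.7 or later for asyncio.run; the report dictionary is made up for illustration. Calling result on a task that has not finished raises asyncio.InvalidStateError, and the same call succeeds once the task is done.

```python
import asyncio

async def test1():
    await asyncio.sleep(0)
    return "stop"

async def main():
    task = asyncio.ensure_future(test1())  # wrap the coroutine in a Task
    report = {"is_future": isinstance(task, asyncio.Future)}
    try:
        task.result()  # too early: the task has not run yet
    except asyncio.InvalidStateError:
        report["early_result"] = "not ready"
    await task  # wait for the task to finish
    report["done"] = task.done()
    report["result"] = task.result()
    return report

report = asyncio.run(main())
print(report)
```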

Callback

The second way to get the return value is to bind a callback function. The last parameter of the callback is the future object, through which the coroutine's return value can be obtained.

import asyncio

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

def callback(future):
    # The coroutine function's return value is available through the
    # result method of the future object.
    print('Callback:', future.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())  # create the task; test1() is a coroutine object
task.add_done_callback(callback)  # bind the callback function
loop.run_until_complete(task)

Running the above code results in the following:

1
3
4
2
Callback: stop

If the callback function needs to accept additional parameters, you can bind them with functools.partial. Modify the code as follows:

import asyncio
import functools

async def test1():
    print("1")
    await test2()
    print("2")
    return "stop"

async def test2():
    print("3")
    print("4")

def callback(param1, param2, future):
    print(param1, param2)
    print('Callback:', future.result())

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(test1())
task.add_done_callback(functools.partial(callback, "param1", "param2"))
loop.run_until_complete(task)

Description: The future object in the callback function is the task object created.
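The following sketch checks the callback mechanism in a self-contained way. It assumes Python 3.7 or later for asyncio.run; the seen list and the label parameter are made up for illustration. The callback stores what it receives, so the bound argument and the task's result can be inspected afterwards.

```python
import asyncio
import functools

seen = []  # filled in by the callback

def callback(label, future):
    # `label` is bound in advance by functools.partial; the event loop
    # supplies the finished task as the last positional argument.
    seen.append((label, future.result()))

async def test1():
    return "stop"

async def main():
    task = asyncio.ensure_future(test1())
    task.add_done_callback(functools.partial(callback, "param1"))
    await task
    await asyncio.sleep(0)  # one extra loop iteration so scheduled callbacks have run

asyncio.run(main())
print(seen)  # prints: [('param1', 'stop')]
```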

future object

A future object has several states: Pending, Running, Done, and Cancelled. When a future is created, the task is pending. While the event loop is executing it, it is running. After it completes, it is done. If you need to stop the event loop, you must cancel the tasks first. You can use asyncio.Task.all_tasks() to get the tasks of the event loop.

The sections above showed how to use the event loop to execute coroutine functions. How do you stop execution? Before stopping the event loop, you need to cancel the tasks first.

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(3)
    print("2")
    return "stop"

tasks = [
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
    asyncio.ensure_future(test1()),
]

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt:
    # asyncio.Task.all_tasks() is the Python 3.5 API; it was removed in
    # Python 3.9, where asyncio.all_tasks(loop) is used instead.
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

Run the above code and press ctrl+c to end the execution.
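Cancellation can also be observed without pressing ctrl+c. The sketch below is an illustration under assumptions (Python 3.7 or later for asyncio.run, and a made-up coroutine named slow): it cancels a sleeping task from another coroutine and checks the task's state afterwards.

```python
import asyncio

async def slow():  # hypothetical long-running coroutine
    await asyncio.sleep(3)
    return "stop"

async def main():
    task = asyncio.ensure_future(slow())
    await asyncio.sleep(0)  # let the task start and reach its await
    task.cancel()  # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass  # awaiting a cancelled task raises CancelledError
    return task.cancelled()

was_cancelled = asyncio.run(main())
print(was_cancelled)  # prints: True
```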

Some concepts and methods used in this article

  • event_loop (event loop): the program starts an infinite loop and registers functions with it; when the condition for an event is met, the corresponding function is called.
  • coroutine (coroutine object): a function defined with the async keyword is a coroutine function. Calling it does not execute the function immediately but returns a coroutine object, which must be registered with the event loop and is then run by the event loop.
  • task: a coroutine object is, in essence, a function that can be suspended; a task wraps the coroutine further and records the various states of the task.
  • future: represents the result of a task that has or has not yet been executed. There is no essential difference between a future and a task.
  • async/await keywords: the keywords introduced in Python 3.5 for defining coroutines. async defines a coroutine, and await suspends at a blocking asynchronous call.

Concurrency and parallelism

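Concurrency usually means that multiple tasks need to make progress during the same period of time, while parallelism means that multiple tasks are executing at the same instant. Among multithreading, multiprocessing, and coroutines, coroutines implement concurrency, while multithreading and multiprocessing can achieve parallelism (in CPython, the global interpreter lock lets only one thread execute Python bytecode at a time, so threads mainly help with IO-bound work).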

How does asyncio coroutine achieve concurrency?

To achieve concurrency, asyncio needs multiple coroutines to share the work: whenever one task blocks, it awaits, and the other coroutines continue working. This requires creating a list of coroutines and registering them with the event loop. The multiple coroutines here may come from several different coroutine functions or be several coroutine objects of one coroutine function.

import asyncio

async def test1():
    print("1")
    await asyncio.sleep(1)
    print("2")
    return "stop"

a = test1()
b = test1()
c = test1()

tasks = [
    asyncio.ensure_future(a),
    asyncio.ensure_future(b),
    asyncio.ensure_future(c),
]

loop = asyncio.get_event_loop()
# Register the task list with the event loop and run until all tasks finish.
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print("task result is", task.result())

Running the above code results in the following:

1
1
1
2
2
2
task result is stop
task result is stop
task result is stop

Description: The code first creates three coroutine objects, then wraps them in three tasks with asyncio.ensure_future and collects the tasks in a list, and finally hands the task list to the event loop with loop.run_until_complete.

Coroutine crawler

The sections above showed how to create coroutine functions with async and await, and how to create an event loop and run coroutine functions with asyncio.get_event_loop. The examples show how efficient coroutine concurrency can be, but how are coroutines used in real-world development, for example in an asynchronous crawler? I tried to write an asynchronous crawler with the requests module and the urllib module, but in practice neither supports asyncio. The aiohttp module can be used to write asynchronous crawlers instead.

aiohttp implementation

import asyncio
import aiohttp

async def run(url):
    print("start spider", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.url)

url_list = ["https://thief.one", "https://home.nmask.cn", "https://movie.nmask.cn", "https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

Running the above code results in the following:

start spider https://thief.one
start spider https://home.nmask.cn
start spider https://movie.nmask.cn
start spider https://tool.nmask.cn
https://movie.nmask.cn
https://home.nmask.cn
https://tool.nmask.cn
https://thief.one

Description: aiohttp is built on asyncio and can be used to write a web server or a crawler.

requests implementation

Because the requests module blocks the single thread shared by the client code and the asyncio event loop, the entire application freezes while a request is in progress. If you must use the requests module, you can use the run_in_executor method of the event loop object to run the time-consuming function in a separate thread. The code can be modified like this:

import asyncio
import requests

async def run(url):
    print("start", url)
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(None, requests.get, url)
    print(response.url)

url_list = ["https://thief.one", "https://home.nmask.cn", "https://movie.nmask.cn", "https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

If you want to pass keyword arguments to requests, you can use functools.partial:

import asyncio
import requests
import functools

async def run(url):
    print("start", url)
    loop = asyncio.get_event_loop()
    try:
        response = await loop.run_in_executor(None, functools.partial(requests.get, url=url, params="", timeout=1))
    except Exception as e:
        print(e)
    else:
        print(response.url)

url_list = ["https://thief.one", "https://home.nmask.cn", "https://movie.nmask.cn", "https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
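The reason functools.partial is needed is that run_in_executor forwards only positional arguments to the callable. The sketch below shows the pattern without any network access; fetch is a hypothetical stand-in for a blocking call such as requests.get, and asyncio.run and asyncio.get_running_loop assume Python 3.7 or later.

```python
import asyncio
import functools

def fetch(url, timeout=1):
    """Hypothetical stand-in for a blocking call such as requests.get."""
    return "{} (timeout={})".format(url, timeout)

async def main():
    loop = asyncio.get_running_loop()
    # Bind the keyword argument first; the executor then calls call() with no arguments.
    call = functools.partial(fetch, "https://thief.one", timeout=5)
    return await loop.run_in_executor(None, call)

text = asyncio.run(main())
print(text)  # prints: https://thief.one (timeout=5)
```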

Using a blocking function in asyncio

We just saw how to use the requests module with asyncio. What if you want to use other blocking functions with asyncio? Although some libraries now provide asynchronous functions that support asyncio, the real problem is that most IO modules do not.

The problem with blocking functions used in asyncio

Blocking functions (such as IO reads and writes, or network requests made with requests) block the single thread shared by the client code and the asyncio event loop, so the entire application freezes while the call is executing.

Solution

The solution is the run_in_executor method of the event loop object. The asyncio event loop maintains a ThreadPoolExecutor behind the scenes, and we can call run_in_executor to hand it a callable to execute. In other words, run_in_executor lets us run a time-consuming function in a separate thread.

run_in_executor method

AbstractEventLoop.run_in_executor(executor, func, *args)

  • executor should be an Executor instance. If it is None, the default executor is used.
  • func is the function to be executed.
  • args are the positional arguments passed to func.
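As a minimal sketch of the signature above, the following passes an explicit ThreadPoolExecutor instead of None. The function blocking_square and the pool size of 4 are assumptions for illustration, and asyncio.run and asyncio.get_running_loop require Python 3.7 or later.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_square(n):
    """Hypothetical blocking function: sleeps, then returns n squared."""
    time.sleep(0.1)
    return n * n

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Each call runs in a pool thread; n is forwarded as a positional argument.
        futures = [loop.run_in_executor(pool, blocking_square, n) for n in range(4)]
        return await asyncio.gather(*futures)

squares = asyncio.run(main())
print(squares)  # prints: [0, 1, 4, 9]
```

Passing an explicit executor makes the thread count visible and adjustable; passing None leaves that choice to the event loop's default executor.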

A practical example (using time.sleep()):

import asyncio
import time

async def run(url):
    print("start", url)
    loop = asyncio.get_event_loop()
    try:
        await loop.run_in_executor(None, time.sleep, 1)
    except Exception as e:
        print(e)
    print("stop", url)

url_list = ["https://thief.one", "https://home.nmask.cn", "https://movie.nmask.cn", "https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

Run the above code to get the following result:

start https://thief.one
start https://home.nmask.cn
start https://movie.nmask.cn
start https://tool.nmask.cn
stop https://thief.one
stop https://movie.nmask.cn
stop https://home.nmask.cn
stop https://tool.nmask.cn

Description: With the run_in_executor method, we can keep using familiar blocking modules inside coroutines and still run them concurrently, without needing dedicated asynchronous IO modules.


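Title: Python 3.5 Coroutine Learning Study

Author: nmask

Published: 2018-06-21 11:06

Last updated: 2019-08-16 15:08

Original link: https://thief.one/2018/06/21/1/en/

License: Attribution-NonCommercial-NoDerivatives 4.0 International. Please keep the original link and author when reposting.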
