python 异步 1.1 事件循环 理解为一个死循环,检测并执行某些代码。
1 2 3 4 5 import asyncioloop = asyncio.get_event_loop() loop.run_until_complete(任务)
1.2 快速上手 协程函数,定义函数使用 async def 关键字
协程对象,执行协程函数得到的协程对象
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环
1 2 3 4 5 6 loop = asyncio.get_event_loop() loop.run_until_complete(result) asyncio.run(result)
1.3 await 关键字 await 可等待的对象(协程对象,Future 对象,task 对象 -> io 等待)
1 2 3 4 5 6 7 8 import asyncioasync def func (): ... response = await asyncio.sleep(2 ) print (response) result = func() asyncio.run(result)
1 2 3 4 5 6 7 8 9 10 11 12 import asyncioasync def others (): print ("start" ) await asyncio.sleep(2 ) print ("end" ) return 'return' async def func (): print ("code inside coroutine func" ) response = awat others() print ("io req over" ) asyncio.run(func())
等待对象的值得到结果之后再往下进行
1.4 task 对象 在事件循环中添加多个任务。
用于并发调度协程,通过asyncio.create_task(协程对象)
‘的方式创建 task 对象,这样可以让协程加入事件循环中等待被调度执行。除了使用create_task
函数之外,还可用低层级的loop.create_task()
或ensure_future()
函数,不建议手动实例化 task 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asyncioasync def func (): print (1 ) await asyncio.sleep(1 ) print (2 ) return "return" async def main (): print ("main开始" ) task1 = asyncio.create_task(func()) task2 = asyncio.create_task(func()) print ("main结束" ) ret1 = await task1 ret2 = await task2 print (task1, task2) asyncio.run(main())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 async def main (): print ("main开始" ) task_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ] print ("main结束" ) done, pending = await asyncio.wait(task_list, timeout=2 ) print (done) asyncio.run(main()) ''' 1 1 2 2 {<Task finished name='Task-3' coro=<func() done, defined at /home/naii/pypy/coroutine.py:4> result='return'>, <Task finished name='Task-2' coro=<func() done, defined at /home/naii/pypy/coroutine.py:4> result='return'>} '''
1 2 asyncio.create_task(func(), name='n1' )
task 对象可将协程对象立即放入事件循环中
1.5 asyncio future 对象 是 task 类的基类,task 对象内部 await 结果的处理基于 future 对象来的
1 2 3 4 5 6 7 8 9 10 11 import asyncioasync def func (): loop = asyncio.get_running_loop() fut = loop.create_future() await fut asyncio.run(func())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioasync def set_after (fut ): await asyncio.sleep(2 ) fut.set_result('666' ) async def main (): loop = asyncio.get_event_loop() fut = loop.create_future() await loop.create_task(set_after(fut)) data = await fut print (data) asyncio.run(main())
1.6 concurrent.futures.future 对象 使用线程池、进程池来实现异步操作时用到的对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import timefrom concurrent.futures import Futurefrom concurrent.futures.thread import ThreadPoolExecutorfrom concurrent.futures.process import ProcessPoolExecutordef func (value ): time.sleep(1 ) print (value) pool = ThreadPoolExecutor(max_workers=5 ) for i in range (10 ): fut = pool.submit(func, i) print (fut)
例如:crm 项目内部数据都是基于协程的+mysql(不支持)此时使用两者交叉使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import timeimport asyncioimport concurrent.futuresdef func (): time.sleep(2 ) return "aa" async def main (): loop = asyncio.get_running_loop() fut = loop.run_in_executor(None , func) result = await fut print ('default thread pool' , result) asyncio.run(main())
案例:asyncio + 不支持异步的模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asynciofrom pip import mainimport requestsasync def download (url ): print ("开始下载" , url) loop = asyncio.get_event_loop() future = loop.run_in_executor(None , requests.get, url) response = await future print ("下载完成" ) file_name = url.rsplit('_' )[-1 ] with open (file_name, mode='wb' ) as f: f.write(response.content) if __name__ == '__main__' : url_list = [...] tasks = [download(url) for url in url_list] loop = asyncio.get_event_loop()
1.7 异步迭代器 什么是异步迭代器
实现了__aiter__()
和__anext__()
方法的对象,__anext__
必须返回一个 awaitable 对象,asyncfor 会处理异步迭代器的`_anext `()方法所返回的可等待对象,直至其引发一个 StopAsyncIteratin 异常
什么是异步可迭代对象
可在 asyncfor 语句中被使用的对象,必须通过他的`_aiter ()`方法返回一个 asyncchronous_iterator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import asyncioclass Reader (object ): def __init__ (self ) -> None : self .count = 0 async def readline (self ): self .count += 1 if self .count == 100 : return None return self .count def __aiter__ (self ): return self async def __anext__ (self ): val = await self .readline() if val == None : raise StopAsyncIteration return val async def func (): obj = Reader() async for item in obj: print (item) asyncio.run(func())
1.8 异步上下文管理 这种对象通过定义__aenter__()
和__aexit__()
方法来对async_with
语句中的环境进行控制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncioclass AsyncContextManager : def __init__ (self ) -> None : self .conn = conn async def do_something (self ): return 666 async def __aenter__ (self ): self .conn = await asyncio.sleep(1 ) return self async def __aexit__ (self, exc_type, exc, tb ): await asyncio.sleep(1 ) async def func (): obj = AsyncContextManager() async with obj as f: result = await f.do_something() print (result) asyncio.run(func())
1.9 uvloop 是 asyncio 事件循环的替代方案。uvloop 事件循环的效率>默认 asyncio 的事件循环。
安装
1 2 3 4 5 6 7 import asyncioimport uvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) asyncio.run(...)
注意:一个 asgi -> uvicorn
django3 fastapi 内部使用 uvloop
实战 2.1 异步操作 redis 两台主机之间,操作 redis 时存在链接/操作/断开都是网络 io
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import asyncioimport aioredisasync def executeimport asyncioimport aioredisasync def execute (address, password ): print ('开始执行' , address) redis = await aioredis.create_redis(address, password=password) await redis.hmset_dict('car' , key1=1 , key2=2 , key3=3 ) result = await redis.hgetall("car" , encoding='utf-8' ) print (resut) redis.close() await redis.wait_closed() asyncio.run( execute('...' , '...' ) )
1 2 3 4 5 task_list = [ execute(..., ...), execute(..., ...) ] asyncio.run(asyncio.wait(task_list))
2.2 异步操作 mysql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import asyncioimport aiomysqlasync def execute (): conn = await aiomysql.connect(host="127.0.0.1" , port=3306 , user="root" , password="123" , db='mysql' ) cur = await conn.cursor() await cur.execute("select Host.User from user" ) result = cur.fetchall() print (result) await cur.close() conn.close() asyncio.run(execute())
链接多个 mysql
1 2 3 4 5 task_list = [ execute(..., ...), execute(..., ...) ] asyncio.run(asyncio.wait(task_list))
2.3 fastapi 框架异步 安装
1 2 pip install fastapi pip install uvicorn # asgi
示例:
1 2 3 4 5 6 7 8 9 10 11 12 import uvicornfrom fastapi import FastAPIapp = FastAPI() @app.get("/" ) def index (): return {"message" : "hello" } uvicorn.run("luffy:app" , host="127.0.0.1" , port=5000 , log_level="info" )
1 2 3 4 5 6 7 8 @app.get("/read" ) async def read (): print ("收到请求" ) await asyncio.sleep(3 ) return 666
redis 链接池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 REDIS_POOL = aioredis.ConnectionsPoll("redis://../" , password="" , minsize=1 , maxsize=10 ) @app.get('/redis' ) async def redis_read (): conn = await REDIS_POOL.acquire() redis = Redis(conn) await redis.hmset_dict('car' , key1=1 , key2=2 ) result = await redis.hgetall('car' , encoding='utf-8' ) print (redult) REDIS_POOL.release(conn) return result
2.4 异步爬虫
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import aiohttpimport asyncioasync def fetch (session, url ): print ("发送请求" , url) async with session.get(url, verfy_ssl=False ) as response: text = await response.text() print ("得到结果" , url, len (text)) async def main (): async with aiohttp.ClientSession() as session: url_list = [ ..., ... ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] done, pending = await asyncio.wait(tasks) asyncio.run(main())