how slowly it all runs, and what would happen if I parallelized the calls?"
And then I met asyncio, and everything changed.
誰ãç¥ããªãå Žåãasyncioã¯ç«¶åããã°ã©ãã³ã°ã®çµç¹åãã®æ°ããã¢ãžã¥ãŒã«ã§ãããPython 3.4ã§ç»å ŽããŸããã éåæã³ãŒãã§ã³ã«ãŒãã³ãšå°æ¥ã®äœ¿çšãç°¡çŽ åããããšãç®çãšããŠããŸã-ã³ãŒã«ããã¯ãªãã§ã³ãŒããåæã®ããã«èŠããããã«ããŸãã
åœæã䌌ããããªããŒã«ãããã€ãããããã®ãã¡ã®1ã€ãéç«ã£ãŠããããšã
æãåºããŸããããã
geventã©ã€ãã©ãªã§ãã
å®è·µçãªpythonéçºè
åãã®åªãã
geventãã¥ãŒããªã¢ã«ãèªãããšããå§ãããŸãããã®
ãã¥ãŒããªã¢ã«ã§ã¯ãäœæ¥
æ¹æ³ã ãã§ãªããäžè¬ã«ç«¶åãç解ãããŠãããã®ã«ã€ããŠã
説æããŠããŸãã ç§ã¯ãã®èšäºããšãŠãæ°ã«å
¥ã£ãã®ã§ãasyncioã®æŠèŠãæžãããã®ãã³ãã¬ãŒããšããŠäœ¿çšããããšã«ããŸããã
å°ããªå
責äºé
ã¯ãgevent察asyncioã®èšäºã§ã¯ãããŸããã ãã€ãµã³ããŒãã¯åœŒã®
ã¡ã¢ã§ãã§ã«ãããããŠããŸããã
GitHubã§ãã¹ãŠã®äŸãèŠã€ããããšãã§ããŸãã
ããªãã¯ã³ãŒããæžãã®ãåŸ
ã€ããšãã§ããªãããšãç¥ã£ãŠããŸãããæåã«å°æ¥çã«ç§ãã¡ã«åœ¹ç«ã€ããã€ãã®æŠå¿µãæ€èšããããšæããŸãã
Threads, event loops, coroutines and futures
Threads are the most common tool. You have probably heard of them, but asyncio works with a few other concepts: event loops, coroutines and futures.
- An event loop, for the most part, only manages the execution of various tasks: it registers incoming tasks and starts them at the appropriate moment
- Coroutines are special functions, similar to Python generators, that are expected to hand control back to the event loop at each await. They must be run through the event loop
- Futures are objects that hold the current result of a task. That may be the information that the task has not been processed yet, a result that has already been obtained, or possibly an exception
Simple enough, right? Let's go!
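To make the three concepts concrete, here is a minimal sketch. It assumes Python 3.7 or newer (for asyncio.run and asyncio.create_task), and the names compute and main are made up for illustration. It shows a coroutine, a bare future that the event loop fills in later, and a task that wraps the coroutine.

```python
import asyncio


async def compute(x):
    # A coroutine: the await below hands control back to the event loop.
    await asyncio.sleep(0)
    return x * 2


async def main():
    loop = asyncio.get_running_loop()

    # A bare future: a placeholder whose result is not known yet.
    fut = loop.create_future()
    assert not fut.done()
    # Ask the event loop to fill the future in on a later iteration.
    loop.call_soon(fut.set_result, "ready")

    # A task is a future that drives a coroutine to completion.
    task = asyncio.create_task(compute(21))

    return await fut, await task


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting the future and the task suspends main until each has a result, and the event loop decides what runs in the meantime.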
Synchronous and asynchronous execution
In the talk "Concurrency is not parallelism, it's better", Rob Pike draws attention to a key point. Splitting a task into concurrent subtasks only makes parallelism possible; it is the scheduling of those subtasks that actually produces it.
Asyncio does the same: you can split your code into procedures defined as coroutines, which lets you schedule them however you like, including simultaneously. Coroutines contain await expressions (yield from in the older generator-based style), which mark the places where we may switch to other tasks while waiting for something to complete.
These expressions are what drive context switching in asyncio: they hand control back to the event loop, which then moves on to another coroutine. Consider a basic example:
import asyncio

async def foo():
    print('Running in foo')
    await asyncio.sleep(0)
    print('Explicit context switch to foo again')

async def bar():
    print('Explicit context to bar')
    await asyncio.sleep(0)
    print('Implicit context switch back to bar')

ioloop = asyncio.get_event_loop()
tasks = [ioloop.create_task(foo()), ioloop.create_task(bar())]
wait_tasks = asyncio.wait(tasks)
ioloop.run_until_complete(wait_tasks)
ioloop.close()

$ python3 1-sync-async-execution-asyncio-await.py
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
* First we declare a couple of simple coroutines that pretend to do non-blocking work, using sleep from asyncio
* A coroutine can only be started from another coroutine, or wrapped in a task with create_task
* Once we have the 2 tasks, we combine them with wait
* Finally, we hand execution over to the event loop via run_until_complete
So, by using await inside a coroutine, we declare that the coroutine may hand control back to the event loop, which in turn starts the next task, bar. The same happens in bar: at await asyncio.sleep control returns to the event loop, which goes back to foo at the right moment.
Imagine 2 blocking tasks, gr1 and gr2, that behave as if they were calling some third-party service. While they wait for a response, a third function can work asynchronously.
import time
import asyncio

start = time.time()

def tic():
    return 'at %1.1f seconds' % (time.time() - start)

async def gr1():

$ python3 1b-cooperatively-scheduled-asyncio-await.py
gr1 started work: at 0.0 seconds
gr2 started work: at 0.0 seconds
Lets do some stuff while the coroutines are blocked, at 0.0 seconds
Done!
gr1 ended work: at 2.0 seconds
gr2 Ended work: at 2.0 seconds
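The listing above breaks off after the header of gr1, so here is one possible reconstruction that is consistent with the printed output. Everything past the tic helper is an assumption: the bodies of gr1, gr2 and gr3, the delay parameters, and the use of asyncio.run with asyncio.gather (Python 3.7+) in place of the explicit loop used elsewhere in the article.

```python
import asyncio
import time

start = time.time()


def tic():
    return 'at %1.1f seconds' % (time.time() - start)


async def gr1(delay=2):
    # asyncio.sleep stands in for waiting on a remote service (assumed body).
    print('gr1 started work: {}'.format(tic()))
    await asyncio.sleep(delay)
    print('gr1 ended work: {}'.format(tic()))


async def gr2(delay=2):
    print('gr2 started work: {}'.format(tic()))
    await asyncio.sleep(delay)
    print('gr2 ended work: {}'.format(tic()))


async def gr3(delay=1):
    # Runs while gr1 and gr2 are suspended in their sleeps.
    print("Let's do some stuff while the coroutines are blocked, {}".format(tic()))
    await asyncio.sleep(delay)
    print('Done!')


async def main(delay=2):
    began = time.time()
    # Schedule all three coroutines and wait for them together.
    await asyncio.gather(gr1(delay), gr2(delay), gr3(delay / 2))
    return time.time() - began


if __name__ == "__main__":
    asyncio.run(main())
```

The total running time is close to the longest single delay rather than the sum of all three, which is the point of the example.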
Notice how I/O and scheduling work together, which lets all of this fit into a single thread. While the two tasks are blocked waiting on I/O, the third function can use all of the processor time.
Order of execution
In the synchronous world we think sequentially. If we have a list of tasks that take different amounts of time, they finish in the same order in which they were started. With concurrency, however, we cannot be sure of that.
import random
from time import sleep
import asyncio

def task(pid):
    """Synchronous non-deterministic task."""
    sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)

async def task_coro(pid):
    """Coroutine non-deterministic task"""
    await asyncio.sleep(random.randint(0, 2) * 0.001)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1, 10):
        task(i)

async def asynchronous():
    tasks = [asyncio.ensure_future(task_coro(i)) for i in range(1, 10)]
    await asyncio.wait(tasks)

print('Synchronous:')
synchronous()

ioloop = asyncio.get_event_loop()
print('Asynchronous:')
ioloop.run_until_complete(asynchronous())
ioloop.close()

$ python3 1c-determinism-sync-async-asyncio-await.py
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 2 done
Task 5 done
Task 6 done
Task 8 done
Task 9 done
Task 1 done
Task 4 done
Task 3 done
Task 7 done
Your results will of course differ, because each task sleeps for a random time. Note, though, that the order of completion is completely different even though we always submit the tasks in the same order.
Also take a look at the coroutine for our rather simple task. It is important to understand that asyncio has no magic for making tasks non-blocking. At the time it was implemented, asyncio stood alone in the standard library: the other modules offered only blocking functionality.
You can use the concurrent.futures module to wrap a blocking task in a thread or a process and get back a future that can be used with asyncio.
There are several such examples on GitHub.
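As a rough sketch of wrapping blocking work, the snippet below pushes an ordinary blocking function onto a thread pool with loop.run_in_executor. The blocking_task helper is hypothetical, with time.sleep standing in for blocking I/O, and asyncio.run assumes Python 3.7 or newer.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_task(pid, delay):
    # An ordinary blocking function: time.sleep stands in for blocking I/O.
    time.sleep(delay)
    return 'Task {} done'.format(pid)


async def main(delay=0.2):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=3) as pool:
        # run_in_executor returns an awaitable future for each blocking call.
        futures = [loop.run_in_executor(pool, blocking_task, pid, delay)
                   for pid in range(1, 4)]
        # gather returns the results in submission order.
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The event loop stays free while the worker threads sleep. Passing a ProcessPoolExecutor instead is the usual choice for CPU-bound work.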
This is probably the main drawback of asyncio right now, but there are already several libraries that help with the problem.
The most common blocking task is fetching data over HTTP. Let's try out the excellent aiohttp library, using the retrieval of public event information from GitHub as the example.
import time
import urllib.request
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3

def fetch_sync(pid):
    print('Fetch sync process {} started'.format(pid))
    start = time.time()
    response = urllib.request.urlopen(URL)
    datetime = response.getheader('Date')

    print('Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start))

    return datetime

async def fetch_async(pid):
    print('Fetch async process {} started'.format(pid))
    start = time.time()
    response = await aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    print('Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start))

    response.close()
    return datetime

def synchronous():
    start = time.time()
    for i in range(1, MAX_CLIENTS + 1):
        fetch_sync(i)
    print("Process took: {:.2f} seconds".format(time.time() - start))

async def asynchronous():
    start = time.time()
    tasks = [asyncio.ensure_future(
        fetch_async(i)) for i in range(1, MAX_CLIENTS + 1)]
    await asyncio.wait(tasks)
    print("Process took: {:.2f} seconds".format(time.time() - start))

print('Synchronous:')
synchronous()

print('Asynchronous:')
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

$ python3 1d-async-fetch-from-server-asyncio-await.py
Synchronous:
Fetch sync process 1 started
Process 1: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.54 seconds
Fetch sync process 2 started
Process 2: Wed, 17 Feb 2016 13:10:11 GMT, took: 0.50 seconds
Fetch sync process 3 started
Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.48 seconds
Process took: 1.54 seconds
Asynchronous:
Fetch async process 1 started
Fetch async process 2 started
Fetch async process 3 started
Process 3: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.50 seconds
Process 2: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.52 seconds
Process 1: Wed, 17 Feb 2016 13:10:12 GMT, took: 0.54 seconds
Process took: 0.54 seconds
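A couple of points are worth noting here.
First, the difference in time: with asynchronous calls we launch the requests concurrently. As mentioned earlier, each of them handed control over to the next and returned its result on completion. In other words, the total running time depends directly on the slowest request, which took just 0.54 seconds. Cool, right?
Second, how closely the code resembles the synchronous version. It is essentially the same thing! The main differences come from the library used to perform the requests, and from creating the tasks and waiting for them to complete.
Creating concurrency
So far we have used a single way of creating coroutines and retrieving their results: build a set of tasks and wait for all of them to finish. However, coroutines can be scheduled to run and produce results in several ways. Imagine a situation where we need to process the results of GET requests as they arrive. The implementation is in fact very similar to the previous one: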
import time
import random
import asyncio
import aiohttp

URL = 'https://api.github.com/events'
MAX_CLIENTS = 3

async def fetch_async(pid):
    start = time.time()
    sleepy_time = random.randint(2, 5)
    print('Fetch async process {} started, sleeping for {} seconds'.format(
        pid, sleepy_time))

    await asyncio.sleep(sleepy_time)

    response = await aiohttp.request('GET', URL)
    datetime = response.headers.get('Date')

    response.close()
    return 'Process {}: {}, took: {:.2f} seconds'.format(
        pid, datetime, time.time() - start)

async def asynchronous():
    start = time.time()
    futures = [fetch_async(i) for i in range(1, MAX_CLIENTS + 1)]
    for i, future in enumerate(asyncio.as_completed(futures)):
        result = await future
        print('{} {}'.format(">>" * (i + 1), result))

    print("Process took: {:.2f} seconds".format(time.time() - start))

ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

$ python3 2a-async-fetch-from-server-as-completed-asyncio-await.py
Fetch async process 1 started, sleeping for 4 seconds
Fetch async process 3 started, sleeping for 5 seconds
Fetch async process 2 started, sleeping for 3 seconds
>> Process 2: Wed, 17 Feb 2016 13:55:19 GMT, took: 3.53 seconds
>>>> Process 1: Wed, 17 Feb 2016 13:55:20 GMT, took: 4.49 seconds
>>>>>> Process 3: Wed, 17 Feb 2016 13:55:21 GMT, took: 5.48 seconds
Process took: 5.48 seconds
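Look at the indentation and the timings: we launched all of the tasks at the same time, but they were processed in order of completion. The code differs only slightly in this case: we pack the coroutines into a list, each one already prepared for execution.
The as_completed function returns an iterator that yields the coroutines' results as they become available. Pretty cool, isn't it?! By the way, as_completed and wait are modelled on the functions of the same name in the concurrent.futures package.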
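The ordering behaviour of as_completed can be checked without any network access. In the sketch below the job coroutine and its delays are invented for illustration: asyncio.sleep stands in for the HTTP request, and the tasks are created explicitly with asyncio.create_task (Python 3.7+).

```python
import asyncio


async def job(pid, delay):
    # asyncio.sleep stands in for a network request of varying duration.
    await asyncio.sleep(delay)
    return pid


async def main():
    delays = {1: 0.15, 2: 0.05, 3: 0.10}
    # All tasks start running as soon as they are created.
    tasks = [asyncio.create_task(job(pid, delay))
             for pid, delay in delays.items()]
    finished = []
    # as_completed yields awaitables in the order the tasks finish.
    for next_done in asyncio.as_completed(tasks):
        finished.append(await next_done)
    return finished


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The tasks are submitted in the order 1, 2, 3 but collected shortest delay first.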
Another example: suppose you want to find out your IP address. There are plenty of services for that, but you do not know which of them will be available when the program runs. Instead of polling each one in turn, you can launch all of the requests concurrently and pick the first one that succeeds.
For this, our favorite function wait has a special parameter, return_when. Until now we have ignored what wait returns, because we were only parallelizing tasks. Now we need the result from a coroutine, so we will use the sets of done and pending futures.
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)

async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = await aiohttp.request('GET', service.url)
    json_response = await response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)

async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED)

    print(done.pop().result())

ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

$ python3 2c-fetch-first-ip-address-response-await.py
Fetching IP from ip-api
Fetching IP from ipify
ip-api finished with result: 82.34.76.170, took: 0.09 seconds
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x10f95c6d8>
Task was destroyed but it is pending!
task: <Task pending coro=<fetch_ip() running at 2c-fetch-first-ip-address-response.py:20> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(10)(), Task._wakeup()]>>
What happened? The first service responded successfully, but there are warnings in the log!
We actually started two tasks but left the loop after the first result, while the second coroutine was still running. Asyncio decided this was a bug and warned us. We really should clean up after ourselves and explicitly kill the tasks we no longer need. How? Glad you asked.
Future states
- pending
- running
- done
- cancelled
It is all very simple. Once a future is in the done state, you can get the result of its execution from it. In the pending and running states, that operation raises an InvalidStateError exception. If the future was cancelled, it raises CancelledError. Finally, if an exception occurred inside the coroutine itself, that exception is raised again; the exception method returns the same error object without raising it.
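The behaviour of result in each state can be checked directly. This is a small sketch under the assumption of Python 3.8 or newer, where both exception classes are exported from the asyncio package; the boom coroutine is invented for the demonstration.

```python
import asyncio


async def boom():
    raise ValueError('failed inside the coroutine')


async def main():
    loop = asyncio.get_running_loop()
    seen = []

    # A future with no result yet: result() is not allowed.
    pending = loop.create_future()
    try:
        pending.result()
    except asyncio.InvalidStateError:
        seen.append('InvalidStateError')

    # After cancel(), result() raises CancelledError.
    pending.cancel()
    try:
        pending.result()
    except asyncio.CancelledError:
        seen.append('CancelledError')

    # An exception raised in the coroutine is re-raised by result().
    failed = asyncio.create_task(boom())
    await asyncio.wait([failed])
    try:
        failed.result()
    except ValueError:
        seen.append('ValueError')
    # exception() returns the same error object instead of raising it.
    assert isinstance(failed.exception(), ValueError)
    return seen


if __name__ == "__main__":
    print(asyncio.run(main()))
```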
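But do not take my word for it. You can find out the state of a future with the done and cancelled methods (a running method exists only on concurrent.futures.Future). Keep in mind that even when done returns True, calling result may either return the expected result or raise an exception that occurred during the work. To cancel the execution of a future there is the cancel method, which is exactly what we need to fix our example.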
from collections import namedtuple
import time
import asyncio
from concurrent.futures import FIRST_COMPLETED
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query')
)

async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    response = await aiohttp.request('GET', service.url)
    json_response = await response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)

async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, return_when=FIRST_COMPLETED)

    print(done.pop().result())

    for future in pending:
        future.cancel()

ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

$ python3 2c-fetch-first-ip-address-response-no-warning-await.py
Fetching IP from ipify
Fetching IP from ip-api
ip-api finished with result: 82.34.76.170, took: 0.08 seconds
Simple and tidy output, just the way I like it!
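The same "first result wins, cancel the rest" pattern can be tried without network access. In this sketch fake_service is a made-up stand-in for the IP lookups. The tasks are created explicitly, and the cancelled ones are awaited once more so they can unwind; that extra step is not in the listing above, it is simply a common precaution.

```python
import asyncio


async def fake_service(name, delay):
    # asyncio.sleep stands in for the HTTP round trip.
    await asyncio.sleep(delay)
    return '{} finished'.format(name)


async def main():
    tasks = [asyncio.create_task(fake_service('slow', 0.3)),
             asyncio.create_task(fake_service('fast', 0.05))]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel whatever has not finished yet.
    for task in pending:
        task.cancel()
    # Let the cancelled tasks unwind; their CancelledError is collected
    # as a result instead of being raised here.
    await asyncio.gather(*pending, return_exceptions=True)

    return done.pop().result(), all(task.cancelled() for task in pending)


if __name__ == "__main__":
    print(asyncio.run(main()))
```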
If you need additional logic for handling a future, you can attach callbacks that are called when it moves to the done state. This is useful in tests, where some results need to be overridden with your own values.
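A minimal sketch of such a callback, using add_done_callback on a task. The names fetch_value and on_done are invented for the example.

```python
import asyncio


async def fetch_value():
    await asyncio.sleep(0.01)
    return 42


async def main():
    results = []

    def on_done(fut):
        # Called by the event loop with the finished future as its argument.
        results.append(fut.result())

    task = asyncio.create_task(fetch_value())
    task.add_done_callback(on_done)
    await task
    # Done callbacks are scheduled on the loop, so yield once more
    # to be sure on_done has had a chance to run.
    await asyncio.sleep(0)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The callback receives the future itself, so it can inspect result, exception or cancelled as needed.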
Exception handling
asyncio is all about writing manageable, readable concurrent code, and that is very noticeable when handling exceptions. Let's go back to the example to demonstrate.
Suppose we want to make sure that all requests to the IP lookup services return the same result. One of them, however, may be offline and not respond. Just apply try...except, as usual:
from collections import namedtuple
import time
import asyncio
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)

async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = await aiohttp.request('GET', service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    json_response = await response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)

async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = await asyncio.wait(futures)

    for future in done:
        print(future.result())

ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

$ python3 3a-fetch-ip-addresses-fail-await.py
Fetching IP from ip-api
Fetching IP from borken
Fetching IP from ipify
ip-api finished with result: 85.133.69.250, took: 0.75 seconds
ipify finished with result: 85.133.69.250, took: 1.37 seconds
borken is unresponsive
We can also handle an exception that was raised while the coroutine was running:
from collections import namedtuple
import time
import asyncio
import aiohttp
import traceback

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)

async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = await aiohttp.request('GET', service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    json_response = await response.json()
    ip = json_response[service.ip_attr]

    response.close()
    return '{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start)

async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    done, _ = await asyncio.wait(futures)

    for future in done:
        try:
            print(future.result())
        except:
            print("Unexpected error: {}".format(traceback.format_exc()))

ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()

$ python3 3b-fetch-ip-addresses-future-exceptions-await.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
ipify finished with result: 85.133.69.250, took: 0.91 seconds
borken is unresponsive
Unexpected error: Traceback (most recent call last):
  File "3b-fetch-ip-addresses-future-exceptions.py", line 39, in asynchronous
    print(future.result())
  File "3b-fetch-ip-addresses-future-exceptions.py", line 26, in fetch_ip
    ip = json_response[service.ip_attr]
KeyError: 'this-is-not-an-attr'
Just as starting a task without waiting for it to finish is a mistake, so is leaving unknown exceptions unretrieved: they leave their traces in the output.
from collections import namedtuple
import time
import asyncio
import aiohttp

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'this-is-not-an-attr'),
    Service('borken', 'http://no-way-this-is-going-to-work.com/json', 'ip')
)

async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    try:
        response = await aiohttp.request('GET', service.url)
    except:
        print('{} is unresponsive'.format(service.name))
    else:
        json_response = await response.json()
        ip = json_response[service.ip_attr]

        response.close()
        print('{} finished with result: {}, took: {:.2f} seconds'.format(
            service.name, ip, time.time() - start))

async def asynchronous():
    futures = [fetch_ip(service) for service in SERVICES]
    # The results (and any exceptions) are deliberately never retrieved.
    await asyncio.wait(futures)

ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous())
ioloop.close()
$ python3 3c-fetch-ip-addresses-ignore-exceptions-await.py
Fetching IP from ipify
Fetching IP from borken
Fetching IP from ip-api
borken is unresponsive
ipify finished with result: 85.133.69.250, took: 0.78 seconds
Task exception was never retrieved
future: <Task finished coro=<fetch_ip() done, defined at 3c-fetch-ip-addresses-ignore-exceptions.py:15> exception=KeyError('this-is-not-an-attr',)>
Traceback (most recent call last):
  File "3c-fetch-ip-addresses-ignore-exceptions.py", line 25, in fetch_ip
    ip = json_response[service.ip_attr]
KeyError: 'this-is-not-an-attr'
The output looks the same as in the previous example, apart from the reproachful message from asyncio.
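One way to avoid that message is to retrieve every task's exception explicitly. The sketch below is network-free: the lookup coroutine and its payload dictionary are invented stand-ins for the JSON responses, and 192.0.2.1 is a documentation-only address.

```python
import asyncio


async def lookup(name, payload, key):
    await asyncio.sleep(0)
    # Raises KeyError when the attribute is missing, like the listing above.
    return '{}: {}'.format(name, payload[key])


async def main():
    payload = {'ip': '192.0.2.1'}
    tasks = [
        asyncio.create_task(lookup('good', payload, 'ip')),
        asyncio.create_task(lookup('bad', payload, 'this-is-not-an-attr')),
    ]
    await asyncio.wait(tasks)

    report = []
    # Walk the tasks in submission order so the report is deterministic.
    for task in tasks:
        exc = task.exception()  # marks the exception as retrieved
        if exc is None:
            report.append(('ok', task.result()))
        else:
            report.append(('error', type(exc).__name__))
    return report


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because exception is called on every finished task, asyncio has nothing left to complain about when the tasks are garbage collected.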
Timeouts
But what if the information about our IP is not that important? It might be a nice addition to some composite response in which this part is optional. In that case we should not keep the user waiting. Ideally we would put a timeout on the IP lookup and, once it expires, answer the user anyway, even without that information.
Once again, wait has a suitable argument:
import time
import random
import asyncio
import aiohttp
import argparse
from collections import namedtuple
from concurrent.futures import FIRST_COMPLETED

Service = namedtuple('Service', ('name', 'url', 'ip_attr'))

SERVICES = (
    Service('ipify', 'https://api.ipify.org?format=json', 'ip'),
    Service('ip-api', 'http://ip-api.com/json', 'query'),
)

DEFAULT_TIMEOUT = 0.01

async def fetch_ip(service):
    start = time.time()
    print('Fetching IP from {}'.format(service.name))

    await asyncio.sleep(random.randint(1, 3) * 0.1)
    try:
        response = await aiohttp.request('GET', service.url)
    except:
        return '{} is unresponsive'.format(service.name)

    json_response = await response.json()
    ip = json_response[service.ip_attr]

    response.close()
    print('{} finished with result: {}, took: {:.2f} seconds'.format(
        service.name, ip, time.time() - start))
    return ip

async def asynchronous(timeout):
    response = {
        "message": "Result from asynchronous.",
        "ip": "not available"
    }

    futures = [fetch_ip(service) for service in SERVICES]
    done, pending = await asyncio.wait(
        futures, timeout=timeout, return_when=FIRST_COMPLETED)

    for future in pending:
        future.cancel()

    for future in done:
        response["ip"] = future.result()

    print(response)

parser = argparse.ArgumentParser()
parser.add_argument(
    '-t', '--timeout',
    help='Timeout to use, defaults to {}'.format(DEFAULT_TIMEOUT),
    default=DEFAULT_TIMEOUT, type=float)
args = parser.parse_args()

print("Using a {} timeout".format(args.timeout))
ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(asynchronous(args.timeout))
ioloop.close()
I also added a timeout argument to the script's command line, to check what happens when the requests do have enough time to complete. And I added random delays so that the script does not finish too quickly and there is time to see exactly how it works.
$ python 4a-timeout-with-wait-kwarg-await.py
Using a 0.01 timeout
Fetching IP from ipify
Fetching IP from ip-api
{'message': 'Result from asynchronous.', 'ip': 'not available'}

$ python 4a-timeout-with-wait-kwarg-await.py -t 5
Using a 5.0 timeout
Fetching IP from ip-api
Fetching IP from ipify
ipify finished with result: 82.34.76.170, took: 1.24 seconds
{'ip': '82.34.76.170', 'message': 'Result from asynchronous.'}
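The timeout behaviour can be reproduced without aiohttp. In the sketch below the lookup coroutine, its delay and the 192.0.2.1 address (a documentation-only range) are all made up; only the wait call with its timeout argument mirrors the listing above.

```python
import asyncio


async def lookup(delay):
    # asyncio.sleep stands in for the HTTP request to an IP service.
    await asyncio.sleep(delay)
    return '192.0.2.1'


async def respond(timeout, delay=0.2):
    response = {'message': 'Result from asynchronous.',
                'ip': 'not available'}

    task = asyncio.create_task(lookup(delay))
    # wait returns once the task is done or the timeout expires,
    # whichever comes first; it does not raise on timeout.
    done, pending = await asyncio.wait([task], timeout=timeout)

    for unfinished in pending:
        unfinished.cancel()
    for finished in done:
        response['ip'] = finished.result()
    return response


if __name__ == "__main__":
    print(asyncio.run(respond(0.01)))
    print(asyncio.run(respond(1)))
```

Note that wait does not cancel anything when the timeout expires; unfinished tasks are simply reported in pending, which is why the explicit cancel loop is needed.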
Conclusion
Asyncio has strengthened my already great love for Python. To be honest, I fell in love with coroutines when I first met them in Tornado, but asyncio has managed to take the best from it and from other concurrency libraries. So much so that special effort was made to let those libraries use the main I/O loop. So if you use Tornado or Twisted, you can include code written for asyncio.
As I mentioned, the main problem is that the standard library does not yet support non-blocking behavior. In addition, many popular libraries still work only in a synchronous style, and the ones that use concurrency are still young and experimental.
Their number keeps growing, though.
I hope this tutorial has shown how pleasant asyncio is to work with, and that this technology will push you to move to Python 3 if for some reason you are stuck on Python 2.7. One thing is certain: the future of Python has changed completely.
From the translator: the original article was published on February 20, 2016, and a lot has happened since then. Python 3.6 was released, which, besides optimizations, improved asyncio and brought its API to a stable state. Libraries for working with Postgres, Redis, Elasticsearch and others in non-blocking mode have been released. There is even a new framework, Sanic, which resembles Flask but works asynchronously. Finally, even the event loop has been optimized and rewritten in Cython, making it 2 times faster. So I see no reason to ignore this technology!