python - aiohttp.TCPConnector (with limit argument) vs asyncio.Semaphore for limiting the number of concurrent connections
I thought I'd learn the new Python async/await syntax, and more of the asyncio module, by making a simple script that allows downloading multiple resources at once.
But I'm stuck.
While researching I came across two options to limit the number of concurrent requests:
- passing an aiohttp.TCPConnector (with a limit argument) to an aiohttp.ClientSession, or
- using an asyncio.Semaphore.
Is there a preferred option, or can they be used interchangeably if all I want is to limit the number of concurrent connections? Are they (roughly) equal in terms of performance?
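To make the comparison concrete, this is how I understand each option in isolation (a minimal sketch; the limit of 30 and the URLs are just placeholders):

import asyncio
from aiohttp import ClientSession, TCPConnector

# Option 1: a Semaphore caps how many coros may have a request in flight.
semaphore = asyncio.Semaphore(30)

async def fetch(session, url):
    async with semaphore:  # at most 30 coros get past this line at a time
        async with session.get(url) as response:
            return await response.read()

# Option 2: the TCPConnector caps how many connections the session may open.
async def main(urls):
    conn = TCPConnector(limit=30)
    async with ClientSession(connector=conn) as session:
        return await asyncio.gather(*(fetch(session, url) for url in urls))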
Also, both seem to have a default value of 100 concurrent connections/operations. If I only use a Semaphore with a limit of, let's say, 500, will the aiohttp internals lock me down to 100 concurrent connections implicitly?
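My assumption (please correct me if I'm wrong) is that I would have to raise or disable the connector's own limit for the Semaphore to be the effective cap, something like:

from aiohttp import TCPConnector

# My assumption: with Semaphore(500) but the default TCPConnector(limit=100),
# aiohttp would still cap me at 100 open connections, so the connector limit
# has to be raised (or disabled) to match:
conn = TCPConnector(limit=500)  # match the semaphore
# conn = TCPConnector(limit=0)  # 0 disables the connector-side limit entirely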
This is all very new and unclear to me, so please feel free to point out any misunderstandings on my part or flaws in the code.
My code below contains both options (which one should I remove?).
Bonus questions:
- How do I handle (preferably retry x times) coros that threw an error? (I sketched an attempt right below.)
- What is the best way to save the returned data (i.e. inform my DataHandler) as soon as a coro has finished? I don't want everything saved only at the end, because I could start working with the results as soon as possible. (See the asyncio.as_completed sketch after the code.)
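For bonus question 1, the best I have come up with so far is a retry wrapper around fetch, meant as an extra method on the AsyncDownloader class shown below (a rough sketch only; the exception set, retry count and linear backoff are just my guesses):

import asyncio
from aiohttp import ClientError

async def fetch_with_retry(self, session, url, retries=3, backoff=1.0):
    # Retry a failed request up to `retries` times before giving up.
    for attempt in range(1, retries + 1):
        try:
            return await self.fetch(session, url)
        except (ClientError, asyncio.TimeoutError):
            if attempt == retries:
                raise  # out of attempts, let the error propagate
            await asyncio.sleep(backoff * attempt)  # naive linear backoff

Here is the full code: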
import asyncio
from tqdm import tqdm
import uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth


# You can ignore this class (DataHandler is defined elsewhere in my project).
class DummyDataHandler(DataHandler):
    """Takes data and stores it somewhere."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def take(self, origin_url, data):
        return True

    def done(self):
        return None


class AsyncDownloader(object):
    def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):
        self.concurrent_connections = concurrent_connections
        self.silent = silent
        self.data_handler = data_handler or DummyDataHandler()
        self.sending_bar = None
        self.receiving_bar = None

        asyncio.set_event_loop_policy(loop_policy or uvloop.EventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.semaphore = asyncio.Semaphore(concurrent_connections)

    async def fetch(self, session, url):
        # Option 1: the Semaphore, limiting the number of concurrent coros,
        # thereby limiting the number of concurrent requests.
        async with self.semaphore:
            async with session.get(url) as response:
                # Bonus question 1: what is the best way to retry a request that failed?
                resp_task = asyncio.ensure_future(response.read())
                self.sending_bar.update(1)
                resp = await resp_task

                await response.release()
                if not self.silent:
                    self.receiving_bar.update(1)
                return resp

    async def batch_download(self, urls, auth=None):
        # Option 2: limiting the number of open connections directly via the TCPConnector.
        conn = TCPConnector(limit=self.concurrent_connections, keepalive_timeout=60)
        async with ClientSession(connector=conn, auth=auth) as session:
            await asyncio.gather(*[asyncio.ensure_future(self.download_and_save(session, url)) for url in urls])

    async def download_and_save(self, session, url):
        content_task = asyncio.ensure_future(self.fetch(session, url))
        content = await content_task

        # Bonus question 2: this call is blocking, I know. Should it be wrapped
        # in a coro, or should I use asyncio.as_completed in the download function?
        self.data_handler.take(origin_url=url, data=content)

    def download(self, urls, auth=None):
        if isinstance(auth, tuple):
            auth = BasicAuth(*auth)
        print('Running on concurrency level {}'.format(self.concurrent_connections))
        self.sending_bar = tqdm(urls, total=len(urls), desc='Sent    ', unit='requests')
        self.sending_bar.update(0)

        self.receiving_bar = tqdm(urls, total=len(urls), desc='Received', unit='requests')
        self.receiving_bar.update(0)

        tasks = self.batch_download(urls, auth)
        self.loop.run_until_complete(tasks)
        return self.data_handler.done()


### call ###

url_pattern = 'https://www.example.com/{}.html'

def gen_url(lower=0, upper=None):
    for i in range(lower, upper):
        yield url_pattern.format(i)

ad = AsyncDownloader(concurrent_connections=30)
data = ad.download([g for g in gen_url(upper=1000)])
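For bonus question 2, this is the asyncio.as_completed variant of batch_download I was considering. fetch_and_tag is a helper I made up, because as_completed does not yield results in submission order, so each response has to carry its own URL:

async def fetch_and_tag(self, session, url):
    # Pair each result with its URL; as_completed loses that association.
    return url, await self.fetch(session, url)

async def batch_download(self, urls, auth=None):
    conn = TCPConnector(limit=self.concurrent_connections, keepalive_timeout=60)
    async with ClientSession(connector=conn, auth=auth) as session:
        coros = [self.fetch_and_tag(session, url) for url in urls]
        for finished in asyncio.as_completed(coros):
            url, content = await finished
            # Hand over each result as soon as it is available. If take() ever
            # does real blocking I/O, it could go through self.loop.run_in_executor.
            self.data_handler.take(origin_url=url, data=content)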