Python - aiohttp.TCPConnector (with limit argument) vs asyncio.Semaphore for limiting the number of concurrent connections


I thought I'd learn the new Python async/await syntax, and more specifically the asyncio module, by writing a simple script that downloads multiple resources at once.

But now I'm stuck.

While researching, I came across two options to limit the number of concurrent requests:

  1. passing an aiohttp.TCPConnector (with the limit argument) to an aiohttp.ClientSession, or
  2. using an asyncio.Semaphore.

Is there a preferred option, or can they be used interchangeably if all I want is to limit the number of concurrent connections? Are they (roughly) equal in terms of performance?

Also, both seem to have a default value of 100 concurrent connections/operations. If I use only a Semaphore with a limit of, let's say, 500, will the aiohttp internals lock me down to 100 concurrent connections implicitly?
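On that last point, a minimal sketch of how two stacked limits behave may help. It uses only the standard library: `inner` is a hypothetical stand-in for a connection pool cap such as `TCPConnector(limit=...)` (whose default is 100), and the name `run_stacked_limits` and the numbers are made up for illustration. It is not aiohttp itself, only the same two-gate shape. Note also that a bare `asyncio.Semaphore()` starts at a value of 1 unless a value is passed in.

```python
import asyncio


async def run_stacked_limits(outer_limit, inner_limit, n_tasks):
    """Return the peak number of jobs that held both permits at once."""
    # Plays the role of the explicit semaphore in fetch().
    outer = asyncio.Semaphore(outer_limit)
    # Hypothetical stand-in for a connection pool limit (assumption).
    inner = asyncio.Semaphore(inner_limit)
    active = 0
    peak = 0

    async def job():
        nonlocal active, peak
        async with outer:
            async with inner:
                active += 1
                peak = max(peak, active)
                await asyncio.sleep(0.01)  # simulated network I/O
                active -= 1

    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_stacked_limits(outer_limit=50, inner_limit=10, n_tasks=200)))
```

Under these assumptions the observed peak is bounded by the smaller of the two limits, which suggests that a semaphore of 500 on top of a connector left at its default would still be held to the connector's cap.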

This is all very new and unclear to me. Please feel free to point out any other misunderstandings on my part or flaws in the code.

Here is the code containing both options (which one should I remove?):

Bonus questions:

  1. How do I handle (preferably retry x times) coros that threw an error?
  2. What is the best way to save the returned data (inform my DataHandler) as soon as a coro is finished? I don't want everything to be saved at the end, because I could start working with the results as soon as possible.
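For the first bonus question, one possible shape is a small retry wrapper with exponential backoff. This is only a sketch under assumptions: `retry`, `coro_factory`, `base_delay` and `flaky_fetch` are hypothetical names, and the simulated `ConnectionError` stands in for whatever a real request might raise.

```python
import asyncio


async def retry(coro_factory, attempts=3, base_delay=0.1, exceptions=(Exception,)):
    """Await coro_factory() up to `attempts` times, backing off between tries."""
    for attempt in range(1, attempts + 1):
        try:
            return await coro_factory()
        except exceptions:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the last error
            # Wait base_delay, 2*base_delay, 4*base_delay, ... before retrying.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def demo():
    calls = 0

    async def flaky_fetch():
        # Hypothetical stand-in for a request that fails twice, then succeeds.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("simulated failure")
        return b"payload"

    data = await retry(flaky_fetch, attempts=5, base_delay=0.01)
    return data, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The wrapper takes a factory rather than a coroutine object because a coroutine can only be awaited once; in the downloader this might look like `retry(lambda: self.fetch(session, url))`.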


    import asyncio
    from tqdm import tqdm
    import uvloop
    from aiohttp import ClientSession, TCPConnector, BasicAuth

    # You can ignore this class
    class DummyDataHandler:  # extends a DataHandler base class not shown here
        """Takes data and stores it somewhere"""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

        def take(self, origin_url, data):
            return True

        def done(self):
            return None

    class AsyncDownloader(object):
        def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):

            self.concurrent_connections = concurrent_connections
            self.silent = silent

            self.data_handler = data_handler or DummyDataHandler()

            self.sending_bar = None
            self.receiving_bar = None

            asyncio.set_event_loop_policy(loop_policy or uvloop.EventLoopPolicy())
            self.loop = asyncio.get_event_loop()
            self.semaphore = asyncio.Semaphore(concurrent_connections)

        async def fetch(self, session, url):
            # Option 1: use the semaphore to limit the number of concurrent coros,
            # thereby limiting the number of concurrent requests.
            # ("with (await self.semaphore)" no longer works on current Python versions.)
            async with self.semaphore:
                async with session.get(url) as response:
                    # Bonus question 1: what is the best way to retry a request that failed?
                    resp_task = asyncio.ensure_future(response.read())
                    self.sending_bar.update(1)
                    resp = await resp_task

                    # No explicit response.release(): leaving the "async with" block releases it.
                    if not self.silent:
                        self.receiving_bar.update(1)
                    return resp

        async def batch_download(self, urls, auth=None):
            # Option 2: limit the number of open connections directly via the TCPConnector
            conn = TCPConnector(limit=self.concurrent_connections, keepalive_timeout=60)
            async with ClientSession(connector=conn, auth=auth) as session:
                await asyncio.gather(*[asyncio.ensure_future(self.download_and_save(session, url)) for url in urls])

        async def download_and_save(self, session, url):
            content_task = asyncio.ensure_future(self.fetch(session, url))
            content = await content_task
            # Bonus question 2: this is blocking, I know. Should it be wrapped in another coro,
            # or should I use something like asyncio.as_completed in the download function?
            self.data_handler.take(origin_url=url, data=content)

        def download(self, urls, auth=None):
            if isinstance(auth, tuple):
                auth = BasicAuth(*auth)
            print('Running on concurrency level {}'.format(self.concurrent_connections))
            self.sending_bar = tqdm(urls, total=len(urls), desc='Sent    ', unit='requests')
            self.sending_bar.update(0)

            self.receiving_bar = tqdm(urls, total=len(urls), desc='Received', unit='requests')
            self.receiving_bar.update(0)

            tasks = self.batch_download(urls, auth)
            self.loop.run_until_complete(tasks)
            return self.data_handler.done()


    ### call ###

    url_pattern = 'https://www.example.com/{}.html'

    def gen_url(lower=0, upper=None):
        for i in range(lower, upper):
            yield url_pattern.format(i)

    ad = AsyncDownloader(concurrent_connections=30)
    data = ad.download([g for g in gen_url(upper=1000)])
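For bonus question 2, a rough sketch of the `asyncio.as_completed` idea: results are handed over as each download finishes, and the blocking save step is pushed to the default thread pool via `run_in_executor`. Everything here is an assumption for illustration: `fake_fetch`, `blocking_save` and `download_all` are hypothetical names, and sleeping replaces real network I/O.

```python
import asyncio


async def fake_fetch(url, delay):
    # Hypothetical stand-in for fetch(): sleeps instead of doing network I/O.
    await asyncio.sleep(delay)
    return url, "content of {}".format(url).encode()


def blocking_save(handled, url, data):
    # Stand-in for a blocking data_handler.take(); it only records the call order.
    handled.append(url)


async def download_all(jobs):
    loop = asyncio.get_running_loop()
    handled = []
    tasks = [asyncio.ensure_future(fake_fetch(url, delay)) for url, delay in jobs]
    # as_completed yields awaitables in the order the downloads finish,
    # not in the order they were submitted.
    for finished in asyncio.as_completed(tasks):
        url, data = await finished
        # Run the blocking handler in the default thread pool so the event
        # loop can keep servicing the remaining downloads.
        await loop.run_in_executor(None, blocking_save, handled, url, data)
    return handled


if __name__ == "__main__":
    print(asyncio.run(download_all([("a", 0.06), ("b", 0.01), ("c", 0.03)])))
```

Each result is saved before the next one is awaited, so the handler sees data in completion order; whether a thread pool is the right place for the save step depends on what the real handler does.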

