Question
I have the following test code:
```python
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
```
I need to use the `concurrent.futures.ThreadPoolExecutor` part of the code in a FastAPI endpoint.

My concern is the impact of the number of API calls combined with the threads each call would spawn: creating too many threads could starve the host, or crash the application and/or the host.
Any thoughts or gotchas on this approach?
Answer
You should rather use the HTTPX library, which provides an `async` API. As described in this answer, you spawn a `Client` and reuse it every time you need it. To make [asynchronous requests with HTTPX](https://www.python-httpx.org/async/#making-async-requests), you'll need an `AsyncClient`.
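For illustration, here is a minimal sketch of that pattern outside of FastAPI: the client is created once and reused by every call (the `fetch` helper and the example URL are placeholders):

```python
import asyncio
import httpx

# Create the AsyncClient once and reuse it for every request,
# rather than spawning a new client per call.
client = httpx.AsyncClient()

async def fetch(url):
    r = await client.get(url)
    return r.text

async def main():
    try:
        print((await fetch('https://www.example.com'))[:50])  # placeholder URL
    finally:
        await client.aclose()

asyncio.run(main())
```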
You could control the connection pool size as well, using the `limits` keyword argument on the `Client`, which takes an instance of `httpx.Limits`.
For example:
```python
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.AsyncClient(limits=limits)
```
You can adjust the above per your needs. As per the documentation on [Pool limit configuration](https://www.python-httpx.org/advanced/#pool-limit-configuration):

- `max_keepalive_connections`, number of allowable keep-alive connections, or `None` to always allow. (Defaults to 20)
- `max_connections`, maximum number of allowable connections, or `None` for no limits. (Defaults to 100)
- `keepalive_expiry`, time limit on idle keep-alive connections in seconds, or `None` for no limits. (Defaults to 5)

All three options are combined in the sketch below.
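A minimal sketch, with illustrative values rather than recommendations:

```python
import httpx

limits = httpx.Limits(
    max_keepalive_connections=5,  # keep at most 5 idle keep-alive connections
    max_connections=10,           # never open more than 10 connections in total
    keepalive_expiry=5.0,         # expire idle keep-alive connections after 5s
)
client = httpx.AsyncClient(limits=limits)
```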
If you would like to [adjust the timeout](https://www.python-httpx.org/advanced/#timeout-configuration) as well, you can use the `timeout` parameter to set a timeout on an individual request, or on a `Client`/`AsyncClient` instance, which results in the given timeout being used as the default for requests made with this client (see the implementation of the `Timeout` class as well). You can specify the timeout behavior in fine-grained detail; for example, setting the `read` timeout parameter will specify the maximum duration to wait for a chunk of data to be received (i.e., a chunk of the response body). If HTTPX is unable to receive data within this time frame, a `ReadTimeout` exception is raised. If set to `None` instead of some positive numerical value, there will be no `timeout` on `read`. The default is a 5-second `timeout` on all operations.
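To illustrate both levels, here is a minimal sketch (the 30-second value and the `fetch_slow` name are made up for the example):

```python
import httpx

# Client-level default: 5s on all operations, except a 15s read timeout.
timeout = httpx.Timeout(5.0, read=15.0)
client = httpx.AsyncClient(timeout=timeout)

async def fetch_slow(url):
    # A per-request timeout overrides the client-level default for this call only.
    return await client.get(url, timeout=httpx.Timeout(30.0))
```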
You can use `await client.aclose()` to [explicitly close the `AsyncClient`](https://www.python-httpx.org/async/#opening-and-closing-clients) when you are done with it (this could be done inside a shutdown event handler, for instance).
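As a side note, if you only need the client for a single code path (e.g., a short script rather than a long-lived application), `AsyncClient` can also be used as an async context manager, which closes it for you; a minimal sketch:

```python
import asyncio
import httpx

async def main():
    # Leaving the context manager is equivalent to calling
    # await client.aclose() yourself.
    async with httpx.AsyncClient() as client:
        r = await client.get('https://www.example.com')  # placeholder URL
        print(r.status_code)

asyncio.run(main())
```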
To run multiple asynchronous operations (as you need to request five different URLs when your API endpoint is called), you can use the awaitable [`asyncio.gather()`](https://docs.python.org/3/library/asyncio-task.html#asyncio.gather). It will execute the `async` operations and return a list of results in the same order the awaitables (tasks) were passed to that function.
Working Example:
```python
from fastapi import FastAPI
import httpx
import asyncio

URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.
client = httpx.AsyncClient(limits=limits, timeout=timeout)
app = FastAPI()

@app.on_event('shutdown')
async def shutdown_event():
    await client.aclose()

async def send(url, client):
    return await client.get(url)

@app.get('/')
async def main():
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    # for demo purposes, return only the first 50 chars of each response
    return [r.text[:50] for r in responses]
```
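One gotcha with the above: by default, `asyncio.gather()` propagates the first exception raised by any task, so a single unreachable URL would make the whole endpoint fail. If you would rather handle failures per URL (similar to the `try`/`except` in your threaded version), you could pass `return_exceptions=True`. A sketch building on the working example above (the `/safe` route path is arbitrary):

```python
@app.get('/safe')
async def main_safe():
    tasks = [send(url, client) for url in URLS]
    # return_exceptions=True returns exception objects in the results list
    # instead of raising, so one failing URL does not fail the whole call
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [repr(r) if isinstance(r, Exception) else r.text[:50] for r in results]
```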
If you would like to avoid reading the entire response body into RAM, you could use [Streaming responses](https://www.python-httpx.org/async/#streaming-responses), as described in this answer and demonstrated below:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import httpx
import asyncio

URLS = ['https://www.foxnews.com/',
        'https://edition.cnn.com/',
        'https://www.nbcnews.com/',
        'https://www.bbc.co.uk/',
        'https://www.reuters.com/']

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.
client = httpx.AsyncClient(limits=limits, timeout=timeout)
app = FastAPI()

@app.on_event('shutdown')
async def shutdown_event():
    await client.aclose()

async def send(url, client):
    req = client.build_request('GET', url)
    return await client.send(req, stream=True)

async def iter_content(responses):
    for r in responses:
        async for chunk in r.aiter_text():
            # for demo purposes, yield only the first 50 chars of each response
            yield chunk[:50]
            yield '\n\n'
            break
        await r.aclose()

@app.get('/')
async def main():
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    return StreamingResponse(iter_content(responses), media_type='text/event-stream')
```
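Assuming either example is saved as `app.py` (the filename is, of course, up to you), you could run it with `uvicorn app:app` and call the endpoint at `http://127.0.0.1:8000/` to try it out.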