## Question

I have an API endpoint (FastAPI / Uvicorn). Among other things, it makes a request to yet another API for information. When I load my API with multiple concurrent requests, I begin to receive the following error:

```
h11._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE
```

In a normal environment, I would take advantage of `requests.Session`, but I understand it is not fully thread-safe.

Thus, what is the proper approach to using `requests` within a framework such as FastAPI, where multiple threads would be using the `requests` library at the same time?

## Answer

Instead of using `requests`, you could use [`httpx`](https://www.python-httpx.org), which offers an `async` API as well (`httpx` is also suggested in FastAPI's documentation for performing `async` tests, and FastAPI/Starlette recently switched the HTTP client behind `TestClient` from `requests` to `httpx`).

The example below is based on the one given in `httpx` documentation, demonstrating how to use the library to make an asynchronous HTTP(S) request and, subsequently, stream the response back to the client. [`httpx.AsyncClient()`](https://www.python-httpx.org/async/#making-async-requests) is what you can use instead of `requests.Session()`, which is useful when several requests are being made to the same host, as the underlying TCP connection will be reused instead of recreated for every single request, hence resulting in a significant performance improvement. Additionally, it allows you to reuse `headers` and other settings (such as `proxies` and `timeout`), as well as persist `cookies`, across requests. You spawn a `Client` and reuse it every time you need it. You can use `await client.aclose()` to [explicitly close the client](https://www.python-httpx.org/async/#opening-and-closing-clients) once you are done with it (you could do that inside a `shutdown` event handler).

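For instance, a minimal sketch of a `Client` configured with such shared settings (the header, timeout and cookie values below are purely illustrative):

```python
import httpx

# Settings passed to the Client are applied to every request made through it
client = httpx.AsyncClient(
    headers={'User-Agent': 'my-app/1.0'},  # sent with every request
    timeout=httpx.Timeout(10.0),           # default timeout for all requests
    cookies={'session_id': 'abc'},         # persisted across requests
)
```
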
Examples and more details can also be found in this answer.

### Example

```python
from fastapi import FastAPI
import httpx
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse

client = httpx.AsyncClient()
app = FastAPI()


@app.on_event('shutdown')
async def shutdown_event():
    await client.aclose()


@app.get('/')
async def home():
    req = client.build_request('GET', 'https://www.example.com/')
    r = await client.send(req, stream=True)
    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
```

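To try the example above, assuming it is saved as `main.py` (the filename here is just for illustration), you could start the server with `uvicorn main:app` and then issue a request to `http://127.0.0.1:8000/`.
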
### Example (Updated)

Since [`startup` and `shutdown` have now been deprecated](https://fastapi.tiangolo.com/advanced/events/#alternative-events-deprecated) (and might be completely removed in the future), you could instead use a `lifespan` handler to initialise the `httpx` Client, as well as close the Client instance on shutdown, similar to what has been demonstrated in this answer. Starlette specifically provides an example using a `lifespan` handler and `httpx` Client in their documentation page. As described in Starlette's documentation:

> The lifespan has the concept of `state`, which is a dictionary that can be used to share the objects between the lifespan, and the requests.
>
> The `state` received on the requests is a shallow copy of the state received on the lifespan handler.

Hence, objects added to the state in the lifespan handler can be accessed inside endpoints using `request.state`. The example below uses a streaming response to both communicate with the external server and send the response back to the client. See [here](https://www.python-httpx.org/async/#streaming-responses) for more details on the `async` response streaming methods of `httpx` (i.e., `aiter_bytes()`, `aiter_text()`, `aiter_lines()`, etc.).

If you would like to use a `media_type` for the `StreamingResponse`, you could use the one from the original response like this: `media_type=r.headers['content-type']`. However, as described in this answer, you need to make sure that the `media_type` is not set to `text/plain`; otherwise, the content would not stream as expected in the browser, unless you disable MIME Sniffing (have a look at the linked answer for more details and solutions).

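A sketch of what that return line might look like (here, `r` is the `httpx` response object, as in the examples below):

```python
# Forward the upstream Content-Type header to the client
return StreamingResponse(
    r.aiter_raw(),
    media_type=r.headers.get('content-type'),
    background=BackgroundTask(r.aclose),
)
```
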
```python
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialise the Client on startup and add it to the state
    async with httpx.AsyncClient() as client:
        yield {'client': client}
        # The Client closes on shutdown


app = FastAPI(lifespan=lifespan)


@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req, stream=True)
    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
```

If, for any reason, you need to read the response content chunk by chunk on the server side before responding back to the client, you could do this as follows:

```python
@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req, stream=True)

    async def gen():
        async for chunk in r.aiter_raw():
            yield chunk
        await r.aclose()

    return StreamingResponse(gen())
```

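Note that, in this variant, `r.aclose()` is awaited inside the generator once all chunks have been consumed, so a separate `BackgroundTask` is no longer needed to close the response.
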
If you don't want to use a streaming response, but rather have `httpx` read the response for you in the first place (which would store the response data in the server's RAM; hence, you should make sure there is enough space available to accommodate the data), you could use the following. Note that using `r.json()` should apply only to cases where the response data are in JSON format; otherwise, you could return a [`PlainTextResponse`](https://fastapi.tiangolo.com/advanced/custom-response/#plaintextresponse) or a custom `Response` directly, as demonstrated below.

```python
from fastapi import Response
from fastapi.responses import PlainTextResponse


@app.get('/')
async def home(request: Request):
    client = request.state.client
    req = client.build_request('GET', 'https://www.example.com')
    r = await client.send(req)
    content_type = r.headers.get('content-type')

    if content_type == 'application/json':
        return r.json()
    elif content_type == 'text/plain':
        return PlainTextResponse(content=r.text)
    else:
        return Response(content=r.content)
```

Using the `async` API of `httpx` would mean that you have to define your endpoints with `async def`; otherwise, you would have to use the standard synchronous API (for `def` vs `async def`, see this answer), and, as described in this GitHub discussion:

> Yes. `HTTPX` is intended to be thread-safe, and yes, a single client-instance across all threads will do better in terms of connection pooling than using an instance-per-thread.

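A minimal sketch of the synchronous API in a regular `def` endpoint (the `/sync` path and the `sync_client` name are illustrative; `Response` is imported as in the previous example):

```python
import httpx

# A single shared Client instance, reused across threads/requests
sync_client = httpx.Client()


@app.get('/sync')
def home_sync():
    r = sync_client.get('https://www.example.com')
    return Response(content=r.content)
```
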
You can also control the connection pool size using the `limits` keyword argument on the `Client` (see [Pool limit configuration](https://www.python-httpx.org/advanced/#pool-limit-configuration)). For example:

```python
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)
```

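The same `limits` argument can be passed to `httpx.AsyncClient()` as well, if you are using the `async` API shown above.
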