Question
I have the following code:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
If I run my code on localhost - e.g., http://localhost:8501/ping
- in
different tabs of the same browser window, I get:
Hello
bye
Hello
bye
instead of:
Hello
Hello
bye
bye
I have read about using httpx
, but still, I cannot have a true
parallelization. What's the problem?
Answer
As per [FastAPI's documentation](https://fastapi.tiangolo.com/async/#path- operation-functions):
When you declare a path operation function with normal
def
instead ofasync def
, it is run in an external threadpool that is thenawait
ed, instead of being called directly (as it would block the server).
also, as described [here](https://fastapi.tiangolo.com/async/#concurrency-and- async-await):
If you are using a third party library that communicates with something (a database, an API, the file system, etc.) and doesn't have support for using
await
, (this is currently the case for most database libraries), then declare your path operation functions as normally, with justdef
.If your application (somehow) doesn't have to communicate with anything else and wait for it to respond, use
async def
.If you just don't know, use normal
def
.Note : You can mix
def
andasync def
in your path operation functions as much as you need and define each one using the best option for you. FastAPI will do the right thing with them.Anyway, in any of the cases above, FastAPI will still work asynchronously and be extremely fast.
But by following the steps above, it will be able to do some performance optimizations.
Thus, def
endpoints (in the context of asynchronous programming, a function
defined with just def
is called synchronous function), in FastAPI, run in
a separate thread from an external threadpool that is then await
ed, and
hence, FastAPI will still work asynchronously. In other words, the server
will process requests to such endpoints concurrently. Whereas, async def
endpoints run in the [event loop
](https://docs.python.org/3/library/asyncio-
eventloop.html)—on the main (single) thread—that is, the server will also
process requests to such endpoints concurrently / asynchronously , as
long as there is an
[await
](https://stackoverflow.com/questions/38865050/is-await-in-
python3-cooperative-multitasking) call to non-blocking I/O-bound operations
inside such async def
endpoints/routes, such as waiting for (1) data from
the client to be sent through the network, (2) contents of a file in the disk
to be read, (3) a database operation to finish, etc., (have a look
here). If, however,
an endpoint defined with async def
does not await
for something inside, in
order to give up time for other tasks in the event loop
to run (e.g.,
requests to the same or other endpoints, background tasks, etc.), each request
to such an endpoint will have to be completely finished (i.e., exit the
endpoint), before returning control back to the event loop
and allow other
tasks to run. In other words, in such cases, the server will process requests
sequentially. Note that the same concept not only applies to FastAPI
endpoints, but also to StreamingResponse
's generator
function (see
StreamingResponse
class implementation), as well as Background Tasks
(see
BackgroundTask
class implementation); hence, after reading this answer to the end, you should
be able to decide whether you should define a FastAPI endpoint,
StreamingResponse
's generator, or background task function with def
or
async def
.
The keyword await
(which works only within an async def
function) passes
function control back to the event loop
. In other words, it suspends the
execution of the surrounding
coroutine
(i.e., a coroutine object is the result of calling an async def
function),
and tells the event loop
to let something else run, until that await
ed
task completes. Note that just because you may define a custom function
with async def
and then await
it inside your async def
endpoint, it
doesn't mean that your code will work asynchronously, if that custom function
contains, for example, calls to time.sleep()
, CPU-bound tasks, non-async I/O
libraries, or any other blocking call that is incompatible with asynchronous
Python code. In FastAPI, for example, when using the async
methods of
[UploadFile
](https://fastapi.tiangolo.com/tutorial/request-
files/#uploadfile), such as await file.read()
and await file.write()
,
FastAPI/Starlette, behind the scenes, actually runs such [methods of File
objects](https://docs.python.org/3/tutorial/inputoutput.html#methods-of-file-
objects) in an external threadpool (using the async
run_in_threadpool()
function) and await
s it; otherwise, such methods/operations would block the
event loop
. You can find out more by having a look at the implementation of
the UploadFile
class.
Note that async
does not mean parallel , but concurrently.
Asynchronous code with async
and await
is many times summarised as using
coroutines. Coroutines
are collaborative (or cooperatively
multitasked), meaning
that "at any given time, a program with coroutines is running only one of
its coroutines, and this running coroutine suspends its execution only when it
explicitly requests to be suspended" (see
here and
[here](https://stackoverflow.com/questions/1934715/difference-between-a-
coroutine-and-a-thread) for more info on coroutines). As described in this
article:
Specifically, whenever execution of a currently-running coroutine reaches an
await
expression, the coroutine may be suspended, and another previously- suspended coroutine may resume execution if what it was suspended on has since returned a value. Suspension can also happen when anasync for
block requests the next value from an asynchronous iterator or when anasync with
block is entered or exited, as these operations useawait
under the hood.
If, however, a blocking I/O-bound or CPU-bound operation was directly
executed/called inside an async def
function/endpoint, it would block the
main thread , and hence, the event loop
(as the event loop
runs in the
main thread). Hence, a blocking operation such as time.sleep()
in an async def
endpoint would block the entire server (as in the code example provided
in your question). Thus, if your endpoint is not going to make any async
calls, you could declare it with just def
instead, which would be run in an
external threadpool that would then be await
ed, as explained earlier (more
solutions are given in the following sections). Example:
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
Otherwise, if the functions that you had to execute inside the endpoint are
async
functions that you had to await
, you should define your endpoint
with async def
. To demonstrate this, the example below uses the
[asyncio.sleep()
](https://docs.python.org/3/library/asyncio-
task.html#asyncio.sleep) function (from the
asyncio
library), which
provides a non-blocking sleep operation. The await asyncio.sleep()
method
will suspend the execution of the surrounding coroutine (until the sleep
operation completes), thus allowing other tasks in the event loop
to run.
Similar examples are given [here](https://docs.python.org/3/library/asyncio-
task.html#coroutine) and here as well.
import asyncio
@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
Both the endpoints above will print out the specified messages to the screen in the same order as mentioned in your question—if two requests arrived at around the same time—that is:
Hello
Hello
bye
bye
Important Note
When you call your endpoint for the second (third, and so on) time, please
remember to do that from a tab that is isolated from the browser's main
session ; otherwise, succeeding requests (i.e., coming after the first one)
will be blocked by the browser (on client side ), as the browser will be
waiting for response from the server for the previous request before sending
the next one. You can confirm that by using print(request.client)
inside the
endpoint, where you would see the hostname
and port
number being the same
for all incoming requests—if requests were initiated from tabs opened in the
same browser window/session)—and hence, those requests would be processed
sequentially, because of the browser sending them sequentially in the first
place. To solve this, you could either:
-
Reload the same tab (as is running), or
-
Open a new tab in an Incognito Window, or
-
Use a different browser/client to send the request, or
-
Use the
httpx
library to make asynchronous HTTP requests, along with the awaitableasyncio.gather()
, which allows executing multiple asynchronous operations concurrently and then returns a list of results in the same order the awaitables (tasks) were passed to that function (have a look at this answer for more details).
Example :
import httpx
import asyncio
URLS = ['http://127.0.0.1:8000/ping'] * 2
async def send(url, client):
return await client.get(url, timeout=10)
async def main():
async with httpx.AsyncClient() as client:
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
print(*[r.json() for r in responses], sep='\n')
asyncio.run(main())
In case you had to call different endpoints that may take different time to
process a request, and you would like to print the response out on client side
as soon as it is returned from the server—instead of waiting for
asyncio.gather()
to gather the results of all tasks and print them out in
the same order the tasks were passed to the send()
function—you could
replace the send()
function of the example above with the one shown below:
async def send(url, client):
res = await client.get(url, timeout=10)
print(res.json())
return res
Async
/await
and Blocking I/O-bound or CPU-bound Operations
If you are required to use async def
(as you might need to await
for
coroutines inside your endpoint), but also have some synchronous blocking
I/O-bound or CPU-bound operation (long-running computation task) that will
block the event loop
(essentially, the entire server) and won't let other
requests to go through, for example:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this will block the event loop
finally:
await file.close()
print("bye")
return "pong"
then:
-
You should check whether you could change your endpoint's definition to normal
def
instead ofasync def
. For example, if the only method in your endpoint that has to be awaited is the one reading the file contents (as you mentioned in the comments section below), you could instead declare the type of the endpoint's parameter asbytes
(i.e.,file: bytes = File()
) and thus, FastAPI would read the file for you and you would receive the contents asbytes
. Hence, there would be no need to useawait file.read()
. Please note that the above approach should work for small files, as the enitre file contents would be stored into memory (see the documentation onFile
Parameters); and hence, if your system does not have enough RAM available to accommodate the accumulated data (if, for example, you have 8GB of RAM, you can’t load a 50GB file), your application may end up crashing. Alternatively, you could call the.read()
method of theSpooledTemporaryFile
directly (which can be accessed through the.file
attribute of theUploadFile
object), so that again you don't have toawait
the.read()
method—and as you can now declare your endpoint with normaldef
, each request will run in a separate thread (example is given below). For more details on how to upload aFile
, as well how Starlette/FastAPI usesSpooledTemporaryFile
behind the scenes, please have a look at this answer and this answer.@app.post("/ping") def ping(file: UploadFile = File(...)): print("Hello") try: contents = file.file.read() res = cpu_bound_task(contents) finally: file.file.close() print("bye") return "pong"
-
Use FastAPI's (Starlette's)
run_in_threadpool()
function from theconcurrency
module—as @tiangolo suggested here—which "will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked" (see here). As described by @tiangolo here, "run_in_threadpool
is an awaitable function, the first parameter is a normal function, the next parameters are passed to that function directly. It supports both sequence arguments and keyword arguments".from fastapi.concurrency import run_in_threadpool
res = await run_in_threadpool(cpu_bound_task, contents)
-
Alternatively, use
asyncio
'sloop.run_in_executor()
—after obtaining the runningevent loop
usingasyncio.get_running_loop()
—to run the task, which, in this case, you canawait
for it to complete and return the result(s), before moving on to the next line of code. PassingNone
as the executor argument, the default executor will be used; that isThreadPoolExecutor
:import asyncio
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)
or, if you would like to [pass keyword
arguments](https://docs.python.org/3/library/asyncio-eventloop.html#asyncio-
pass-keywords) instead, you could use a lambda
expression (e.g., lambda: cpu_bound_task(some_arg=contents)
), or, preferably,
functools.partial()
,
which is specifically recommended in the documentation for
[loop.run_in_executor()
](https://docs.python.org/3/library/asyncio-
eventloop.html#asyncio.loop.run_in_executor):
import asyncio
from functools import partial
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
You could also run your task in a custom
ThreadPoolExecutor
.
For instance:
import asyncio
import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)
In Python 3.9+, you could also use
[asyncio.to_thread()
](https://docs.python.org/3/library/asyncio-
task.html#asyncio.to_thread) to asynchronously run a synchronous function in a
separate thread—which, essentially, uses await loop.run_in_executor(None, func_call)
under the hood, as can been seen in the implementation of
asyncio.to_thread()
.
The to_thread()
function takes the name of a blocking function to execute,
as well as any arguments (*args and/or **kwargs) to the function, and then
returns a coroutine that can be await
ed. Example:
import asyncio
res = await asyncio.to_thread(cpu_bound_task, contents)
-
ThreadPoolExecutor
will successfully prevent theevent loop
from being blocked, but won't give you the performance improvement you would expect from running code in parallel ; especially, when one needs to performCPU-bound
operations, such as the ones described here (e.g., audio or image processing, machine learning, and so on). It is thus preferable to run CPU-bound tasks in a separate process —usingProcessPoolExecutor
, as shown below—which, again, you can integrate withasyncio
, in order toawait
it to finish its work and return the result(s). As described here, on Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc. Basically, your code must be underif __name__ == '__main__':
.import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)
- Use moreworkers to take advantage of multi-core CPUs, in order to run multiple processes in parallel and be able to serve more requests. For example,
uvicorn main:app --workers 4
(if you are using Gunicorn as a process manager with Uvicorn workers, please have a look at this answer). When using 1 worker, only one process is run. When using multiple workers, this will spawn multiple processes (all single threaded). Each process has a separate Global Interpreter Lock (GIL), as well as its ownevent loop
, which runs in the main thread of each process and executes all tasks in its thread. That means, there is only one thread that can take a lock on the interpreter of each process; unless, of course, you employ additional threads, either outside or inside theevent loop
, e.g., when using aThreadPoolExecutor
withloop.run_in_executor
, or defining endpoints/background tasks/StreamingResponse
's generator with normaldef
instead ofasync def
, as well as when callingUploadFile
's methods (see the first two paragraphs of this answer for more details).
Note: Each worker ["has its own things, variables and
memory"](https://fastapi.tiangolo.com/deployment/concepts/#memory-per-
process). This means that global
variables/objects, etc., won't be shared
across the processes/workers. In this case, you should consider using a
database storage, or Key-Value stores (Caches), as described
here and
here. Additionally, note that
"if you are consuming a large amount of memory in your code, each process
will consume an equivalent amount of memory".
- If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools like Celery, as described in FastAPI's documentation.