FastAPI python: How to run a thread in the background?

ghz 1years ago ⋅ 6572 views

Question

I'm making a server in python using FastAPI, and I want a function that is not related to my API, to run in background every 5 minutes (like checking stuff from an API and printing stuff depending on the response)

I've tried to make a thread that runs the function start_worker, but it doesn't print anything.

Does anyone know how to do so ?

def start_worker():
    print('[main]: starting worker...')
    my_worker = worker.Worker()
    my_worker.working_loop() # this function prints "hello" every 5 seconds

if __name__ == '__main__':
    print('[main]: starting...')
    uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
    _worker_thread = Thread(target=start_worker, daemon=False)
    _worker_thread.start()

Answer

Option 1

You should start your Thread before calling uvicorn.run, as uvicorn.run is blocking the thread.

import time
import threading
from fastapi import FastAPI
import uvicorn

app = FastAPI()
class BackgroundTasks(threading.Thread):
    def run(self,*args,**kwargs):
        while True:
            print('Hello')
            time.sleep(5)
  
if __name__ == '__main__':
    t = BackgroundTasks()
    t.start()
    uvicorn.run(app, host="0.0.0.0", port=8000)

You could also start your thread using FastAPI's startup event, as long as it is ok to run before the application starts.

@app.on_event("startup")
async def startup_event():
    t = BackgroundTasks()
    t.start()

Option 2

You could instead use a repeating Event scheduler for the background task, as below:

import sched, time
from threading import Thread
from fastapi import FastAPI
import uvicorn

app = FastAPI()
s = sched.scheduler(time.time, time.sleep)

def print_event(sc): 
    print("Hello")
    sc.enter(5, 1, print_event, (sc,))

def start_scheduler():
    s.enter(5, 1, print_event, (s,))
    s.run()

@app.on_event("startup")
async def startup_event():
    thread = Thread(target = start_scheduler)
    thread.start()

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)

Option 3

If your task is an async def function (see this answer for more details on def vs async def endpoints/background tasks in FastAPI), then you could add the task to the current event loop, using the [asyncio.create_task()](https://docs.python.org/3/library/asyncio- task.html#asyncio.create_task) function. The create_task() function takes a coroutine object (i.e., an async def function) and returns a Task object (which can be used to await the task, if needed, or cancel it , etc). The call creates the task inside the event loop for the current thread, and executes it in the "background" concurrently with all other tasks in the event loop, switching between them at await points.

It is required to have an event loop created before calling create_task(), and this is already created when starting the uvicorn server either programmatically (using, for instance, uvicorn.run(app)) or in the terminal (using, for instance, uvicorn app:app). Instead of using asyncio.create_task(), one could also use [asyncio.get_running_loop()](https://docs.python.org/3/library/asyncio- eventloop.html#asyncio.get_running_loop) to get the current event loop, and then call [loop.create_task()](https://docs.python.org/3/library/asyncio- eventloop.html#asyncio.loop.create_task).

The example below uses the recently documented way for adding lifespan events (using a context manager), i.e., code that should be executed before the application starts up, as well as when the application is shutting down (see the documentation, as well as this answer for more details and examples). One could also still use the startup and shutdown events, as demonstrated in the previous options; however, those event handlers might be removed from future FastAPI/Starlette versions.

from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio


async def print_task(s): 
    while True:
        print('Hello')
        await asyncio.sleep(s)

        
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Run at startup
    asyncio.create_task(print_task(5))
    yield
    # Run on shutdown (if required)
    print('Shutting down...')


app = FastAPI(lifespan=lifespan)