Background task does not start in FastAPI, Arq

ghz 9 months ago ⋅ 79 views

Colleagues, can anyone help? My stack: Python 3.10, arq 0.25, Redis. Why is the arq background task not running? A key like "arq:job:32646989261645a4a164815cc34dfee5" appears in Redis, and there are no errors when requesting the route (it returns 202).

from arq import create_pool
from arq.connections import RedisSettings

@app.get("/api/integration/test")
async def test(current_user: Annotated[GetUsersDTO, Depends(get_current_user)], response: Response):
    pool = await create_pool(RedisSettings(host=host, password=password))
    async with pool as pool:
        await pool.enqueue_job("task", current_user[0].id)
    response.status_code = status.HTTP_202_ACCEPTED
    return {"message": "start..."}

async def task(ctx, user_id):
    log_id, err = await insert_log({'users_id': user_id, 'work': 'start'})
    for i in range(30):
        print(i)
        time.sleep(5)

Answers

It seems like you're using FastAPI to define your API endpoints and Arq for background task processing with Redis. However, there are some issues in your code that might prevent Arq from running background tasks properly. Let's address them:

  1. Pool Creation: You're creating a new Redis pool (create_pool) for every request to the /api/integration/test route. This is inefficient and might lead to unexpected behavior. Instead, you should create the Redis pool once when your application starts and reuse it for all requests.

  2. Async Issue: The time.sleep(5) call in your task function is blocking and not asynchronous. This can potentially block the event loop and prevent other tasks from running. You should use await asyncio.sleep(5) instead to properly suspend the task without blocking the event loop.

  3. Arq Worker: You need an Arq worker to process the background tasks enqueued in Redis. Enqueuing only writes the job to Redis (that is the arq:job:... key you see); nothing runs until a separate worker process, with the task function registered, picks it up.

Here's how you can fix these issues:

from typing import Annotated

from fastapi import Depends, FastAPI, Response, status
from arq import create_pool
from arq.connections import RedisSettings
import asyncio

app = FastAPI()

# Redis settings defined once; host and password come from your configuration
redis_settings = RedisSettings(host=host, password=password)

# Shared Arq pool, created once when the application starts
pool = None

@app.on_event("startup")
async def startup_event():
    global pool
    # create_pool is a coroutine and must be awaited; it returns an ArqRedis instance
    pool = await create_pool(redis_settings)

@app.on_event("shutdown")
async def shutdown_event():
    await pool.close()

@app.get("/api/integration/test")
async def test(current_user: Annotated[GetUsersDTO, Depends(get_current_user)], response: Response):
    # Enqueue the background task on the shared pool instead of creating a new one per request
    await pool.enqueue_job("task", current_user[0].id)
    response.status_code = status.HTTP_202_ACCEPTED
    return {"message": "Task started..."}

async def task(ctx, user_id):
    log_id, err = await insert_log({'users_id': user_id, 'work': 'start'})
    for i in range(30):
        print(i)
        await asyncio.sleep(5)  # Use asyncio.sleep instead of time.sleep
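Note that on current FastAPI versions the on_event hooks are deprecated in favour of a lifespan handler. The same one-pool-per-application setup can be written as follows (a sketch, reusing the same host and password values from your configuration):

from contextlib import asynccontextmanager

from arq import create_pool
from arq.connections import RedisSettings
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create the shared Arq pool once, before the app starts serving requests
    app.state.arq_pool = await create_pool(RedisSettings(host=host, password=password))
    yield
    # Close it when the application shuts down
    await app.state.arq_pool.close()

app = FastAPI(lifespan=lifespan)

Endpoints can then reach the pool via request.app.state.arq_pool.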

Make sure you have an Arq worker running in a separate process to execute the background tasks. The worker is started with the arq CLI, pointing it at a worker settings class that registers the task function in its functions list.
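A minimal sketch of such a settings class, assuming your task coroutine lives in a module named app (adjust the import path and Redis settings to your project):

from arq.connections import RedisSettings

from app import task  # the coroutine enqueued under the name "task"

class WorkerSettings:
    # The worker only runs functions listed here; the name passed to
    # enqueue_job ("task") must match one of them.
    functions = [task]
    # Point the worker at the same Redis instance the FastAPI app enqueues to
    redis_settings = RedisSettings(host=host, password=password)

If this lives in worker.py, start the worker from your terminal with arq worker.WorkerSettings. Once it is running, the queued arq:job:... entries will be picked up and executed.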

With these changes, your background tasks enqueued with Arq should run properly.