Colleagues, can anyone help? My stack: Python 3.10, arq 0.25, Redis. Why is my arq background task not running? A key like "arq:job:32646989261645a4a164815cc34dfee5" appears in Redis, and the route returns 202 with no errors.
from arq import create_pool
from arq.connections import RedisSettings

@app.get("/api/integration/test")
async def test(current_user: Annotated[GetUsersDTO, Depends(get_current_user)], response: Response):
    pool = await create_pool(RedisSettings(host=host, password=password))
    async with pool as pool:
        await pool.enqueue_job("task", current_user[0].id)
    response.status_code = status.HTTP_202_ACCEPTED
    return {"message": "start..."}

async def task(ctx, user_id):
    log_id, err = await insert_log({'users_id': user_id, 'work': 'start'})
    for i in range(30):
        print(i)
        time.sleep(5)
Answers
It seems like you're using FastAPI to define your API endpoints and Arq for background task processing with Redis. However, there are some issues in your code that might prevent Arq from running background tasks properly. Let's address them:
- Pool Creation: You're creating a new Redis pool (create_pool) for every request to the /api/integration/test route. This is inefficient and might lead to unexpected behavior. Instead, you should create the Redis pool once when your application starts and reuse it for all requests.
- Async Issue: The time.sleep(5) call in your task function is blocking and not asynchronous. This can potentially block the event loop and prevent other tasks from running. You should use await asyncio.sleep(5) instead to properly suspend the task without blocking the event loop.
- Arq Worker: You need an Arq worker to process the background tasks enqueued in Redis. Make sure you have a separate process running the Arq worker to execute the task function.
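To make the blocking point concrete, here is a small standalone sketch using only the standard library (no arq or Redis). The names ticker, count_ticks, blocking_sleep and cooperative_sleep are made up for this illustration, and the delays are shortened to 0.2 seconds. It counts how often a second coroutine gets to run while the first one sleeps.

```python
import asyncio
import time


async def ticker(ticks, stop):
    # Appends a timestamp every 10 ms, but only while the event loop is free.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def count_ticks(sleeper):
    """Run `sleeper` next to a ticker and return how many times the ticker ran."""
    ticks = []
    stop = asyncio.Event()
    ticker_task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0)  # give the ticker one chance to start
    await sleeper()
    stop.set()
    await ticker_task
    return len(ticks)


async def blocking_sleep():
    time.sleep(0.2)  # holds the event loop: nothing else can run


async def cooperative_sleep():
    await asyncio.sleep(0.2)  # yields: other coroutines keep running


async def main():
    blocked = await count_ticks(blocking_sleep)
    cooperative = await count_ticks(cooperative_sleep)
    return blocked, cooperative


if __name__ == "__main__":
    blocked, cooperative = asyncio.run(main())
    print(f"ticks during time.sleep:    {blocked}")
    print(f"ticks during asyncio.sleep: {cooperative}")
```

With time.sleep the ticker only gets its initial run, while with asyncio.sleep it keeps ticking for the whole delay. The same applies inside a worker: a blocking call in one job stalls everything else that shares that event loop.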
Here's how you can fix these issues:
import asyncio
from typing import Annotated

from arq import create_pool
from arq.connections import RedisSettings
from fastapi import Depends, FastAPI, Response, status

# host, password, GetUsersDTO, get_current_user and insert_log come from your own project
app = FastAPI()
redis_settings = RedisSettings(host=host, password=password)

# Create the Redis pool once when the application starts
@app.on_event("startup")
async def startup_event():
    # create_pool is a coroutine, so it has to be awaited inside an async function
    app.state.arq_pool = await create_pool(redis_settings)

@app.on_event("shutdown")
async def shutdown_event():
    await app.state.arq_pool.close()

@app.get("/api/integration/test")
async def test(current_user: Annotated[GetUsersDTO, Depends(get_current_user)], response: Response):
    # Enqueue the background task without creating a new pool
    await app.state.arq_pool.enqueue_job("task", current_user[0].id)
    response.status_code = status.HTTP_202_ACCEPTED
    return {"message": "Task started..."}

async def task(ctx, user_id):
    log_id, err = await insert_log({'users_id': user_id, 'work': 'start'})
    for i in range(30):
        print(i)
        await asyncio.sleep(5)  # Use asyncio.sleep instead of time.sleep
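Make sure you have an arq worker running in a separate process to execute the background tasks. Until a worker picks the job up, it stays queued in Redis, which is why you see the arq:job:... key but no output. The worker is configured through a settings class (conventionally named WorkerSettings) whose functions list includes task and whose redis_settings match the ones used for enqueueing. You start it by passing that class's import path to the arq command in your terminal, for example arq worker.WorkerSettings if the class lives in a file named worker.py.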
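A minimal sketch of what such a worker module could look like. The file name worker.py is an assumption, and the insert_log call from the question is left out so the snippet has no project-specific dependencies. To keep it free of third-party imports, the Redis settings are only described in a comment; in a real setup you would most likely need them.

```python
# worker.py (hypothetical module name)
import asyncio


async def task(ctx, user_id):
    """Same job as in the question, with a non-blocking sleep."""
    for i in range(30):
        print(i)
        await asyncio.sleep(5)


class WorkerSettings:
    # The name given to enqueue_job("task", ...) is matched against the
    # functions registered here.
    functions = [task]
    # Assumption: your Redis needs a host and password, as in the question.
    # In that case also set, with RedisSettings imported from arq.connections:
    #   redis_settings = RedisSettings(host=host, password=password)
    # Without it, arq falls back to its default Redis settings (localhost).
```

The worker would then be started from the directory containing the file with arq worker.WorkerSettings. If the job name in enqueue_job does not match a registered function, or the worker points at a different Redis instance than the API, the job will not be executed even though the key exists.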
With these changes, your background tasks enqueued with Arq should run properly.