How to stream DataFrame using FastAPI without saving the data to csv file?

ghz 1 year ago ⋅ 2496 views

Question

I would like to know how to stream a DataFrame using FastAPI without having to save the DataFrame to a csv file on disk. Currently, what I managed to do is to stream data from the csv file, but the speed was not very fast compared to returning a FileResponse. The /option7 below is what I'm trying to do.

My goal is to stream data from FastAPI backend without saving the DataFrame to a csv file.

Thank you.

import pandas as pd
from fastapi import FastAPI, Response, Query
from fastapi.responses import FileResponse, HTMLResponse, StreamingResponse

app = FastAPI()

df = pd.read_csv("data.csv")

@app.get("/option4")
def load_questions():
    return FileResponse(path="C:Downloads/data.csv", filename="data.csv")

@app.get("/option5")
def load_questions():
    def iterfile():
        with open('data.csv', mode="rb") as file_like:
            yield from file_like

    return StreamingResponse(iterfile(), media_type="text/csv")

@app.get("/option7")
def load_questions():
    def iterfile():
        #with open(df, mode="rb") as file_like:
        yield from df

    return StreamingResponse(iterfile(), media_type="application/json")

Answer

Approach 1 (recommended)

As mentioned in this answer, as well as here and here, when the entire data (a DataFrame in your case) is already loaded into memory, there is no need to use [StreamingResponse](https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse). StreamingResponse makes sense in two situations. The first is when you want to transfer real-time data, or when you don't know the size of the output ahead of time and don't want to wait to collect it all before you start sending it to the client. The second is when the file you would like to return is too large to fit into memory (for instance, with 8GB of RAM you can't load a 50GB file), so you would rather load it into memory in chunks.

In your case, as the DataFrame is already loaded into memory, you should instead return a custom Response directly, after using the .to_json() method to convert the DataFrame into a JSON string, as described in this answer (see related posts here and here as well). Example:

from fastapi import Response

@app.get("/")
def main():
    return Response(df.to_json(orient="records"), media_type="application/json")

If you find the browser taking a while to display the data, you may want to have the data downloaded as a .json file to the user's device (which would be completed much faster), rather than waiting for the browser to display a large amount of data. You can do that by setting the [Content-Disposition](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition) header in the Response using the attachment parameter (see this answer for more details):

@app.get("/")
def main():
    headers = {'Content-Disposition': 'attachment; filename="data.json"'}
    return Response(df.to_json(orient="records"), headers=headers, media_type='application/json')

You could also return the data as a .csv file, using the .to_csv() method without specifying the path_or_buf parameter, in which case the method returns the CSV data as a string. Since using return df.to_csv() would result in displaying the data in the browser with \r\n characters included, you might find it better to put the CSV data in a Response instead, and specify the Content-Disposition header, so that the data will be downloaded as a .csv file. Example:

@app.get("/")
def main():
    headers = {'Content-Disposition': 'attachment; filename="data.csv"'}
    return Response(df.to_csv(), headers=headers, media_type="text/csv")

Approach 2

To use a [StreamingResponse](https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse), you would need to iterate over the rows in a DataFrame, convert each row into a dictionary and subsequently into a JSON string, using either the standard json library, or other faster JSON encoders, as described in this answer (the JSON string will be later encoded into byte format internally by FastAPI/Starlette, as shown in the source code here). Example:

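import json
from fastapi.responses import StreamingResponse

@app.get("/")
def main():
    def iter_df():
        # yield one JSON object per row, separated by newlines
        for _, row in df.iterrows():
            yield json.dumps(row.to_dict()) + '\n'

    return StreamingResponse(iter_df(), media_type="application/json")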

Iterating through Pandas objects is generally slow and not recommended. As described in this answer:

Iteration in Pandas is an anti-pattern and is something you should only do when you have exhausted every other option. You should not use any function with "iter" in its name for more than a few thousand rows or you will have to get used to a lot of waiting.
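One way to keep streaming while avoiding row-by-row iteration is to slice the DataFrame into chunks and let pandas serialise each chunk in one call. The following is only a minimal sketch of that idea: the helper name iter_ndjson, the chunk size and the demo data are assumptions made for illustration, not part of the original answer.

```python
import json

import pandas as pd


def iter_ndjson(df: pd.DataFrame, chunk_size: int = 10_000):
    """Yield the DataFrame as newline-delimited JSON, one chunk of rows at a time."""
    for start in range(0, len(df), chunk_size):
        chunk = df.iloc[start:start + chunk_size]
        # lines=True writes one JSON object per row. The trailing newline is
        # normalised here because its presence may differ between pandas versions.
        yield chunk.to_json(orient="records", lines=True).rstrip("\n") + "\n"


# Small made-up frame, only for demonstration.
demo_df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
ndjson_text = "".join(iter_ndjson(demo_df, chunk_size=2))
rows = [json.loads(line) for line in ndjson_text.splitlines()]
```

In an endpoint, the generator could be passed to StreamingResponse(iter_ndjson(df), media_type="application/x-ndjson"); that media type is a common convention for newline-delimited JSON rather than a registered standard, so treat it as an assumption. A suitable chunk size depends on row width and available memory.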

Update

As @Panagiotis Kanavos noted in the comments section below, using either .to_json() or .to_csv() on a DataFrame that is already loaded into memory results in allocating the entire output string in memory, thus doubling the RAM usage or even worse. Hence, if the amount of data is so large that either method above may cause your system to slow down or crash (by running out of memory), you should rather use StreamingResponse, as described earlier. You may find faster alternative methods to iterrows() in [this post](https://stackoverflow.com/questions/16476924/how-to-iterate-over-rows-in-a-dataframe-in-pandas), as well as faster JSON encoders, such as orjson and ujson, as described in this answer and this answer.
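To illustrate the memory point for CSV output, the sketch below serialises only a slice of rows at a time, so only one chunk's CSV text has to exist in memory at once. The helper name iter_csv, the chunk size and the demo data are hypothetical and serve only as illustration.

```python
import pandas as pd


def iter_csv(df: pd.DataFrame, chunk_size: int = 10_000):
    """Yield the DataFrame as CSV text, one chunk of rows at a time."""
    if df.empty:
        # Nothing to slice; still emit the header line.
        yield df.to_csv(index=False)
        return
    for start in range(0, len(df), chunk_size):
        chunk = df.iloc[start:start + chunk_size]
        # Write the header row only with the first chunk.
        yield chunk.to_csv(index=False, header=(start == 0))


# Small made-up frame, only for demonstration.
demo_df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
streamed_csv = "".join(iter_csv(demo_df, chunk_size=2))
```

Concatenating the chunks gives the same text as a single demo_df.to_csv(index=False) call. In an endpoint, the generator could be returned as StreamingResponse(iter_csv(df), media_type="text/csv").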

Alternatively, you could save the data to disk and then delete the DataFrame to release the memory. You could even trigger garbage collection manually using gc.collect(), as shown in this answer; however, frequent calls to garbage collection are discouraged, as it is a costly operation and may affect performance. Then return a FileResponse (assuming the data can fit into RAM; otherwise, you should use StreamingResponse, see this answer, as well as this answer), and finally, have a BackgroundTask delete the file from disk after the response has been returned. An example is given below:

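from fastapi import BackgroundTasks
from fastapi.responses import FileResponse
import pandas as pd
import uuid
import os

@app.get("/")
def main(background_tasks: BackgroundTasks):
    # Create the DataFrame inside the endpoint (read_csv is only a stand-in here):
    # `del` on a module-level df would make the name local and raise UnboundLocalError.
    df = pd.read_csv("data.csv")
    filename = str(uuid.uuid4()) + ".csv"
    df.to_csv(filename)
    del df  # release the memory
    background_tasks.add_task(os.remove, filename)  # delete the file after the response is sent
    return FileResponse(filename, filename="data.csv", media_type="text/csv")
    # or return StreamingResponse, if the file can't fit into RAM; see linked answers above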

The solution you choose should be based on your application's requirements (e.g., the number of users you expect to serve simultaneously, the size of data, the response time, etc.), as well as your system's specifications (e.g., available memory for allocation). Additionally, since all calls to DataFrame's methods are synchronous, you should remember to define your endpoint with a normal def, so that it is run in an external threadpool; otherwise, it would block the server. Alternatively, you could use Starlette's run_in_threadpool() from the concurrency module, which will run the to_csv() or to_json() function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked. Please have a look at this answer for more details on def vs async def, as well as solutions when one has to run synchronous blocking operations inside async def endpoints.