From a library, I get a function that reads a file and returns a numpy array.
I want to build a Dask array with multiple blocks from multiple files.
Each block is the result of calling the function on a file.
When I ask Dask to compute, will Dask ask the functions to read multiple files from the hard disk at the same time?
If so, how can I avoid that? My computer doesn't have a parallel file system.
Example:
import numpy as np
import dask.array as da
import dask
# Make test data
n = 2
m = 3
x = np.arange(n * m, dtype=np.int64).reshape(n, m)
np.save('0.npy', x)
np.save('1.npy', x)
# np.load is a function that reads a file
# and returns a numpy array.
# Build delayed
y = [dask.delayed(np.load)('%d.npy' % i)
for i in range(2)]
# Build individual Dask arrays.
# I can get the shape of each numpy array without
# reading the whole file.
z = [da.from_delayed(a, (n, m), np.int64) for a in y]
# Combine the dask arrays
w = da.vstack(z)
print(w.compute())
Answer
Yes. The two np.load tasks in your example are independent, so Dask's schedulers are free to run them concurrently: the default threaded scheduler uses multiple threads, and a distributed Client uses multiple workers. Unless you explicitly limit concurrency, the files may well be read at the same time.
Since your computer doesn't have a parallel file system, simultaneous reads can degrade performance (for example, seek thrashing on a spinning disk), because the reads contend for the same disk bandwidth rather than completing faster.
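You can observe this directly with a quick sketch: wrap the loader so it logs when each read starts and finishes (the load_logged name, the prints, and the artificial sleep are purely illustrative, not part of any Dask API). With the default threaded scheduler, both "start" lines typically appear before either "done" line:
import time
import numpy as np
import dask
import dask.array as da
n, m = 2, 3
def load_logged(path):
    # Log the start and end of each read so overlap is visible.
    print('start %s' % path)
    arr = np.load(path)
    time.sleep(1)  # artificial delay standing in for a slow disk
    print('done  %s' % path)
    return arr
y = [dask.delayed(load_logged)('%d.npy' % i) for i in range(2)]
w = da.vstack([da.from_delayed(a, (n, m), np.int64) for a in y])
w.compute()  # with the threaded scheduler the reads usually overlap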
How to avoid parallel file reads in Dask:
- Control the number of workers: limit Dask to a single worker (with a single thread per worker) so that only one file is read at a time, e.g. via the distributed Client's n_workers parameter or a LocalCluster with a fixed size.
- Use a controlled task graph: introduce explicit dependencies between the file reads so that they are forced to run one after another.
Option 1: Limit the number of workers
By default, a distributed Client starts multiple workers, and each worker may run several threads; either can lead to concurrent file reads. You can configure Dask to use a single worker with a single thread. Here's how you can do it:
import dask
import numpy as np
import dask.array as da
from dask.distributed import Client
# Limit Dask to a single worker with a single thread; n_workers=1
# alone is not enough, since one worker can still run several threads.
client = Client(n_workers=1, threads_per_worker=1)
# Make test data
n = 2
m = 3
x = np.arange(n * m, dtype=np.int64).reshape(n, m)
np.save('0.npy', x)
np.save('1.npy', x)
# np.load is a function that reads a file and returns a numpy array.
y = [dask.delayed(np.load)('%d.npy' % i) for i in range(2)]
# Build individual Dask arrays
z = [da.from_delayed(a, (n, m), np.int64) for a in y]
# Combine the Dask arrays
w = da.vstack(z)
print(w.compute()) # Will read files sequentially with one worker
This ensures that Dask runs all tasks on a single worker thread, effectively preventing concurrent file reads. However, it also removes all other parallelism, so it should be used carefully if performance is a concern.
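If you'd rather not bring in dask.distributed at all, the plain threaded scheduler also accepts a num_workers keyword that caps its thread pool. A minimal sketch, assuming the default threaded scheduler and the w array built above:
# Cap the threaded scheduler at one thread; no Client is needed.
result = w.compute(scheduler='threads', num_workers=1)
print(result)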
Option 2: Control task dependencies
You can explicitly control the task graph by creating dependencies between the file reads. This approach is helpful if you want to guarantee that only one file is read at a time and that the reads execute sequentially.
import numpy as np
import dask.array as da
import dask
# Make test data
n = 2
m = 3
x = np.arange(n * m, dtype=np.int).reshape(n, m)
np.save('0.npy', x)
np.save('1.npy', x)
# Dask will not start a task until all of its arguments are
# available, so a small wrapper that takes (and ignores) the
# previous result forces the reads to run one after another.
@dask.delayed
def load_after(path, _prev=None):
    return np.load(path)

# Create a delayed task for the first file
y0 = load_after('0.npy')
# Chain the second read to the first task so they run sequentially
y1 = load_after('1.npy', y0)
# Create Dask arrays from delayed results
z0 = da.from_delayed(y0, (n, m), np.int64)
z1 = da.from_delayed(y1, (n, m), np.int64)
# Combine the Dask arrays
w = da.vstack([z0, z1])
# Compute the result
print(w.compute()) # Files will be read one after the other
In this example, 1.npy will only be read after 0.npy has been loaded, ensuring sequential disk access.
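The same chaining pattern extends to any number of files: hand each read the previous delayed result so the reads form a simple chain (this reuses the hypothetical load_after wrapper from above):
paths = ['%d.npy' % i for i in range(2)]
blocks = []
prev = None
for p in paths:
    prev = load_after(p, prev)  # each read waits for the one before
    blocks.append(da.from_delayed(prev, (n, m), np.int64))
print(da.vstack(blocks).compute())
Unlike the single-worker approach, this only serializes the reads themselves; computation on already-loaded blocks can still overlap with later reads.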
Option 3: Use the single-threaded scheduler
Another way to ensure sequential file reads is to run the whole computation on Dask's single-threaded (synchronous) scheduler:
import dask
import numpy as np
import dask.array as da
# Make test data
n = 2
m = 3
x = np.arange(n * m, dtype=np.int64).reshape(n, m)
np.save('0.npy', x)
np.save('1.npy', x)
# np.load is a function that reads a file and returns a numpy array.
y = [dask.delayed(np.load)('%d.npy' % i) for i in range(2)]
# Build individual Dask arrays
z = [da.from_delayed(a, (n, m), np.int64) for a in y]
# Combine the Dask arrays
w = da.vstack(z)
# Use Dask's single-threaded scheduler to ensure sequential execution
result = w.compute(scheduler='single-threaded')
print(result) # Files are read sequentially
This tells Dask to execute the entire graph in a single thread, which rules out concurrent file reads along with all other parallelism.
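If you prefer not to pass the scheduler at every call site, the same choice can be made in configuration ('single-threaded' and 'synchronous' name the same scheduler):
import dask
# Applies only inside the with-block; call dask.config.set(...)
# at module level to make it the global default instead.
with dask.config.set(scheduler='synchronous'):
    print(w.compute())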
Option 4: Sequential task execution with manual threading control
If you have more complex task dependencies, or you want more control over the threading behavior, you can control the synchronization yourself, as sketched below. However, this is generally more complex and usually not required unless your task graph is particularly intricate.
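For example, one middle ground is to serialize only the disk reads with an ordinary threading.Lock while leaving the rest of the graph free to run in parallel on the default threaded scheduler. A sketch under that assumption (the io_lock and load_serialized names are illustrative, not a Dask API):
import threading
import numpy as np
import dask
import dask.array as da

n, m = 2, 3
io_lock = threading.Lock()  # one process-wide lock for disk I/O

def load_serialized(path):
    # Only the read itself is guarded; computation on blocks that
    # are already in memory can still run on other threads.
    with io_lock:
        return np.load(path)

y = [dask.delayed(load_serialized)('%d.npy' % i) for i in range(2)]
z = [da.from_delayed(a, (n, m), np.int64) for a in y]
print(da.vstack(z).compute())
Note that this relies on all tasks running in one process (the default threaded scheduler); with a distributed Client you would need dask.distributed.Lock instead.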
Conclusion
To avoid Dask reading multiple files concurrently, you can:
- Limit the number of workers and threads using the Client (e.g., n_workers=1, threads_per_worker=1).
- Use explicit task dependencies to ensure sequential execution.
- Use a single-threaded scheduler if you want to disable parallelism entirely for the computation.
Each of these options should help you ensure that Dask doesn't attempt to read multiple files in parallel, which could be problematic on systems without a parallel file system.