Transfer Data Between Workers Using Pollable Data Queues
This example shows how to use a pollable data queue to transfer data between workers during asynchronous function evaluations with parfeval
.
You can use PollableDataQueue
objects to transfer data and messages between the client and workers in an interactive parallel pool. By default, a PollableDataQueue
object sends the data only to the client or worker that creates the PollableDataQueue
object. However, starting in R2025a, you can also create a type of PollableDataQueue
object that allows the client or any worker in the pool to poll and receive data.
This example demonstrates how to set up workers to send and receive data from each other using a PollableDataQueue
object. You also use the PollableDataQueue
object to smoothly stop a parfeval
computation on a worker. You can adapt this approach for any application that requires communication between workers during an asynchronous parfeval
computation. To see an example that shows how to use this approach in a data acquisition and processing workflow, see Perform Data Acquisition and Processing on Pool Workers.
Start a parallel pool with at least three thread workers.
pool = parpool("Threads");
Starting parallel pool (parpool) using the 'Threads' profile ... Connected to parallel pool with 6 workers.
To enable communication between workers, create a PollableDataQueue
object with the Destination
argument set to "any"
. This type of PollableDataQueue
object allows any worker to send and receive messages.
queue = parallel.pool.PollableDataQueue(Destination="any");
Define a function for the first worker. The firstWorkerFcn
function processes each element of the input data by doubling it and then sends the result to the next worker through the queue. After processing all data, the function sends a "stop"
signal. If more than one worker is receiving data from the queue, close the queue using the close
function instead of sending multiple "stop"
signals.
function firstWorkerFcn(workerQueue,inData) for idx = 1:numel(inData) initialResult = inData(idx)*2; send(workerQueue,initialResult); end send(workerQueue,"stop"); end
Define a function for the second worker. The secondWorkerFcn
function continuously polls the queue for data, processes each received value, and stores the results. The function stops processing after receiving the "stop"
signal.
function finalResults = secondWorkerFcn(workerQueue) count = 0; while true data = poll(workerQueue,Inf); if strcmp(data,"stop") break; end count = count+1; finalResults(count,:) = [data data+1]; end end
Create input data and use parfeval
to execute the worker functions asynchronously. The first worker processes the input data, and the second worker receives and further processes the results.
inData = 1:5; futures(1) = parfeval(@firstWorkerFcn,0,queue,inData); futures(2) = parfeval(@secondWorkerFcn,1,queue);
Wait for both workers to complete their tasks using wait
, and then retrieve the final results from the second worker using fetchOutputs
.
wait(futures); finalResults = fetchOutputs(futures(2));
Visualize the movement of data through the workers.
plotWorkerOutput(inData,finalResults);
Helper Functions
Define a function to visualize the movement of data through the workers. The plotWorkerOutput
function plots the input data and the results from both workers, showing the transformation at each step.
function plotWorkerOutput(inData,finalResults) c = ["r","g","b","c","m"]; figure; hold on; for idx = 1:numel(inData) plot([1, 2, 3],[inData(idx),finalResults(idx,1),finalResults(idx,2)],"-o"+c(idx),MarkerFaceColor=c(idx),DisplayName="Idx "+num2str(idx)); end hold off; ylabel("Output"); xticklabels(["","Client","Worker 1","Worker 2"]); title("Data Movement Through Workers"); legend grid on; xticks(0:4); xlim([0 4]); end