
Transfer Data Between Workers Using Pollable Data Queues

Since R2025a

This example shows how to use a pollable data queue to transfer data between workers during asynchronous function evaluations with parfeval.

You can use PollableDataQueue objects to transfer data and messages between the client and workers in an interactive parallel pool. By default, a PollableDataQueue object sends the data only to the client or worker that creates the PollableDataQueue object. However, starting in R2025a, you can also create a type of PollableDataQueue object that allows the client or any worker in the pool to poll and receive data.

This example demonstrates how to set up workers to send data to and receive data from each other using a PollableDataQueue object. You also use the PollableDataQueue object to gracefully stop a parfeval computation on a worker. You can adapt this approach for any application that requires communication between workers during an asynchronous parfeval computation. To see an example that shows how to use this approach in a data acquisition and processing workflow, see Perform Data Acquisition and Processing on Pool Workers.

Start a parallel pool with at least three thread workers.

pool = parpool("Threads");
Starting parallel pool (parpool) using the 'Threads' profile ...
Connected to parallel pool with 6 workers.
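The number of workers depends on the machine, so it can be worth confirming that the pool is large enough before scheduling work. The following is a minimal sketch of such a check; the variable name requiredWorkers and the warning identifier are illustrative assumptions, not part of the original example.

```matlab
% requiredWorkers is an illustrative name; the value 3 follows the text above.
requiredWorkers = 3;

% Reuse the current pool if one exists; otherwise start a thread pool.
pool = gcp("nocreate");
if isempty(pool)
    pool = parpool("Threads");
end

% Warn (rather than error) if the pool is smaller than expected.
if pool.NumWorkers < requiredWorkers
    warning("Example:smallPool", ...
        "Pool has %d workers; the example expects at least %d.", ...
        pool.NumWorkers,requiredWorkers);
end
```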

To enable communication between workers, create a PollableDataQueue object with the Destination argument set to "any". This type of PollableDataQueue object allows the client or any worker in the pool to send and receive messages.

queue = parallel.pool.PollableDataQueue(Destination="any");
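To get a feel for the send and poll calls before involving workers, you can exercise a queue directly on the client. The sketch below assumes that the pool from the previous step is running and that the client can poll a queue created with Destination="any", as described above. It uses a separate queue, here given the hypothetical name demoQueue, so that the queue used in the rest of the example stays empty.

```matlab
% demoQueue is a hypothetical name used only for this illustration.
demoQueue = parallel.pool.PollableDataQueue(Destination="any");

% Send two items of different types from the client.
send(demoQueue,42);
send(demoQueue,"hello");

% Poll with a timeout in seconds. The second output reports whether
% an item was actually returned.
[firstItem,gotFirst] = poll(demoQueue,1);
[secondItem,gotSecond] = poll(demoQueue,1);

% The queue should now be empty, so this poll is expected to time out
% and report false in its second output.
[~,gotThird] = poll(demoQueue,0.5);
```

Checking the second output of poll is useful whenever the timeout is finite, because a timed-out poll returns empty data that could otherwise be mistaken for a real message.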

Define a function for the first worker. The firstWorkerFcn function processes each element of the input data by doubling it and then sends the result to the next worker through the queue. After processing all data, the function sends a "stop" signal. If more than one worker is receiving data from the queue, close the queue using the close function instead of sending multiple "stop" signals.

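function firstWorkerFcn(workerQueue,inData)
for idx = 1:numel(inData)
    % Double each input element and pass the result to the next worker.
    initialResult = inData(idx)*2;
    send(workerQueue,initialResult);
end
% Tell the receiving worker that no more data is coming.
send(workerQueue,"stop");
end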

Define a function for the second worker. The secondWorkerFcn function continuously polls the queue for data, processes each received value, and stores the results. The function stops processing after receiving the "stop" signal.

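function finalResults = secondWorkerFcn(workerQueue)
% Initialize the output so that it is defined even if no data arrives
% before the "stop" signal.
finalResults = zeros(0,2);
count = 0;
while true
    % Block until the next item is available in the queue.
    data = poll(workerQueue,Inf);
    if strcmp(data,"stop")
        break;
    end
    count = count+1;
    % Store the received value alongside the further-processed value.
    finalResults(count,:) = [data data+1];
end
end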
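For the case of several receiving workers, the text above suggests closing the queue rather than sending one "stop" signal per receiver. The following sketch shows one way this might look. It assumes that, once the queue is closed and all remaining items have been consumed, poll returns with its second output set to false even when the timeout is Inf; check the poll and close reference pages to confirm this behavior. The function names senderWithCloseFcn and receiverUntilClosedFcn are hypothetical.

```matlab
function senderWithCloseFcn(workerQueue,inData)
% Hypothetical variant of firstWorkerFcn that closes the queue when done.
for idx = 1:numel(inData)
    send(workerQueue,inData(idx)*2);
end
% Close the queue instead of sending a "stop" signal to each receiver.
close(workerQueue);
end

function results = receiverUntilClosedFcn(workerQueue)
% Hypothetical variant of secondWorkerFcn that runs until the queue closes.
results = zeros(0,2);
while true
    [data,hasData] = poll(workerQueue,Inf);
    % Assumption: hasData is false once the queue is closed and drained.
    if ~hasData
        break;
    end
    results(end+1,:) = [data data+1]; %#ok<AGROW>
end
end
```

With this pattern, each receiver gets only a share of the items, so the results from all receivers must be combined on the client, and the relative order of items across receivers is not determined.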

Create input data and use parfeval to execute the worker functions asynchronously. The first worker processes the input data, and the second worker receives and further processes the results.

inData = 1:5;
futures(1) = parfeval(@firstWorkerFcn,0,queue,inData);
futures(2) = parfeval(@secondWorkerFcn,1,queue);

Wait for both workers to complete their tasks using wait, and then retrieve the final results from the second worker using fetchOutputs.

wait(futures);
finalResults = fetchOutputs(futures(2));
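Because secondWorkerFcn polls with an infinite timeout, it would wait indefinitely if the first worker failed before sending the "stop" signal. One possible safeguard is to wait with a time limit and cancel the futures if the limit is exceeded. The helper below is a sketch of that idea; fetchWithTimeout is a hypothetical function name, not a toolbox function, and the error identifier is illustrative.

```matlab
function results = fetchWithTimeout(allFutures,resultFuture,timeoutSeconds)
% fetchWithTimeout is a hypothetical helper, not a toolbox function.
% Wait up to timeoutSeconds for every future to reach the finished state.
finishedInTime = wait(allFutures,"finished",timeoutSeconds);
if ~finishedInTime
    % Request cancellation of any futures that are still queued or running.
    cancel(allFutures);
    error("Example:timeout", ...
        "Futures did not finish within %g seconds.",timeoutSeconds);
end
% fetchOutputs rethrows any error that occurred on the worker.
results = fetchOutputs(resultFuture);
end
```

In this example, you could call the helper as finalResults = fetchWithTimeout(futures,futures(2),60), where the 60-second limit is an arbitrary choice.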

Visualize the movement of data through the workers.

plotWorkerOutput(inData,finalResults);
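As a quick check of the pipeline arithmetic, you can compute the expected results on the client without any workers: the first stage doubles each input, and the second stage pairs each doubled value with that value plus one. The sketch below assumes that the queue delivers items in the order in which a single sender sends them; the variable names stageOne and expectedResults are illustrative.

```matlab
inData = 1:5;

% First stage: double each input element (as in firstWorkerFcn).
stageOne = inData(:)*2;

% Second stage: pair each value with the value plus one (as in secondWorkerFcn).
expectedResults = [stageOne, stageOne+1];

% The rows are [2 3], [4 5], [6 7], [8 9], and [10 11].
disp(expectedResults);

% If finalResults from fetchOutputs is in the workspace, compare the two.
if exist("finalResults","var")
    assert(isequal(finalResults,expectedResults), ...
        "Worker results differ from the expected values.");
end
```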

Helper Functions

Define a function to visualize the movement of data through the workers. The plotWorkerOutput function plots the input data and the results from both workers, showing the transformation at each step.

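function plotWorkerOutput(inData,finalResults)
c = ["r","g","b","c","m"];
figure;
hold on;
for idx = 1:numel(inData)
    % Cycle through the colors if there are more inputs than colors.
    color = c(mod(idx-1,numel(c))+1);
    % Plot one line per input element: x = 1 is the client input, and
    % x = 2 and x = 3 are the outputs of the first and second workers.
    plot([1, 2, 3],[inData(idx),finalResults(idx,1),finalResults(idx,2)], ...
        "-o"+color,MarkerFaceColor=color,DisplayName="Idx "+num2str(idx));
end
hold off;
ylabel("Output");
% Set the tick positions first, and then supply one label per tick.
xticks(0:4);
xlim([0 4]);
xticklabels(["","Client","Worker 1","Worker 2",""]);
title("Data Movement Through Workers");
legend;
grid on;
end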
