Send and Receive Data Between Client and Workers

Hello everybody,
I am trying to update the text fields of a GUI using multiple workers with parfevalOnAll and parallel.pool.DataQueue. I cannot use labReceive or labSend because spmd wouldn't work for my application. To receive the calculated data from the workers and update the text field, I am using this code on the client:
parpool(4)
pool = gcp();
fig = uifigure();
grid = uigridlayout(fig, [2, 2]);
btn = uibutton(grid,'Text','SendtoWorker');
textField = uitextarea(uipanel(grid));
Q1 = parallel.pool.DataQueue();
listener1 = afterEach(Q1, @(data) UpdateTextField(textField, data));
fut1 = parfevalOnAll(@worker1, 0, Q1);
btn.ButtonPushedFcn = @(~,~) SendDataToworker(); %Send data to worker on Q2 when button is pushed
function UpdateTextField(textField, data)
    str = num2str(data);
    textField.Value = str;
end
function SendDataToworker() %Send data1 to worker on Q2
    Q2 = parallel.pool.DataQueue();
    data1 = 0;
    send(Q2,data1)
end
On the workers I have defined the DataQueue Q2 and a listener2 to receive the variables from the client as follows:
function worker1()
    global acquiredData;
    acquiredData = 1; % initial value for the data
    Q2 = parallel.pool.DataQueue(); % Queue to receive data from client
    listener2 = afterEach(Q2, @(data) calculateData(data));
    function calculateData(data)
        acquiredData = data;
        send(Q1,acquiredData + 1) %sending calculated value back to client
    end
end
Although the Help section mentions that parallel.pool.DataQueue should work in the reverse direction, I am not seeing the results on the GUI. I also don't get any errors during execution. Thank you for your help.

Comments

Your function needs to receive the queue object from the main thread. You need to pass it in as a parameter to function worker1 etc. The other thing I am unsure of is the way you want the worker to wait for the data. It may or may not work.
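A minimal sketch of that suggestion, assuming a pool is available. The names passQueueDemo and reportIn are hypothetical, and the sketch uses a PollableDataQueue instead of the question's DataQueue with afterEach only so that the received messages are easy to count. It covers the worker-to-client direction only.

```matlab
function n = passQueueDemo()
% Hypothetical demo: every worker sends one message back to the client.
pool = gcp();                                 % start or reuse a pool
q = parallel.pool.PollableDataQueue;          % created on the client, so the client reads it
fut = parfevalOnAll(pool, @reportIn, 0, q);   % the queue is passed as an input argument
wait(fut);                                    % block until every worker has run reportIn

n = 0;                                        % number of messages received
while true
    [msg, received] = poll(q, 1);             % wait up to 1 s for the next message
    if ~received
        break                                 % queue is empty, stop reading
    end
    disp(msg);
    n = n + 1;
end
end

function reportIn(q)
% Runs on a worker: q is known here only because it was passed in.
send(q, 'worker ready');
end
```

The same rule should apply to a DataQueue with afterEach: the worker function can only send on a queue that it received as an argument.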
Hi Mohammad. Thank you for your quick response. I have passed the queue object Q1 to the worker as follows:
function worker1(Q1)
but it didn't help. Could you suggest a way to make the worker receive the data from the client in this context?
First, before you go too deep into this: does your use case really need queues? You can simply call parfeval with different data and use the results as they are returned.
https://www.mathworks.com/help/parallel-computing/parallel.future.aftereach.html
It does need to use queues. Although the code here is simplified, I will actually be controlling multiple laboratory power supplies, querying their voltage, current, and power values and updating them on the GUI, while sending setParameter() calls from the client to the worker to control the voltage, kind of like a feedback loop. So the worker has to be constantly listening to the queue from the client.


Accepted Answer

Moved to answer.
OK, after some experimentation this seems to work. A queue created in the main thread can only be used to receive data there: the worker can send on it, but the main thread cannot use it to send data to the worker. Hence we need to create one queue in the main thread and one in the worker, and exchange them, to get full bidirectional communication.
I have also replaced the nested function with a while loop that has an exit condition. You can change the exit condition to suit your purpose, or you could send an exit command on q2, etc.
For the main thread:
q1 = parallel.pool.DataQueue; % receives the data sent back by the worker
afterEach(q1,@disp); % display each value as it arrives
% q11 is used once, to retrieve the worker's pollable data queue
q11 = parallel.pool.PollableDataQueue;
p = gcp; % start a pool, or get the current one
parfeval(@worker1,0,q1,q11);
q2 = poll(q11,10); % wait up to 10 seconds for the worker's queue
for i = 1:10
    send(q2,i); % use the retrieved queue to send data to the worker
end
For the worker:
function worker1(q1,q11)
    nodatacounter = 0;
    % Create the queue on the worker and send it back to the main thread,
    % so that the main thread can use it to send data to this worker
    q2 = parallel.pool.PollableDataQueue;
    send(q11,q2);
    while nodatacounter < 10 % stop the worker after 10 consecutive timeouts
        [data,datarcvd] = poll(q2,10); % wait up to 10 seconds for data
        if datarcvd
            send(q1,data); % return the data to the main thread
            nodatacounter = 0;
        else
            nodatacounter = nodatacounter + 1;
        end
    end
end
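The exit-command idea mentioned above could look roughly like this. It is only a sketch: the function names, the 'exit' sentinel, and the data + 1 placeholder computation are assumptions made for illustration, not part of the tested code above.

```matlab
function fut = exitCommandDemo()
% Hypothetical variant of the main thread code that ends the worker explicitly.
q1  = parallel.pool.DataQueue;          % worker -> main thread results
afterEach(q1, @disp);
q11 = parallel.pool.PollableDataQueue;  % carries the worker's queue to the main thread

fut = parfeval(gcp, @workerWithExit, 0, q1, q11);

[q2, ok] = poll(q11, 10);               % wait up to 10 s for the worker's queue
assert(ok, 'Worker did not return its queue in time');

for i = 1:3
    send(q2, i);                        % ordinary data
end
send(q2, 'exit');                       % assumed sentinel that stops the worker
wait(fut);                              % returns once the worker loop has ended
end

function workerWithExit(q1, q11)
q2 = parallel.pool.PollableDataQueue;   % created on the worker, so the worker can poll it
send(q11, q2);
while true
    [data, received] = poll(q2, 10);    % block for up to 10 s
    if ~received
        continue                        % nothing arrived, keep listening
    end
    if ischar(data) && strcmp(data, 'exit')
        break                           % main thread asked the worker to stop
    end
    send(q1, data + 1);                 % placeholder for the real computation
end
end
```

Unlike the timeout counter, this worker keeps listening for as long as the main thread stays silent, which may suit a GUI where the user can pause between button presses. The trade-off is that the worker only stops when it receives the sentinel or when its future is cancelled.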

Comments

Mohammed, thank you very much, it worked hassle-free!
From R2025a you can use the new any-destination PollableDataQueue to simplify this solution. The any-destination queue can be created on any client/worker, and any client/worker can use it to send or poll. You can therefore avoid the extra step of having to create the queue on the worker and send it back to the client. You create an any-destination queue like this:
queue = parallel.pool.PollableDataQueue(Destination="any")
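As a rough sketch of how this could replace the queue exchange (R2025a or later): the function names, the "exit" sentinel, and the + 1 computation are hypothetical, and the 30 second timeout is an arbitrary choice.

```matlab
function reply = anyDestinationDemo()
% Hypothetical round trip between client and worker, R2025a or later.
toWorker = parallel.pool.PollableDataQueue(Destination="any");  % polled on the worker
toClient = parallel.pool.PollableDataQueue;                     % polled on the client

fut = parfeval(gcp, @echoWorker, 0, toWorker, toClient);

send(toWorker, 41);                      % client -> worker
[reply, ok] = poll(toClient, 30);        % wait up to 30 s for the answer
assert(ok, 'No reply from the worker');

send(toWorker, "exit");                  % assumed sentinel that stops the worker
wait(fut);
end

function echoWorker(toWorker, toClient)
while true
    [data, received] = poll(toWorker, 10);   % block for up to 10 s
    if ~received
        continue                             % nothing arrived, keep listening
    end
    if isstring(data) && data == "exit"
        break
    end
    send(toClient, data + 1);                % worker -> client
end
end
```

Two queues are used, one per direction, so that neither side polls a message it sent itself.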


Release

R2019b
