PollableDataQueues / competing while loops
I want to establish two-way communications between a client and worker called via parfeval within parallel while loops. The client sends instruction updates to the worker and the worker needs to send data packages and status messages to the client.
The problem with this setup is the competing while loops: if one loop runs faster than the other, the data queue it writes to grows excessively long. I would prefer to pull only the last item and flush the rest of the queue, but the best I have come up with so far is an inner while loop that drains the queue one item at a time. The extra polling adds overhead, especially if one of the loops is much slower than the other.
The example below demonstrates this: the client is slowed down by the plot operation. The code executes slower than it otherwise would because of the while-loop emptying the queue with its polling. It gets much worse if the pause command is uncommented. If you look at the number of messages in PolQ after execution you will still see a large number of leftover messages (~50 on my machine).
% Start parallel pool...
if isempty(gcp('nocreate'))
    p = parpool;
else
    p = gcp;
end
% Struct containing instructions (Kill / exit)
Message.Kill    = false; 
% Pollable queue for receiving data from worker
PolQ            = parallel.pool.PollableDataQueue;
% Initialize worker
f               = parfeval(@MyWorkerFunc, 0, Message, PolQ);
% Receive a queue, pollable by the worker, for sending messages to the worker:
SenQ            = poll(PolQ, Inf);
% Initialize client main loop...
killCount   = 1;
killThresh  = 100;
while true    
    % Poll worker for data...
    if PolQ.QueueLength > 0
        while PolQ.QueueLength > 0
            [data, OK] = poll(PolQ);
        end
        % Do something useful that takes time
        plot(data);
        drawnow;
        % Uncomment below to see an exaggerated effect...
        % pause(0.05);
    end
    % Kill?
    if killCount > killThresh
        Message.Kill = true;
        send(SenQ, Message);
        break
    end    
    % Send "instructions" (not present in demo) to worker
    send(SenQ, Message);
    % Increment kill counter
    killCount = killCount + 1;
    disp(killCount);  
end
disp(PolQ)
% Function executed on worker:
function MyWorkerFunc(Message, PolQ)
    % Create instruction relay & send back to client:
    SenQ = parallel.pool.PollableDataQueue; 
    send(PolQ, SenQ);
    while ~Message.Kill
        % Make our "data"
        data = randn(1, 100);
        % Send to client
        send(PolQ, data);
        % Check for updates to Message:
        tmp = poll(SenQ);
        if ~isempty(tmp)
            Message = tmp;
        end
    end
end
What is a more efficient way of doing this? Is there a way to access a specific queue entry and flush the queue after retrieving it? Alternatively, is there a more efficient way of signalling to the worker that the client is ready to receive data, so that it isn't continuously broadcasting messages?
2 Comments
Mario Malic on 5 Jun 2023
I don't know a lot about parallel computing, but parallel.pool.DataQueue + afterEach might be more appropriate here.
However, pausing inside the block while there is still data to process is the opposite of what you want to do; the pause should be moved out of the if statement block.
% if PolQ.QueueLength > 0
    % code
% end
Also, persistent variables could help. The tic/toc lines below measure how long it takes to construct a PollableDataQueue, which you do every time you use the send function, if I understood this code correctly.
tic;
SenQ = parallel.pool.PollableDataQueue;
toc
persistent SenQ
if isempty(SenQ)
    SenQ = parallel.pool.PollableDataQueue;
end
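The DataQueue + afterEach suggestion above could look roughly like the sketch below. The names plotPacket, sendPackets and the packet count of 100 are made up for illustration. Note that afterEach runs the callback once for every packet sent, so on its own this does not discard stale data; it only removes the explicit polling loop on the client.

```matlab
% Sketch only: push-style delivery with DataQueue + afterEach.
% plotPacket, sendPackets and the packet count are hypothetical.
dq = parallel.pool.DataQueue;
afterEach(dq, @plotPacket);              % callback runs on the client
f = parfeval(@sendPackets, 0, dq, 100);

% Yield regularly so that pending callbacks and graphics updates can run.
while ~ismember(f.State, {'finished', 'failed'})
    pause(0.05);
end

function plotPacket(data)
    % Called on the client once per packet received from the worker.
    plot(data);
end

function sendPackets(dq, nPackets)
    % Runs on the worker: produce some "data" and push it to the client.
    for k = 1:nPackets
        send(dq, randn(1, 100));
    end
end
```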
Answers (1)
Thomas Falch on 15 May 2025
From R2025a you can use the new "any-destination" PollableDataQueue to solve this problem. You can create it like this:
queue = parallel.pool.PollableDataQueue(Destination="any")
Unlike the old PollableDataQueue, anyone with a copy of the any-destination queue can send and receive. This makes it easier to send from a client to the workers (without having to create a queue on the worker and send it back to the client like you do), to send from one worker to another, or to "send" from one worker to itself. (See the MATLAB documentation page "PollableDataQueue - Send and poll data between client and workers".)
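As a rough sketch of the client-to-worker direction, assuming R2025a or later: both queues are created on the client and handed to the worker as arguments, so no relay queue has to be sent back. The function name onDemandWorker, the Gain field and the 30 second timeout are invented for this example. Here the worker only produces a packet when asked, which is one possible way for the client to signal that it is ready.

```matlab
% Sketch only: requires R2025a or later for Destination="any".
% onDemandWorker and the Gain field are hypothetical names.
cmdQ  = parallel.pool.PollableDataQueue(Destination="any");  % client -> worker
dataQ = parallel.pool.PollableDataQueue(Destination="any");  % worker -> client
f = parfeval(@onDemandWorker, 0, cmdQ, dataQ);

% Ask for one packet, then wait up to 30 s for the reply.
send(cmdQ, struct('Kill', false, 'Gain', 2));
[reply, ok] = poll(dataQ, 30);

% Tell the worker to exit and wait for it to finish.
send(cmdQ, struct('Kill', true, 'Gain', 0));
wait(f);

function onDemandWorker(cmdQ, dataQ)
    while true
        cmd = poll(cmdQ, Inf);   % block until the client sends a command
        if cmd.Kill
            break
        end
        send(dataQ, cmd.Gain * ones(1, 3));
    end
end
```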
This final feature can be used here, since the faster while loop can remove an element by polling (if one is present) before adding a new one. This makes the maximum queue length 1, and the receiver only gets the latest value.
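A minimal sketch of this poll-before-send idea, again assuming R2025a or later. The name latestOnlyProducer and the loop count of 100 are placeholders, and the worker is simply cancelled at the end instead of receiving a kill message.

```matlab
% Sketch only: requires R2025a or later for Destination="any".
% latestOnlyProducer is a hypothetical name.
dataQ = parallel.pool.PollableDataQueue(Destination="any");
f = parfeval(@latestOnlyProducer, 0, dataQ);

for k = 1:100
    [data, ok] = poll(dataQ, 1);   % wait up to 1 s for the newest packet
    if ok
        plot(data);
        drawnow;
    end
end
cancel(f);                         % stop the worker loop

function latestOnlyProducer(dataQ)
    while true
        data = randn(1, 100);
        [~, ~] = poll(dataQ);      % non-blocking: drop the stale packet, if any
        send(dataQ, data);         % the queue now holds only the newest packet
    end
end
```

The client no longer needs an inner loop to drain the queue, because the worker replaces the old packet itself.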
Alternatively, you can check the QueueLength before adding anything. The QueueLength is really "the number of messages which this client/worker can poll off this queue". Since only one worker/client can poll the old type of PollableDataQueue, QueueLength is 0 everywhere else. For the new any-destination PollableDataQueue, any worker/client can poll(), so the value is the actual number of elements on the queue everywhere.
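The QueueLength variant might look like the sketch below (R2025a or later assumed; sendWhenEmpty is a made-up name). One difference from polling before sending: the packet waiting in the queue is the one produced when the queue was last empty, so the receiver may see slightly older data than with the replace-on-send approach.

```matlab
% Sketch only: requires R2025a or later for Destination="any".
% sendWhenEmpty is a hypothetical name.
dataQ = parallel.pool.PollableDataQueue(Destination="any");
f = parfeval(@sendWhenEmpty, 0, dataQ);

for k = 1:100
    [data, ok] = poll(dataQ, 1);   % wait up to 1 s for a packet
    if ok
        plot(data);
        drawnow;
    end
end
cancel(f);                         % stop the worker loop

function sendWhenEmpty(dataQ)
    while true
        data = randn(1, 100);
        % With an any-destination queue, QueueLength is also valid on the
        % sending side, so the worker can skip sending while a packet waits.
        if dataQ.QueueLength == 0
            send(dataQ, data);
        end
    end
end
```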