Main Content

close

Close pollable data queue

Since R2025a

    Description

    close(pollablequeue) closes the PollableDataQueue object specified by pollablequeue. You can no longer send data to the queue using the send function.

    example

    Examples

    collapse all

    Define a function, sendMessages, that sends messages to a queue. It sends five messages, simulating work by pausing for one second between sending each message.

    function sendMessages(queue)
        for i = 1:5
            send(queue,sprintf("Message %d from worker",i));
            pause(1);
        end
    end

    Create a pollable data queue, and use parfeval to execute the sendMessages function on a worker. Pause briefly to allow the worker to send some messages.

    queue = parallel.pool.PollableDataQueue;
    f = parfeval(@sendMessages,0,queue);
    pause(2);

    To stop the worker from sending any more data to the queue, call close on the queue.

    close(queue);

    Wait for the future object f to complete. Use the Error property of future object f to verify that an error occurred due to an attempt to send messages to the closed queue.

    wait(f);
    f.Error.message
    ans = 
    'Failed to send data because the DataQueue has been closed.'
    

    This example shows how to use the close function to signal to multiple queue receivers that no more data will be sent, avoiding the need for multiple "stop" messages.

    Define a function dataGenFcn that simulates data acquisition by generating random data and sending the result to the next worker through a PollableDataQueue object. After generating data for a predefined number of iterations, the function closes the workerQueue to signal to the processing workers that it will no longer send data.

    function dataGenFcn(workerQueue,numIter)
    for idx = 1:numIter
        data = randn(5e2);
        send(workerQueue,data);
        pause(0.1)
    end
    close(workerQueue);
    end

    Define the function dataProcessFcn to continuously poll the queue for data, process each received value, and store the results. The loop continues until the processing workers drain the closed queue. When the closed queue is empty, polling returns an empty array and sets OK to false, breaking the while-loop.

    function allResults = dataProcessFcn(workerQueue)
    allResults = [];
    while true
        [data,OK] = poll(workerQueue,Inf);
        if ~OK
            break
        end
        result = max(real(eig(data)));
        allResults = [allResults;result];
        pause(0.2)
    end
    end

    Start a parallel pool with 4 workers.

    pool = parpool(4);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 4 workers.
    

    To enable communication between workers, create a PollableDataQueue object with the Destination argument set to "any". This type of PollableDataQueue object allows any worker to send and receive messages.

    queue = parallel.pool.PollableDataQueue(Destination="any");

    Define the number of iterations and use parfeval to execute the worker functions asynchronously. The first worker generates the data, and the other workers in the pool receive and process the data.

    numIter = 100;
    dataGenFuture = parfeval(@dataGenFcn,0,queue,numIter);
    for f = 1:3
        dataProcessFutures(f) = parfeval(@dataProcessFcn,1,queue);
    end

    Wait for the data processing workers to complete their tasks, and then retrieve all the results from the processing workers using fetchOutputs. Display a histogram of the results.

    wait(dataProcessFutures);
    allResults = fetchOutputs(dataProcessFutures);
    histogram(allResults)

    Input Arguments

    collapse all

    Pollable data queue, specified as a PollableDataQueue object.

    After you close a PollableDataQueue object, you can no longer send data to the queue. Any attempt to send data to the queue results in an error. You can continue to poll the queue for data. You cannot reopen a closed queue.

    Example: p = parallel.pool.PollableDataQueue;

    Version History

    Introduced in R2025a