Is it possible to use parfeval and backgroundPool to write to a Mongo database in a background thread?

9 visualizzazioni (ultimi 30 giorni)
I am processing a large table of data, for example 100000x100. After processing this data, I want to write each row of the table into a mongodb database using the mongoc driver. It takes over 150s, so I tried various ways to optimize the code. What I am doing now is below:
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName'));
batchSize = 10000;
numBatches = ceil(height(dataTable)/batchSize);
parfor i=1:numBatches
start = (i -1)* batchSize + 1;
stop = min(i*batchSize, height(dataTable));
%Get table batch
insertData = table2struct(dataTable(start:stop, :));
insert(c.Value, 'collection', insertData);
end
The above code works and I am able to write to the mongo database in about 20 seconds, but this is still too slow. So I want to know if there is a way I can do this write in the background by using parfeval. I have tried the following but get errors with parallel.pool.Constant or I get errors saying MongoConnection is not a supported thread based worker. I am confused by this since I am able to use 'parfor' with a MongoConnection, so why can't I use parfeval?
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName')););
% f = parfeval(backgroundPool, @(data, const) TableInsert(data, const.Value), 1, dataTable, c);
% f = parfeval(backgroundPool, TableInsert, 1, dataTable, c);
f = parfeval(backgroundPool, @TableInsert, 1, dataTable, c.Value);
o = fetchOutputs(f);
fprintf(o);
function complete = TableInsert(dataTable, C)
tic;
batchSize = 10000;
numBatches = ceil(height(dataTable)/batchSize);
% mconn = C.Value;
for i=1:numBatches
start = (i -1)* batchSize + 1;
stop = min(i*batchSize, height(dataTable));
%Get table batch
insertData = table2struct(dataTable(start:stop, :));
insert(C, 'collection', insertData);
end
toc;
complete = sprintf('Table insert was completed in: %f\n', toc);
end
In other attempts to speed up the large data write, I have used pymongo instead of using mongoc. This is definitely faster, but I still am having trouble passing off the write to a background worker. Below is what I have tried using pymongo:
% Create const connection to mongoclient using pymongo
C = parallel.pool.Constant(@() py.pymongo.MongoClient('xx.xx.xxx.xx:xxxxx')); % ip, port, database name
f = parfeval(backgroundPool, @(data, c) pythonTableInsert(data, c.Value), 1, dataTable, C);
% Get output
o = fetchOutputs(f);
fprintf(o);
function complete = pythonTableInsert(dataTable, C)
tic;
% Batch insert
batchSize = 10000;
numBatches = ceil(height(dataTable)/batchSize);
parfor i=1:numBatches
start = (i -1)* batchSize + 1;
stop = min(i*batchSize, height(dataTable));
%Get table batch
insertData = table2struct(dataTable(start:stop, :));
% Convert to 1xN cell array where each cell is a 1x1 struct
insertList = num2cell(insertData)';
% Insert
collection = C.get_database('database').get_collection('collection');
collection.insert_many(insertList);
end
toc;
complete = sprintf('Python insert was completed in: %f\n', toc);
end

Risposte (1)

Edric Ellis
Edric Ellis il 21 Nov 2023
In your first case using parfor, if you haven't changed any settings, you will be using a process pool. (The default profile is "Processes", and with default settings, a pool will be created there when you first hit parfor).
If you want to run stuff in the background, you need to run a bunch of parfeval requests, a bit like this:
c = parallel.pool.Constant(@() mongoc('serverIP', 1000, 'dbName'));
batchSize = 10000;
numBatches = ceil(height(dataTable)/batchSize);
for i=1:numBatches
start = (i -1)* batchSize + 1;
stop = min(i*batchSize, height(dataTable));
fut(i) = parfeval(@iRunInsert, 0, c, dataTable(start:stop,:));
end
function iRunInsert(c, data)
insert(c.Value, 'collection', data);
end
  4 Commenti
Suchitha Nama
Suchitha Nama il 21 Nov 2023
I don't need to use backgroundPool specifically, but from reading the documentation/browsing through other answers - I thought that is what I had to use to run asynchronously. I have ensured that parpool is running before starting my code and I have a timer like this
tic;
for
% parfeval calls
end
toc;
So, yes it does take 30 seconds to issue all of the parfeval calls. I have 12 workers in my parpool, not sure if that is a limitation or not.
Edric Ellis
Edric Ellis il 22 Nov 2023
Hm, that seems like a long time simply to issue the parfeval calls - I would normally expect each call to parfeval to run in a few milliseconds or so (in the absence of large amounts of data being transmitted). What's the value of numBatches? It might be worth running using the profile command in effect to see what's going on. It's possible that splitting up dataTable is taking a long time.

Accedi per commentare.

Categorie

Scopri di più su Parallel Computing Fundamentals in Help Center e File Exchange

Community Treasure Hunt

Find the treasures in MATLAB Central and discover how the community can help you!

Start Hunting!

Translated by