222 |
222 |
223 # Create queues |
223 # Create queues |
224 taskQueue = multiprocessing.Queue() |
224 taskQueue = multiprocessing.Queue() |
225 doneQueue = multiprocessing.Queue() |
225 doneQueue = multiprocessing.Queue() |
226 |
226 |
227 # Submit tasks (initially two time number of processes |
227 # Submit tasks (initially two times the number of processes) |
|
228 tasks = len(argumentsList) |
228 initialTasks = 2 * NumberOfProcesses |
229 initialTasks = 2 * NumberOfProcesses |
229 for task in argumentsList[:initialTasks]: |
230 for _ in range(initialTasks): |
230 taskQueue.put(task) |
231 taskQueue.put(argumentsList.pop(0)) |
231 |
232 |
232 # Start worker processes |
233 # Start worker processes |
233 workers = [ |
234 workers = [ |
234 multiprocessing.Process(target=workerTask, args=(taskQueue, doneQueue)) |
235 multiprocessing.Process(target=workerTask, args=(taskQueue, doneQueue)) |
235 for _ in range(NumberOfProcesses) |
236 for _ in range(NumberOfProcesses) |
236 ] |
237 ] |
237 for worker in workers: |
238 for worker in workers: |
238 worker.start() |
239 worker.start() |
239 |
240 |
240 # Get and send results |
241 # Get and send results |
241 endIndex = len(argumentsList) - initialTasks |
242 for _ in range(tasks): |
242 for i in range(len(argumentsList)): |
|
243 resultSent = False |
243 resultSent = False |
244 wasCancelled = False |
244 wasCancelled = False |
245 |
245 |
246 while not resultSent: |
246 while not resultSent: |
247 try: |
247 try: |
257 |
257 |
258 if wasCancelled or cancelled(): |
258 if wasCancelled or cancelled(): |
259 # just exit the loop ignoring the results of queued tasks |
259 # just exit the loop ignoring the results of queued tasks |
260 break |
260 break |
261 |
261 |
262 if i < endIndex: |
262 if argumentsList: |
263 taskQueue.put(argumentsList[i + initialTasks]) |
263 taskQueue.put(argumentsList.pop(0)) |
264 |
264 |
265 # Tell child processes to stop |
265 # Tell child processes to stop |
266 for _ in range(NumberOfProcesses): |
266 for _ in range(NumberOfProcesses): |
267 taskQueue.put("STOP") |
267 taskQueue.put("STOP") |
268 |
268 |