92 |
92 |
93 # Create queues |
93 # Create queues |
94 taskQueue = multiprocessing.Queue() |
94 taskQueue = multiprocessing.Queue() |
95 doneQueue = multiprocessing.Queue() |
95 doneQueue = multiprocessing.Queue() |
96 |
96 |
97 # Submit tasks (initially two time number of processes |
97 # Submit tasks (initially two times the number of processes) |
|
98 tasks = len(argumentsList) |
98 initialTasks = 2 * NumberOfProcesses |
99 initialTasks = 2 * NumberOfProcesses |
99 for task in argumentsList[:initialTasks]: |
100 for _ in range(initialTasks): |
100 taskQueue.put(task) |
101 taskQueue.put(argumentsList.pop(0)) |
101 |
102 |
102 # Start worker processes |
103 # Start worker processes |
103 workers = [ |
104 workers = [ |
104 multiprocessing.Process(target=workerTask, args=(taskQueue, doneQueue)) |
105 multiprocessing.Process(target=workerTask, args=(taskQueue, doneQueue)) |
105 for _ in range(NumberOfProcesses) |
106 for _ in range(NumberOfProcesses) |
106 ] |
107 ] |
107 for worker in workers: |
108 for worker in workers: |
108 worker.start() |
109 worker.start() |
109 |
110 |
110 # Get and send results |
111 # Get and send results |
111 endIndex = len(argumentsList) - initialTasks |
112 for _ in range(tasks): |
112 for i in range(len(argumentsList)): |
|
113 resultSent = False |
113 resultSent = False |
114 wasCancelled = False |
114 wasCancelled = False |
115 |
115 |
116 while not resultSent: |
116 while not resultSent: |
117 try: |
117 try: |
127 |
127 |
128 if wasCancelled or cancelled(): |
128 if wasCancelled or cancelled(): |
129 # just exit the loop ignoring the results of queued tasks |
129 # just exit the loop ignoring the results of queued tasks |
130 break |
130 break |
131 |
131 |
132 if i < endIndex: |
132 if argumentsList: |
133 taskQueue.put(argumentsList[i + initialTasks]) |
133 taskQueue.put(argumentsList.pop(0)) |
134 |
134 |
135 # Tell child processes to stop |
135 # Tell child processes to stop |
136 for _ in range(NumberOfProcesses): |
136 for _ in range(NumberOfProcesses): |
137 taskQueue.put("STOP") |
137 taskQueue.put("STOP") |
138 |
138 |