@@ -222,6 +222,9 @@ class HostStreamer {
222
222
// queue (sycl_q_) to perform the request. It also performs the callbacks
223
223
// to the user code when the requests have been completed.
224
224
static void KernelLaunchAndWaitThread () {
225
+ size_t producer_count = 0 ;
226
+ size_t consumer_count = 0 ;
227
+
225
228
// Do this loop until told (by main thread) to stop via the
226
229
// 'kill_kernel_thread_flag_' atomic shared variable.
227
230
while (!kill_kernel_thread_flag_) {
@@ -238,6 +241,9 @@ class HostStreamer {
238
241
239
242
// pop from the Producer queue
240
243
produce_q_.Pop ();
244
+
245
+ // accumulate producer count
246
+ producer_count += count;
241
247
}
242
248
243
249
// If there is a Consume request to launch, do it
@@ -247,12 +253,18 @@ class HostStreamer {
247
253
size_t count;
248
254
std::tie (buf_idx, count) = consume_q_.Front ();
249
255
250
- // launch the kernel and push the request to the launch queue
251
- auto e = LaunchConsumerKernel (consumer_buffer_[buf_idx], count);
252
- launch_q_.Push (std::make_tuple (buf_idx, count, e, false ));
256
+ // Only launch the consumer once the accumulated producer count covers the accumulated consumer count plus this request's count
257
+ if (producer_count >= consumer_count + count) {
258
+ // launch the kernel and push the request to the launch queue
259
+ auto e = LaunchConsumerKernel (consumer_buffer_[buf_idx], count);
260
+ launch_q_.Push (std::make_tuple (buf_idx, count, e, false ));
253
261
254
- // pop from the Consumer queue
255
- consume_q_.Pop ();
262
+ // pop from the Consumer queue
263
+ consume_q_.Pop ();
264
+
265
+ // accumulate consumer count
266
+ consumer_count += count;
267
+ }
256
268
}
257
269
258
270
// Wait on the oldest event to finish given 2 conditions:
@@ -261,7 +273,7 @@ class HostStreamer {
261
273
// 2) the user has requested us to flush the launch queue and the
262
274
// launch queue is not empty (i.e. flush_ && launch_q_.size() != 0)
263
275
if ((launch_q_.Size () >= wait_threshold_) ||
264
- (flush_ && !LaunchQueueEmpty () && ProducerQueueEmpty () && ConsumerQueueEmpty () )) {
276
+ (flush_ && !LaunchQueueEmpty ())) {
265
277
// grab the oldest request from the launch queue
266
278
size_t buf_idx;
267
279
size_t count;
@@ -276,9 +288,11 @@ class HostStreamer {
276
288
if (request_was_producer) {
277
289
// std::cout << "Calling Producer Callback" << std::endl;
278
290
producer_callback (count);
291
+ producer_count -= count;
279
292
} else {
280
293
// std::cout << "Calling Consumer Callback" << std::endl;
281
294
consumer_callback (consumer_buffer_[buf_idx], count);
295
+ consumer_count -= count;
282
296
}
283
297
284
298
// Pop from the launch queue. This has to be done AFTER waiting on
0 commit comments