Skip to content

Commit 06d77d3

Browse files
bors[bot]BusyJay
andauthored
Merge #669
669: channel: discard messages eagerly r=taiki-e a=BusyJay If all receivers are dropped, the memory hold by channel will only be dropped when all senders are also dropped. If there are too many channels the memory leak can be significant. Co-authored-by: Jay Lee <[email protected]>
2 parents b196663 + f0e414d commit 06d77d3

File tree

2 files changed

+74
-4
lines changed

2 files changed

+74
-4
lines changed

crossbeam-channel/src/channel.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ impl<T> Drop for Sender<T> {
643643
unsafe {
644644
match &self.flavor {
645645
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
646-
SenderFlavor::List(chan) => chan.release(|c| c.disconnect()),
646+
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
647647
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
648648
}
649649
}
@@ -1135,7 +1135,7 @@ impl<T> Drop for Receiver<T> {
11351135
unsafe {
11361136
match &self.flavor {
11371137
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1138-
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect()),
1138+
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
11391139
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
11401140
ReceiverFlavor::At(_) => {}
11411141
ReceiverFlavor::Tick(_) => {}

crossbeam-channel/src/flavors/list.rs

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,10 +530,10 @@ impl<T> Channel<T> {
530530
None
531531
}
532532

533-
/// Disconnects the channel and wakes up all blocked receivers.
533+
/// Disconnects senders and wakes up all blocked receivers.
534534
///
535535
/// Returns `true` if this call disconnected the channel.
536-
pub(crate) fn disconnect(&self) -> bool {
536+
pub(crate) fn disconnect_senders(&self) -> bool {
537537
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
538538

539539
if tail & MARK_BIT == 0 {
@@ -544,6 +544,76 @@ impl<T> Channel<T> {
544544
}
545545
}
546546

547+
/// Disconnects receivers.
548+
///
549+
/// Returns `true` if this call disconnected the channel.
550+
pub(crate) fn disconnect_receivers(&self) -> bool {
551+
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
552+
553+
if tail & MARK_BIT == 0 {
554+
// If receivers are dropped first, discard all messages to free
555+
// memory eagerly.
556+
self.discard_all_messages();
557+
true
558+
} else {
559+
false
560+
}
561+
}
562+
563+
/// Discards all messages.
564+
///
565+
/// This method should only be called when all receivers are dropped.
566+
fn discard_all_messages(&self) {
567+
let backoff = Backoff::new();
568+
let mut tail = self.tail.index.load(Ordering::Acquire);
569+
loop {
570+
let offset = (tail >> SHIFT) % LAP;
571+
if offset != BLOCK_CAP {
572+
break;
573+
}
574+
575+
// New updates to tail will be rejected by MARK_BIT and aborted unless it's
576+
// at boundary. We need to wait for the updates take affect otherwise there
577+
// can be memory leaks.
578+
backoff.snooze();
579+
tail = self.tail.index.load(Ordering::Acquire);
580+
}
581+
582+
let mut head = self.head.index.load(Ordering::Acquire);
583+
let mut block = self.head.block.load(Ordering::Acquire);
584+
585+
unsafe {
586+
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
587+
while head >> SHIFT != tail >> SHIFT {
588+
let offset = (head >> SHIFT) % LAP;
589+
590+
if offset < BLOCK_CAP {
591+
// Drop the message in the slot.
592+
let slot = (*block).slots.get_unchecked(offset);
593+
slot.wait_write();
594+
let p = &mut *slot.msg.get();
595+
p.as_mut_ptr().drop_in_place();
596+
} else {
597+
(*block).wait_next();
598+
// Deallocate the block and move to the next one.
599+
let next = (*block).next.load(Ordering::Acquire);
600+
drop(Box::from_raw(block));
601+
block = next;
602+
}
603+
604+
head = head.wrapping_add(1 << SHIFT);
605+
}
606+
607+
// Deallocate the last remaining block.
608+
if !block.is_null() {
609+
drop(Box::from_raw(block));
610+
}
611+
}
612+
head &= !MARK_BIT;
613+
self.head.block.store(ptr::null_mut(), Ordering::Release);
614+
self.head.index.store(head, Ordering::Release);
615+
}
616+
547617
/// Returns `true` if the channel is disconnected.
548618
pub(crate) fn is_disconnected(&self) -> bool {
549619
self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0

0 commit comments

Comments
 (0)