Skip to content

Commit 2cc8874

Browse files
committed
---
yaml --- r: 469 b: refs/heads/master c: 97d6342 h: refs/heads/master i: 467: 9db76a3 v: v3
1 parent 57f0009 commit 2cc8874

File tree

10 files changed

+114
-56
lines changed

10 files changed

+114
-56
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
---
2-
refs/heads/master: 5917ca35190b526b65b4d26ad0b98024ce9e0b09
2+
refs/heads/master: 97d6342bf08e55f8d2b4f8df5c4b5a099df0191c

trunk/src/boot/me/trans.ml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2932,6 +2932,7 @@ let trans_visitor
29322932
(slot:Ast.slot)
29332933
(curr_iso:Ast.ty_iso option)
29342934
: unit =
2935+
check_and_flush_chan cell slot;
29352936
drop_slot (get_ty_params_of_current_frame()) cell slot curr_iso
29362937

29372938
and drop_ty_in_current_frame
@@ -4188,6 +4189,25 @@ let trans_visitor
41884189
let last_jumps = Array.map trans_arm at.Ast.alt_tag_arms in
41894190
Array.iter patch last_jumps
41904191

4192+
(* If we're about to drop a channel, synthesize an upcall_flush_chan.
4193+
* TODO: This should rather appear in a chan dtor when chans become
4194+
* objects. *)
4195+
and check_and_flush_chan
4196+
(cell:Il.cell)
4197+
(slot:Ast.slot)
4198+
: unit =
4199+
let ty = strip_mutable_or_constrained_ty (slot_ty slot) in
4200+
match simplified_ty ty with
4201+
Ast.TY_chan _ ->
4202+
annotate "check_and_flush_chan, flush_chan";
4203+
let rc = box_rc_cell cell in
4204+
emit (Il.cmp (Il.Cell rc) one);
4205+
let jump = mark () in
4206+
emit (Il.jmp Il.JNE Il.CodeNone);
4207+
trans_void_upcall "upcall_flush_chan" [| Il.Cell cell |];
4208+
patch jump;
4209+
| _ -> ()
4210+
41914211
and drop_slots_at_curr_stmt _ : unit =
41924212
let stmt = Stack.top curr_stmt in
41934213
match htab_search cx.ctxt_post_stmt_slot_drops stmt with

trunk/src/rt/rust_chan.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,22 @@ rust_chan::rust_chan(rust_task *task, maybe_proxy<rust_port> *port) :
1818
}
1919

2020
rust_chan::~rust_chan() {
21-
if (port && !port->is_proxy()) {
22-
port->delegate()->chans.swap_delete(this);
23-
}
21+
task->log(rust_log::MEM | rust_log::COMM,
22+
"del rust_chan(task=0x%" PRIxPTR ")", (uintptr_t) this);
23+
24+
A(task->dom, is_associated() == false,
25+
"Channel must be disassociated before being freed.");
2426
}
2527

2628
/**
2729
* Link this channel with the specified port.
2830
*/
2931
void rust_chan::associate(maybe_proxy<rust_port> *port) {
3032
this->port = port;
31-
if (!port->is_proxy()) {
33+
if (port->is_proxy() == false) {
34+
task->log(rust_log::TASK,
35+
"associating chan: 0x%" PRIxPTR " with port: 0x%" PRIxPTR,
36+
this, port);
3237
this->port->delegate()->chans.push(this);
3338
}
3439
}
@@ -43,14 +48,23 @@ bool rust_chan::is_associated() {
4348
void rust_chan::disassociate() {
4449
A(task->dom, is_associated(), "Channel must be associated with a port.");
4550

51+
if (port->is_proxy() == false) {
52+
task->log(rust_log::TASK,
53+
"disassociating chan: 0x%" PRIxPTR " from port: 0x%" PRIxPTR,
54+
this, port->delegate());
55+
port->delegate()->chans.swap_delete(this);
56+
}
57+
4658
// Delete reference to the port.
4759
port = NULL;
4860
}
4961

5062
/**
51-
* Attempt to transmit channel data to the associated port.
63+
* Attempt to send data to the associated port.
5264
*/
53-
void rust_chan::transmit() {
65+
void rust_chan::send(void *sptr) {
66+
buffer.enqueue(sptr);
67+
5468
rust_dom *dom = task->dom;
5569
if (!is_associated()) {
5670
W(dom, is_associated(),
@@ -81,7 +95,6 @@ void rust_chan::transmit() {
8195

8296
return;
8397
}
84-
8598
//
8699
// Local Variables:
87100
// mode: C++

trunk/src/rt/rust_chan.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ class rust_chan : public rc_base<rust_chan>,
1717
void disassociate();
1818
bool is_associated();
1919

20-
void transmit();
20+
void send(void *sptr);
2121
};
2222

2323
//

trunk/src/rt/rust_dom.cpp

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,6 @@ rust_dom::reap_dead_tasks() {
237237
rust_task *task = dead_tasks[i];
238238
if (task->ref_count == 0) {
239239
I(this, task->tasks_waiting_to_join.is_empty());
240-
241240
dead_tasks.swap_delete(task);
242241
log(rust_log::TASK,
243242
"deleting unreferenced dead task 0x%" PRIxPTR, task);
@@ -392,10 +391,9 @@ rust_dom::start_main_loop()
392391
// if progress is made in other domains.
393392

394393
if (scheduled_task == NULL) {
395-
log(rust_log::TASK,
396-
"all tasks are blocked, waiting for progress ...");
397-
if (_log.is_tracing(rust_log::TASK))
394+
if (_log.is_tracing(rust_log::TASK)) {
398395
log_state();
396+
}
399397
log(rust_log::TASK,
400398
"all tasks are blocked, scheduler yielding ...");
401399
sync::yield();
@@ -437,18 +435,6 @@ rust_dom::start_main_loop()
437435
log(rust_log::DOM, "terminated scheduler loop, reaping dead tasks ...");
438436

439437
while (dead_tasks.length() > 0) {
440-
log(rust_log::DOM,
441-
"waiting for %d dead tasks to become dereferenced ...",
442-
dead_tasks.length());
443-
444-
if (_log.is_tracing(rust_log::DOM)) {
445-
for (size_t i = 0; i < dead_tasks.length(); i++) {
446-
log(rust_log::DOM,
447-
"task: 0x%" PRIxPTR ", index: %d, ref_count: %d",
448-
dead_tasks[i], i, dead_tasks[i]->ref_count);
449-
}
450-
}
451-
452438
if (_incoming_message_queue.is_empty()) {
453439
log(rust_log::DOM,
454440
"waiting for %d dead tasks to become dereferenced, "

trunk/src/rt/rust_message.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ send(uint8_t *buffer, size_t buffer_sz, const char* label, rust_task *source,
9090
}
9191

9292
void data_message::process() {
93-
_port->remote_channel->buffer.enqueue(_buffer);
94-
_port->remote_channel->transmit();
93+
_port->remote_channel->send(_buffer);
9594
_target->log(rust_log::COMM, "<=== received data via message ===");
9695
}
9796

trunk/src/rt/rust_port.cpp

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,32 @@ rust_port::~rust_port() {
2121

2222
// Disassociate channels from this port.
2323
while (chans.is_empty() == false) {
24-
chans.pop()->disassociate();
24+
rust_chan *chan = chans.peek();
25+
chan->disassociate();
2526
}
2627

27-
// We're the only ones holding a reference to the remote channel, so
28-
// clean it up.
2928
delete remote_channel;
3029
}
3130

31+
bool rust_port::receive(void *dptr) {
32+
for (uint32_t i = 0; i < chans.length(); i++) {
33+
rust_chan *chan = chans[i];
34+
if (chan->buffer.is_empty() == false) {
35+
chan->buffer.dequeue(dptr);
36+
if (chan->buffer.is_empty() && chan->task->blocked()) {
37+
task->log(rust_log::COMM,
38+
"chan: 0x%" PRIxPTR
39+
" is flushing, wakeup task: 0x%" PRIxPTR,
40+
chan, chan->task);
41+
chan->task->wakeup(this);
42+
}
43+
task->log(rust_log::COMM, "<=== read data ===");
44+
return true;
45+
}
46+
}
47+
return false;
48+
}
49+
3250
void rust_port::log_state() {
3351
task->log(rust_log::COMM,
3452
"rust_port: 0x%" PRIxPTR ", associated channel(s): %d",

trunk/src/rt/rust_port.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class rust_port : public maybe_proxy<rust_port>,
1616
rust_port(rust_task *task, size_t unit_sz);
1717
~rust_port();
1818
void log_state();
19+
bool receive(void *dptr);
1920
};
2021

2122
//

trunk/src/rt/rust_task.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,11 @@ get_callee_save_fp(uintptr_t *top_of_callee_saves)
323323

324324
void
325325
rust_task::kill() {
326+
if (dead()) {
327+
// Task is already dead, can't kill what's already dead.
328+
return;
329+
}
330+
326331
// Note the distinction here: kill() is when you're in an upcall
327332
// from task A and want to force-fail task B, you do B->kill().
328333
// If you want to fail yourself you do self->fail(upcall_nargs).

trunk/src/rt/rust_upcall.cpp

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -97,27 +97,51 @@ upcall_new_chan(rust_task *task, rust_port *port) {
9797
return new (dom) rust_chan(task, port);
9898
}
9999

100+
/**
101+
* Called whenever this channel needs to be flushed. This can happen due to a
102+
* flush statement, or automatically whenever a channel's ref count is
103+
* about to drop to zero.
104+
*/
105+
extern "C" CDECL void
106+
upcall_flush_chan(rust_task *task, rust_chan *chan) {
107+
LOG_UPCALL_ENTRY(task);
108+
rust_dom *dom = task->dom;
109+
task->log(rust_log::UPCALL | rust_log::COMM,
110+
"flush chan: 0x%" PRIxPTR, chan);
111+
112+
if (chan->buffer.is_empty()) {
113+
return;
114+
}
115+
116+
A(dom, chan->port->is_proxy() == false,
117+
"Channels to remote ports should be flushed automatically.");
118+
119+
// Block on the port until this channel has been completely drained
120+
// by the port.
121+
task->block(chan->port);
122+
task->yield(2);
123+
}
124+
100125
/**
101126
* Called whenever the channel's ref count drops to zero.
127+
*
128+
* Cannot Yield: If the task were to unwind, the dropped ref would still
129+
* appear to be live, causing modify-after-free errors.
102130
*/
103131
extern "C" CDECL void upcall_del_chan(rust_task *task, rust_chan *chan) {
104132
LOG_UPCALL_ENTRY(task);
105-
rust_dom *dom = task->dom;
133+
106134
task->log(rust_log::UPCALL | rust_log::MEM | rust_log::COMM,
107135
"upcall del_chan(0x%" PRIxPTR ")", (uintptr_t) chan);
108-
I(dom, !chan->ref_count);
109-
110-
if (!chan->buffer.is_empty() && chan->is_associated()) {
111-
A(dom, !chan->port->is_proxy(),
112-
"Channels to remote ports should be flushed automatically.");
113-
// A target port may still be reading from this channel.
114-
// Block on this channel until it has been completely drained
115-
// by the port.
116-
task->block(chan);
117-
task->yield(2);
118-
return;
119-
}
120136

137+
A(task->dom, chan->ref_count == 0,
138+
"Channel's ref count should be zero.");
139+
140+
if (chan->is_associated()) {
141+
A(task->dom, chan->buffer.is_empty(),
142+
"Channel's buffer should be empty.");
143+
chan->disassociate();
144+
}
121145
delete chan;
122146
}
123147

@@ -183,8 +207,7 @@ upcall_send(rust_task *task, rust_chan *chan, void *sptr) {
183207
"chan: 0x%" PRIxPTR ", sptr: 0x%" PRIxPTR ", size: %d",
184208
(uintptr_t) chan, (uintptr_t) sptr,
185209
chan->port->delegate()->unit_sz);
186-
chan->buffer.enqueue(sptr);
187-
chan->transmit();
210+
chan->send(sptr);
188211
task->log(rust_log::COMM, "=== sent data ===>");
189212
}
190213

@@ -197,17 +220,8 @@ upcall_recv(rust_task *task, uintptr_t *dptr, rust_port *port) {
197220
(uintptr_t) port, (uintptr_t) dptr, port->unit_sz,
198221
port->chans.length());
199222

200-
for (uint32_t i = 0; i < port->chans.length(); i++) {
201-
rust_chan *chan = port->chans[i];
202-
if (chan->buffer.is_empty() == false) {
203-
chan->buffer.dequeue(dptr);
204-
if (chan->buffer.is_empty() && chan->task->blocked()) {
205-
chan->task->wakeup(chan);
206-
delete chan;
207-
}
208-
task->log(rust_log::COMM, "<=== read data ===");
209-
return;
210-
}
223+
if (port->receive(dptr)) {
224+
return;
211225
}
212226

213227
// No data was buffered on any incoming channel, so block this task
@@ -260,6 +274,8 @@ upcall_exit(rust_task *task) {
260274
LOG_UPCALL_ENTRY(task);
261275
task->log(rust_log::UPCALL | rust_log::TASK,
262276
"task ref_count: %d", task->ref_count);
277+
A(task->dom, task->ref_count >= 0,
278+
"Task ref_count should not be negative on exit!");
263279
task->die();
264280
task->notify_tasks_waiting_to_join();
265281
task->yield(1);

0 commit comments

Comments
 (0)