Skip to content

Commit 155bad3

Browse files
committed
Added monitor rate limiting
1 parent 4b42a29 commit 155bad3

File tree

3 files changed

+158
-48
lines changed

3 files changed

+158
-48
lines changed

commands/daemon/monitor.go

+64-18
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"fmt"
2121
"io"
22+
"sync/atomic"
2223

2324
"github.com/arduino/arduino-cli/arduino/monitors"
2425
rpc "github.com/arduino/arduino-cli/rpc/monitor"
@@ -79,6 +80,14 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
7980
streamClosed := make(chan error)
8081
targetClosed := make(chan error)
8182

83+
// set rate limiting window
84+
bufferSize := int(config.GetRecvRateLimitBuffer())
85+
buffer := make([]byte, bufferSize)
86+
bufferUsed := 0
87+
88+
rateLimitEnabled := (bufferSize > 0)
89+
var writeSlots int32
90+
8291
// now we can read the other messages and re-route to the monitor...
8392
go func() {
8493
for {
@@ -95,6 +104,11 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
95104
break
96105
}
97106

107+
if rateLimitEnabled {
108+
// Increase rate limiter write slots
109+
atomic.AddInt32(&writeSlots, msg.GetRecvAcknowledge())
110+
}
111+
98112
if _, err := mon.Write(msg.GetData()); err != nil {
99113
// error writing to target
100114
targetClosed <- err
@@ -105,27 +119,59 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
105119

106120
// ...and read from the monitor and forward to the output stream
107121
go func() {
108-
buf := make([]byte, 8)
122+
dropBuffer := make([]byte, 10240)
123+
dropped := 0
109124
for {
110-
n, err := mon.Read(buf)
111-
if err != nil {
112-
// error reading from target
113-
targetClosed <- err
114-
break
115-
}
116-
117-
if n == 0 {
118-
// target was closed
119-
targetClosed <- nil
120-
break
125+
if bufferUsed < bufferSize {
126+
if n, err := mon.Read(buffer[bufferUsed:]); err != nil {
127+
// error reading from target
128+
targetClosed <- err
129+
break
130+
} else if n == 0 {
131+
// target was closed
132+
targetClosed <- nil
133+
break
134+
} else {
135+
bufferUsed += n
136+
}
137+
} else {
138+
// FIXME: a very rare condition but still...
139+
// we may be waiting here while, in the meantime, a transmit slot is
140+
// freed: in this case the (filled) buffer will stay in the server
141+
// until the following Read exits (-> the next char arrives from the
142+
// monitor).
143+
144+
if n, err := mon.Read(dropBuffer); err != nil {
145+
// error reading from target
146+
targetClosed <- err
147+
break
148+
} else if n == 0 {
149+
// target was closed
150+
targetClosed <- nil
151+
break
152+
} else {
153+
dropped += n
154+
}
121155
}
122156

123-
if err = stream.Send(&rpc.StreamingOpenResp{
124-
Data: buf[:n],
125-
}); err != nil {
126-
// error sending to stream
127-
streamClosed <- err
128-
break
157+
slots := atomic.LoadInt32(&writeSlots)
158+
if !rateLimitEnabled || slots > 0 {
159+
if err = stream.Send(&rpc.StreamingOpenResp{
160+
Data: buffer[:bufferUsed],
161+
Dropped: int32(dropped),
162+
}); err != nil {
163+
// error sending to stream
164+
streamClosed <- err
165+
break
166+
}
167+
bufferUsed = 0
168+
dropped = 0
169+
170+
// Rate limit, filling all the available window
171+
if rateLimitEnabled {
172+
slots = atomic.AddInt32(&writeSlots, -1)
173+
//fmt.Println("FREE SLOTS:", slots)
174+
}
129175
}
130176
}
131177
}()

rpc/monitor/monitor.pb.go

+78-30
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/monitor/monitor.proto

+16
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ message StreamingOpenReq {
4444

4545
// The data to be sent to the target being monitored.
4646
bytes data = 2;
47+
48+
// When the rate limiter is enabled, this parameter is used to report the
49+
// number of successfully processed StreamingOpenResp messages (with data).
50+
int32 recv_acknowledge = 3;
4751
}
4852
}
4953

@@ -61,10 +65,22 @@ message MonitorConfig {
6165
// Additional parameters that might be needed to configure the target or the
6266
// monitor itself.
6367
google.protobuf.Struct additionalConfig = 3;
68+
69+
// This parameter indicates how many bytes should be buffered on the server side
70+
// before dropping. If >0 then the server will enable a rate limiter and will send
71+
// incoming data to the client only when the client allows it: see the
72+
// StreamingOpenReq.recv_acknowledge parameter for details.
73+
int32 recv_rate_limit_buffer = 4;
6474
}
6575

6676
//
6777
message StreamingOpenResp {
6878
// The data received from the target.
6979
bytes data = 1;
80+
81+
// The number of bytes dropped.
82+
// During regular updates this number should be 0, but in case the
83+
// client is not able to process the recv window quickly enough this
84+
// parameter will report the number of dropped bytes.
85+
int32 dropped = 2;
7086
}

0 commit comments

Comments
 (0)