Skip to content

Commit 4e18db2

Browse files
committed
Added monitor rate limiting
1 parent bbf2f6b commit 4e18db2

File tree

3 files changed

+161
-48
lines changed

3 files changed

+161
-48
lines changed

commands/daemon/monitor.go

+67-18
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"fmt"
2121
"io"
22+
"sync/atomic"
2223

2324
"github.com/arduino/arduino-cli/arduino/monitors"
2425
rpc "github.com/arduino/arduino-cli/rpc/monitor"
@@ -79,6 +80,17 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
7980
streamClosed := make(chan error)
8081
targetClosed := make(chan error)
8182

83+
// set rate limiting window
84+
bufferSize := int(config.GetRecvRateLimitBuffer())
85+
rateLimitEnabled := (bufferSize > 0)
86+
if !rateLimitEnabled {
87+
bufferSize = 1024
88+
}
89+
buffer := make([]byte, bufferSize)
90+
bufferUsed := 0
91+
92+
var writeSlots int32
93+
8294
// now we can read the other messages and re-route to the monitor...
8395
go func() {
8496
for {
@@ -95,6 +107,11 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
95107
break
96108
}
97109

110+
if rateLimitEnabled {
111+
// Increase rate limiter write slots
112+
atomic.AddInt32(&writeSlots, msg.GetRecvAcknowledge())
113+
}
114+
98115
if _, err := mon.Write(msg.GetData()); err != nil {
99116
// error writing to target
100117
targetClosed <- err
@@ -105,27 +122,59 @@ func (s *MonitorService) StreamingOpen(stream rpc.Monitor_StreamingOpenServer) e
105122

106123
// ...and read from the monitor and forward to the output stream
107124
go func() {
108-
buf := make([]byte, 8)
125+
dropBuffer := make([]byte, 10240)
126+
dropped := 0
109127
for {
110-
n, err := mon.Read(buf)
111-
if err != nil {
112-
// error reading from target
113-
targetClosed <- err
114-
break
115-
}
116-
117-
if n == 0 {
118-
// target was closed
119-
targetClosed <- nil
120-
break
128+
if bufferUsed < bufferSize {
129+
if n, err := mon.Read(buffer[bufferUsed:]); err != nil {
130+
// error reading from target
131+
targetClosed <- err
132+
break
133+
} else if n == 0 {
134+
// target was closed
135+
targetClosed <- nil
136+
break
137+
} else {
138+
bufferUsed += n
139+
}
140+
} else {
141+
// FIXME: a very rare condition but still...
142+
// we may be waiting here while, in the meantime, a transmit slot is
143+
// freed: in this case the (filled) buffer will stay in the server
144+
// until the following Read exits (-> the next char arrives from the
145+
// monitor).
146+
147+
if n, err := mon.Read(dropBuffer); err != nil {
148+
// error reading from target
149+
targetClosed <- err
150+
break
151+
} else if n == 0 {
152+
// target was closed
153+
targetClosed <- nil
154+
break
155+
} else {
156+
dropped += n
157+
}
121158
}
122159

123-
if err = stream.Send(&rpc.StreamingOpenResp{
124-
Data: buf[:n],
125-
}); err != nil {
126-
// error sending to stream
127-
streamClosed <- err
128-
break
160+
slots := atomic.LoadInt32(&writeSlots)
161+
if !rateLimitEnabled || slots > 0 {
162+
if err = stream.Send(&rpc.StreamingOpenResp{
163+
Data: buffer[:bufferUsed],
164+
Dropped: int32(dropped),
165+
}); err != nil {
166+
// error sending to stream
167+
streamClosed <- err
168+
break
169+
}
170+
bufferUsed = 0
171+
dropped = 0
172+
173+
// Rate limit, filling all the available window
174+
if rateLimitEnabled {
175+
slots = atomic.AddInt32(&writeSlots, -1)
176+
//fmt.Println("FREE SLOTS:", slots)
177+
}
129178
}
130179
}
131180
}()

rpc/monitor/monitor.pb.go

+78-30
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/monitor/monitor.proto

+16
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ message StreamingOpenReq {
4444

4545
// The data to be sent to the target being monitored.
4646
bytes data = 2;
47+
48+
// When the rate limiter is enabled, this parameter is used to report the
49+
// number of successfully processed StreamingOpenResp messages (with data).
50+
int32 recv_acknowledge = 3;
4751
}
4852
}
4953

@@ -61,10 +65,22 @@ message MonitorConfig {
6165
// Additional parameters that might be needed to configure the target or the
6266
// monitor itself.
6367
google.protobuf.Struct additionalConfig = 3;
68+
69+
// This parameter indicates how many bytes should be buffered on the server side
70+
// before dropping. If >0 then the server will enable a rate limiter and will send
71+
// incoming data to the client only when the client allows it: see the
72+
// StreamingOpenReq.recv_acknowledge parameter for details.
73+
int32 recv_rate_limit_buffer = 4;
6474
}
6575

6676
//
6777
message StreamingOpenResp {
6878
// The data received from the target.
6979
bytes data = 1;
80+
81+
// The number of bytes dropped.
82+
// During regular updates this number should be 0, but in case the
83+
// client is not able to process the recv window quickly enough this
84+
// parameter will report the number of dropped bytes.
85+
int32 dropped = 2;
7086
}

0 commit comments

Comments
 (0)