Skip to content

Made writes to output stream synchronous #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 53 additions & 14 deletions .github/workflows/check-taskfiles.yml
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
# Source: https://github.com/arduino/tooling-project-assets/blob/main/workflow-templates/check-taskfiles.md
name: Check Taskfiles

# See: https://docs.github.com/en/actions/reference/events-that-trigger-workflows
env:
# See: https://github.com/actions/setup-node/#readme
NODE_VERSION: 16.x

# See: https://docs.github.com/actions/using-workflows/events-that-trigger-workflows
on:
create:
push:
paths:
- ".github/workflows/check-taskfiles.ya?ml"
- "package.json"
- "package-lock.json"
- "**/Taskfile.ya?ml"
pull_request:
paths:
- ".github/workflows/check-taskfiles.ya?ml"
- "package.json"
- "package-lock.json"
- "**/Taskfile.ya?ml"
schedule:
# Run every Tuesday at 8 AM UTC to catch breakage resulting from changes to the JSON schema.
Expand All @@ -18,8 +27,33 @@ on:
repository_dispatch:

jobs:
run-determination:
runs-on: ubuntu-latest
outputs:
result: ${{ steps.determination.outputs.result }}
steps:
- name: Determine if the rest of the workflow should run
id: determination
run: |
RELEASE_BRANCH_REGEX="refs/heads/[0-9]+.[0-9]+.x"
# The `create` event trigger doesn't support `branches` filters, so it's necessary to use Bash instead.
if [[
"${{ github.event_name }}" != "create" ||
"${{ github.ref }}" =~ $RELEASE_BRANCH_REGEX
]]; then
# Run the other jobs.
RESULT="true"
else
# There is no need to run the other jobs.
RESULT="false"
fi

echo "result=$RESULT" >> $GITHUB_OUTPUT

validate:
name: Validate ${{ matrix.file }}
needs: run-determination
if: needs.run-determination.outputs.result == 'true'
runs-on: ubuntu-latest

strategy:
Expand All @@ -34,26 +68,31 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v3

- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: ${{ env.NODE_VERSION }}

- name: Download JSON schema for Taskfiles
id: download-schema
uses: carlosperate/download-file-action@v2
with:
# See: https://github.com/SchemaStore/schemastore/blob/master/src/schemas/json/taskfile.json
file-url: https://json.schemastore.org/taskfile.json
# Source: https://github.com/SchemaStore/schemastore/blob/master/src/schemas/json/taskfile.json
file-url: https://taskfile.dev/schema.json
location: ${{ runner.temp }}/taskfile-schema

- name: Install JSON schema validator
run: |
sudo npm install \
--global \
ajv-cli \
ajv-formats
run: npm install

- name: Validate ${{ matrix.file }}
run: |
# See: https://github.com/ajv-validator/ajv-cli#readme
ajv validate \
--all-errors \
--strict=false \
-c ajv-formats \
-s "${{ steps.download-schema.outputs.file-path }}" \
-d "${{ matrix.file }}"
npx \
--package=ajv-cli \
--package=ajv-formats \
ajv validate \
--all-errors \
--strict=false \
-c ajv-formats \
-s "${{ steps.download-schema.outputs.file-path }}" \
-d "${{ matrix.file }}"
1 change: 1 addition & 0 deletions .github/workflows/test-go-task.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,5 @@ jobs:
with:
file: ${{ matrix.module.path }}coverage_unit.txt
flags: ${{ matrix.module.codecov-flags }}
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: ${{ github.repository == 'arduino/pluggable-discovery-protocol-handler' }}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
name: github.com/arduino/go-paths-helper
version: v1.6.1
version: v1.8.0
type: go
summary:
summary:
homepage: https://pkg.go.dev/github.com/arduino/go-paths-helper
license: gpl-2.0-or-later
licenses:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
name: github.com/arduino/go-properties-orderedmap
version: v1.6.0
version: v1.7.1
type: go
summary: Package properties is a library for handling maps of hierarchical properties.
homepage: https://pkg.go.dev/github.com/arduino/go-properties-orderedmap
Expand Down
113 changes: 49 additions & 64 deletions discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,23 @@ type ErrorCallback func(err string)
// it must be created using the NewServer function.
type Server struct {
impl Discovery
outputChan chan *message
outputWaiter sync.WaitGroup
userAgent string
reqProtocolVersion int
initialized bool
started bool
syncStarted bool
cachedPorts map[string]*Port
cachedErr string
output io.Writer
outputMutex sync.Mutex
}

// NewServer creates a new discovery server backed by the
// provided pluggable discovery implementation. To start the server
// use the Run method.
func NewServer(impl Discovery) *Server {
return &Server{
impl: impl,
outputChan: make(chan *message),
impl: impl,
}
}

Expand All @@ -113,20 +112,20 @@ func NewServer(impl Discovery) *Server {
// the input stream is closed. In case of IO error the error is
// returned.
func (d *Server) Run(in io.Reader, out io.Writer) error {
d.startOutputProcessor(out)
d.output = out
reader := bufio.NewReader(in)
for {
fullCmd, err := reader.ReadString('\n')
if err != nil {
d.outputChan <- messageError("command_error", err.Error())
d.send(messageError("command_error", err.Error()))
return err
}
fullCmd = strings.TrimSpace(fullCmd)
split := strings.Split(fullCmd, " ")
cmd := strings.ToUpper(split[0])

if !d.initialized && cmd != "HELLO" && cmd != "QUIT" {
d.outputChan <- messageError("command_error", fmt.Sprintf("First command must be HELLO, but got '%s'", cmd))
d.send(messageError("command_error", fmt.Sprintf("First command must be HELLO, but got '%s'", cmd)))
continue
}

Expand All @@ -142,61 +141,62 @@ func (d *Server) Run(in io.Reader, out io.Writer) error {
case "STOP":
d.stop()
case "QUIT":
d.quit()
d.impl.Quit()
d.send(messageOk("quit"))
return nil
default:
d.outputChan <- messageError("command_error", fmt.Sprintf("Command %s not supported", cmd))
d.send(messageError("command_error", fmt.Sprintf("Command %s not supported", cmd)))
}
}
}

func (d *Server) hello(cmd string) {
if d.initialized {
d.outputChan <- messageError("hello", "HELLO already called")
d.send(messageError("hello", "HELLO already called"))
return
}
re := regexp.MustCompile(`^(\d+) "([^"]+)"$`)
matches := re.FindStringSubmatch(cmd)
if len(matches) != 3 {
d.outputChan <- messageError("hello", "Invalid HELLO command")
d.send(messageError("hello", "Invalid HELLO command"))
return
}
d.userAgent = matches[2]
v, err := strconv.ParseInt(matches[1], 10, 64)
if err != nil {
d.outputChan <- messageError("hello", "Invalid protocol version: "+matches[2])
d.send(messageError("hello", "Invalid protocol version: "+matches[2]))
return
}
d.reqProtocolVersion = int(v)
if err := d.impl.Hello(d.userAgent, 1); err != nil {
d.outputChan <- messageError("hello", err.Error())
d.send(messageError("hello", err.Error()))
return
}
d.outputChan <- &message{
d.send(&message{
EventType: "hello",
ProtocolVersion: 1, // Protocol version 1 is the only supported for now...
Message: "OK",
}
})
d.initialized = true
}

func (d *Server) start() {
if d.started {
d.outputChan <- messageError("start", "Discovery already STARTed")
d.send(messageError("start", "Discovery already STARTed"))
return
}
if d.syncStarted {
d.outputChan <- messageError("start", "Discovery already START_SYNCed, cannot START")
d.send(messageError("start", "Discovery already START_SYNCed, cannot START"))
return
}
d.cachedPorts = map[string]*Port{}
d.cachedErr = ""
if err := d.impl.StartSync(d.eventCallback, d.errorCallback); err != nil {
d.outputChan <- messageError("start", "Cannot START: "+err.Error())
d.send(messageError("start", "Cannot START: "+err.Error()))
return
}
d.started = true
d.outputChan <- messageOk("start")
d.send(messageOk("start"))
}

func (d *Server) eventCallback(event string, port *Port) {
Expand All @@ -215,99 +215,84 @@ func (d *Server) errorCallback(msg string) {

func (d *Server) list() {
if !d.started {
d.outputChan <- messageError("list", "Discovery not STARTed")
d.send(messageError("list", "Discovery not STARTed"))
return
}
if d.syncStarted {
d.outputChan <- messageError("list", "discovery already START_SYNCed, LIST not allowed")
d.send(messageError("list", "discovery already START_SYNCed, LIST not allowed"))
return
}
if d.cachedErr != "" {
d.outputChan <- messageError("list", d.cachedErr)
d.send(messageError("list", d.cachedErr))
return
}
ports := []*Port{}
for _, port := range d.cachedPorts {
ports = append(ports, port)
}
d.outputChan <- &message{
d.send(&message{
EventType: "list",
Ports: &ports,
}
})
}

func (d *Server) startSync() {
if d.syncStarted {
d.outputChan <- messageError("start_sync", "Discovery already START_SYNCed")
d.send(messageError("start_sync", "Discovery already START_SYNCed"))
return
}
if d.started {
d.outputChan <- messageError("start_sync", "Discovery already STARTed, cannot START_SYNC")
d.send(messageError("start_sync", "Discovery already STARTed, cannot START_SYNC"))
return
}
if err := d.impl.StartSync(d.syncEvent, d.errorEvent); err != nil {
d.outputChan <- messageError("start_sync", "Cannot START_SYNC: "+err.Error())
d.send(messageError("start_sync", "Cannot START_SYNC: "+err.Error()))
return
}
d.syncStarted = true
d.outputChan <- messageOk("start_sync")
d.send(messageOk("start_sync"))
}

func (d *Server) stop() {
if !d.syncStarted && !d.started {
d.outputChan <- messageError("stop", "Discovery already STOPped")
d.send(messageError("stop", "Discovery already STOPped"))
return
}
if err := d.impl.Stop(); err != nil {
d.outputChan <- messageError("stop", "Cannot STOP: "+err.Error())
d.send(messageError("stop", "Cannot STOP: "+err.Error()))
return
}
d.started = false
if d.syncStarted {
d.syncStarted = false
}
d.outputChan <- messageOk("stop")
d.send(messageOk("stop"))
}

func (d *Server) syncEvent(event string, port *Port) {
d.outputChan <- &message{
d.send(&message{
EventType: event,
Port: port,
}
}

func (d *Server) quit() {
d.impl.Quit()
d.outputChan <- messageOk("quit")
close(d.outputChan)
// If we don't wait for all messages
// to be consumed by the output processor
// we risk not printing the "quit" message.
// This may cause issues to consumers of
// the discovery since they expect a message
// that is never sent.
d.outputWaiter.Wait()
})
}

func (d *Server) errorEvent(msg string) {
d.outputChan <- messageError("start_sync", msg)
d.send(messageError("start_sync", msg))
}

func (d *Server) startOutputProcessor(outWriter io.Writer) {
// Start go routine to serialize messages printing
d.outputWaiter.Add(1)
go func() {
for msg := range d.outputChan {
data, err := json.MarshalIndent(msg, "", " ")
if err != nil {
// We are certain that this will be marshalled correctly
// so we don't handle the error
data, _ = json.MarshalIndent(messageError("command_error", err.Error()), "", " ")
}
fmt.Fprintln(outWriter, string(data))
}
// We finished consuming all messages, now
// we can exit for real
d.outputWaiter.Done()
}()
func (d *Server) send(msg *message) {
data, err := json.MarshalIndent(msg, "", " ")
if err != nil {
// We are certain that this will be marshalled correctly
// so we don't handle the error
data, _ = json.MarshalIndent(messageError("command_error", err.Error()), "", " ")
}
data = append(data, '\n')

d.outputMutex.Lock()
defer d.outputMutex.Unlock()
n, err := d.output.Write(data)
if n != len(data) || err != nil {
panic("ERROR")
}
}
Loading