Skip to content

Commit 29f5765

Browse files
authored
Merge pull request #400 from thedadams/events-stream-to-with-chat
fix: make --events-stream-to work with chat
2 parents 56c16de + 693c970 commit 29f5765

File tree

1 file changed

+43
-25
lines changed

1 file changed

+43
-25
lines changed

pkg/monitor/fd.go

+43-25
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"strconv"
88
"strings"
99
"sync"
10-
"sync/atomic"
1110
"time"
1211

1312
"github.com/gptscript-ai/gptscript/pkg/runner"
@@ -23,8 +22,10 @@ type Event struct {
2322
}
2423

2524
type fileFactory struct {
25+
fileName string
2626
file *os.File
27-
runningCount atomic.Int32
27+
lock sync.Mutex
28+
runningCount int
2829
}
2930

3031
// NewFileFactory creates a new monitor factory that writes events to the location specified.
@@ -33,33 +34,23 @@ type fileFactory struct {
3334
// 2. a file name
3435
// 3. a named pipe in the form "\\.\pipe\my-pipe"
3536
func NewFileFactory(loc string) (runner.MonitorFactory, error) {
36-
var (
37-
file *os.File
38-
err error
39-
)
40-
41-
if strings.HasPrefix(loc, "fd://") {
42-
fd, err := strconv.Atoi(strings.TrimPrefix(loc, "fd://"))
43-
if err != nil {
44-
return nil, err
45-
}
46-
47-
file = os.NewFile(uintptr(fd), "events")
48-
} else {
49-
file, err = os.OpenFile(loc, os.O_WRONLY|os.O_CREATE, 0644)
50-
if err != nil {
51-
return nil, err
52-
}
53-
}
54-
5537
return &fileFactory{
56-
file: file,
57-
runningCount: atomic.Int32{},
38+
fileName: loc,
5839
}, nil
5940
}
6041

6142
func (s *fileFactory) Start(_ context.Context, prg *types.Program, env []string, input string) (runner.Monitor, error) {
62-
s.runningCount.Add(1)
43+
s.lock.Lock()
44+
s.runningCount++
45+
if s.runningCount == 1 {
46+
if err := s.openFile(); err != nil {
47+
s.runningCount--
48+
s.lock.Unlock()
49+
return nil, err
50+
}
51+
}
52+
s.lock.Unlock()
53+
6354
fd := &fd{
6455
prj: prg,
6556
env: env,
@@ -80,13 +71,40 @@ func (s *fileFactory) Start(_ context.Context, prg *types.Program, env []string,
8071
}
8172

8273
func (s *fileFactory) close() {
83-
if count := s.runningCount.Add(-1); count == 0 {
74+
s.lock.Lock()
75+
defer s.lock.Unlock()
76+
77+
s.runningCount--
78+
if s.runningCount == 0 {
8479
if err := s.file.Close(); err != nil {
8580
log.Errorf("error closing monitor file: %v", err)
8681
}
8782
}
8883
}
8984

85+
func (s *fileFactory) openFile() error {
86+
var (
87+
err error
88+
file *os.File
89+
)
90+
if strings.HasPrefix(s.fileName, "fd://") {
91+
fd, err := strconv.Atoi(strings.TrimPrefix(s.fileName, "fd://"))
92+
if err != nil {
93+
return err
94+
}
95+
96+
file = os.NewFile(uintptr(fd), "events")
97+
} else {
98+
file, err = os.OpenFile(s.fileName, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644)
99+
if err != nil {
100+
return err
101+
}
102+
}
103+
104+
s.file = file
105+
return nil
106+
}
107+
90108
type fd struct {
91109
prj *types.Program
92110
env []string

0 commit comments

Comments
 (0)