Skip to content

Commit 3283832

Browse files
Anaethelionzaneli
andauthored
esutil: Modify the BulkIndexerConfig.Client type to esapi.Transport (#957) (#1000)
Co-authored-by: Shunsuke Otani <[email protected]>
1 parent 00cd4fc commit 3283832

File tree

2 files changed

+106
-83
lines changed

2 files changed

+106
-83
lines changed

esutil/bulk_indexer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type BulkIndexerConfig struct {
5858
FlushBytes int // The flush threshold in bytes. Defaults to 5MB.
5959
FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec.
6060

61-
Client *elasticsearch.Client // The Elasticsearch client.
61+
Client esapi.Transport // The Elasticsearch client.
6262
Decoder BulkResponseJSONDecoder // A custom JSON decoder.
6363
DebugLogger BulkIndexerDebugLogger // An optional logger for debugging.
6464

esutil/bulk_indexer_internal_test.go

+105-82
Original file line numberDiff line numberDiff line change
@@ -62,104 +62,127 @@ func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
6262

6363
func TestBulkIndexer(t *testing.T) {
6464
t.Run("Basic", func(t *testing.T) {
65-
var (
66-
wg sync.WaitGroup
65+
tests := []struct {
66+
name string
67+
makeClient func(cfg elasticsearch.Config) (esapi.Transport, error)
68+
}{
69+
{
70+
name: "Client",
71+
makeClient: func(cfg elasticsearch.Config) (esapi.Transport, error) {
72+
return elasticsearch.NewClient(cfg)
73+
},
74+
},
75+
{
76+
name: "TypedClient",
77+
makeClient: func(cfg elasticsearch.Config) (esapi.Transport, error) {
78+
return elasticsearch.NewTypedClient(cfg)
79+
},
80+
},
81+
}
82+
for _, tt := range tests {
83+
tt := tt
6784

68-
countReqs int
69-
testfile string
70-
numItems = 6
71-
)
85+
t.Run(tt.name, func(t *testing.T) {
86+
var (
87+
wg sync.WaitGroup
88+
89+
countReqs int
90+
testfile string
91+
numItems = 6
92+
)
93+
94+
es, _ := tt.makeClient(elasticsearch.Config{Transport: &mockTransport{
95+
RoundTripFunc: func(*http.Request) (*http.Response, error) {
96+
countReqs++
97+
switch countReqs {
98+
case 1:
99+
testfile = "testdata/bulk_response_1a.json"
100+
case 2:
101+
testfile = "testdata/bulk_response_1b.json"
102+
case 3:
103+
testfile = "testdata/bulk_response_1c.json"
104+
}
105+
bodyContent, _ := ioutil.ReadFile(testfile)
106+
return &http.Response{
107+
Body: ioutil.NopCloser(bytes.NewBuffer(bodyContent)),
108+
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
109+
}, nil
110+
},
111+
}})
72112

73-
es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{
74-
RoundTripFunc: func(*http.Request) (*http.Response, error) {
75-
countReqs++
76-
switch countReqs {
77-
case 1:
78-
testfile = "testdata/bulk_response_1a.json"
79-
case 2:
80-
testfile = "testdata/bulk_response_1b.json"
81-
case 3:
82-
testfile = "testdata/bulk_response_1c.json"
113+
cfg := BulkIndexerConfig{
114+
NumWorkers: 1,
115+
FlushBytes: 39 * 2, // 38 bytes header + body, times 2 to match 2 responses per file in testdata
116+
FlushInterval: time.Hour, // Disable auto-flushing, because response doesn't match number of items
117+
Client: es}
118+
if os.Getenv("DEBUG") != "" {
119+
cfg.DebugLogger = log.New(os.Stdout, "", 0)
83120
}
84-
bodyContent, _ := ioutil.ReadFile(testfile)
85-
return &http.Response{
86-
Body: ioutil.NopCloser(bytes.NewBuffer(bodyContent)),
87-
Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}},
88-
}, nil
89-
},
90-
}})
91121

92-
cfg := BulkIndexerConfig{
93-
NumWorkers: 1,
94-
FlushBytes: 39 * 2, // 38 bytes header + body, times 2 to match 2 responses per file in testdata
95-
FlushInterval: time.Hour, // Disable auto-flushing, because response doesn't match number of items
96-
Client: es}
97-
if os.Getenv("DEBUG") != "" {
98-
cfg.DebugLogger = log.New(os.Stdout, "", 0)
99-
}
100-
101-
bi, _ := NewBulkIndexer(cfg)
122+
bi, _ := NewBulkIndexer(cfg)
123+
124+
for i := 1; i <= numItems; i++ {
125+
wg.Add(1)
126+
go func(i int) {
127+
defer wg.Done()
128+
err := bi.Add(context.Background(), BulkIndexerItem{
129+
Action: "foo",
130+
DocumentID: strconv.Itoa(i),
131+
Body: strings.NewReader(fmt.Sprintf(`{"title":"foo-%d"}`, i)),
132+
})
133+
if err != nil {
134+
t.Errorf("Unexpected error: %s", err)
135+
return
136+
}
137+
}(i)
138+
}
139+
wg.Wait()
102140

103-
for i := 1; i <= numItems; i++ {
104-
wg.Add(1)
105-
go func(i int) {
106-
defer wg.Done()
107-
err := bi.Add(context.Background(), BulkIndexerItem{
108-
Action: "foo",
109-
DocumentID: strconv.Itoa(i),
110-
Body: strings.NewReader(fmt.Sprintf(`{"title":"foo-%d"}`, i)),
111-
})
112-
if err != nil {
141+
if err := bi.Close(context.Background()); err != nil {
113142
t.Errorf("Unexpected error: %s", err)
114-
return
115143
}
116-
}(i)
117-
}
118-
wg.Wait()
119144

120-
if err := bi.Close(context.Background()); err != nil {
121-
t.Errorf("Unexpected error: %s", err)
122-
}
145+
stats := bi.Stats()
123146

124-
stats := bi.Stats()
125-
126-
// added = numitems
127-
if stats.NumAdded != uint64(numItems) {
128-
t.Errorf("Unexpected NumAdded: want=%d, got=%d", numItems, stats.NumAdded)
129-
}
147+
// added = numitems
148+
if stats.NumAdded != uint64(numItems) {
149+
t.Errorf("Unexpected NumAdded: want=%d, got=%d", numItems, stats.NumAdded)
150+
}
130151

131-
// flushed = numitems - 1x conflict + 1x not_found
132-
if stats.NumFlushed != uint64(numItems-2) {
133-
t.Errorf("Unexpected NumFlushed: want=%d, got=%d", numItems-2, stats.NumFlushed)
134-
}
152+
// flushed = numitems - 1x conflict + 1x not_found
153+
if stats.NumFlushed != uint64(numItems-2) {
154+
t.Errorf("Unexpected NumFlushed: want=%d, got=%d", numItems-2, stats.NumFlushed)
155+
}
135156

136-
// failed = 1x conflict + 1x not_found
137-
if stats.NumFailed != 2 {
138-
t.Errorf("Unexpected NumFailed: want=%d, got=%d", 2, stats.NumFailed)
139-
}
157+
// failed = 1x conflict + 1x not_found
158+
if stats.NumFailed != 2 {
159+
t.Errorf("Unexpected NumFailed: want=%d, got=%d", 2, stats.NumFailed)
160+
}
140161

141-
// indexed = 1x
142-
if stats.NumIndexed != 1 {
143-
t.Errorf("Unexpected NumIndexed: want=%d, got=%d", 1, stats.NumIndexed)
144-
}
162+
// indexed = 1x
163+
if stats.NumIndexed != 1 {
164+
t.Errorf("Unexpected NumIndexed: want=%d, got=%d", 1, stats.NumIndexed)
165+
}
145166

146-
// created = 1x
147-
if stats.NumCreated != 1 {
148-
t.Errorf("Unexpected NumCreated: want=%d, got=%d", 1, stats.NumCreated)
149-
}
167+
// created = 1x
168+
if stats.NumCreated != 1 {
169+
t.Errorf("Unexpected NumCreated: want=%d, got=%d", 1, stats.NumCreated)
170+
}
150171

151-
// deleted = 1x
152-
if stats.NumDeleted != 1 {
153-
t.Errorf("Unexpected NumDeleted: want=%d, got=%d", 1, stats.NumDeleted)
154-
}
172+
// deleted = 1x
173+
if stats.NumDeleted != 1 {
174+
t.Errorf("Unexpected NumDeleted: want=%d, got=%d", 1, stats.NumDeleted)
175+
}
155176

156-
if stats.NumUpdated != 1 {
157-
t.Errorf("Unexpected NumUpdated: want=%d, got=%d", 1, stats.NumUpdated)
158-
}
177+
if stats.NumUpdated != 1 {
178+
t.Errorf("Unexpected NumUpdated: want=%d, got=%d", 1, stats.NumUpdated)
179+
}
159180

160-
// 3 items * 40 bytes, 2 workers, 1 request per worker
161-
if stats.NumRequests != 3 {
162-
t.Errorf("Unexpected NumRequests: want=%d, got=%d", 3, stats.NumRequests)
181+
// 3 items * 40 bytes, 2 workers, 1 request per worker
182+
if stats.NumRequests != 3 {
183+
t.Errorf("Unexpected NumRequests: want=%d, got=%d", 3, stats.NumRequests)
184+
}
185+
})
163186
}
164187
})
165188

0 commit comments

Comments
 (0)