Skip to content

Commit 28e826d

Browse files
authored
Use async reader for parsing Apache Arrow responses (#2788) (#2792)
1 parent 4806856 commit 28e826d

File tree

3 files changed

+39
-23
lines changed

3 files changed

+39
-23
lines changed

docs/helpers.asciidoc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@ const result = await client.helpers
715715

716716
ES|QL can return results in multiple binary formats, including https://arrow.apache.org/[Apache Arrow]'s streaming format. Because Arrow is a very efficient format to read, it can be valuable for performing high-performance in-memory analytics. And because the response is streamed as batches of records, it can be used to produce aggregations and other calculations on larger-than-memory data sets.
717717

718-
`toArrowReader` returns a https://arrow.apache.org/docs/js/classes/Arrow_dom.RecordBatchReader.html[`RecordBatchStreamReader`].
718+
`toArrowReader` returns a https://github.com/apache/arrow/blob/520ae44272d491bbb52eb3c9b84864ed7088f11a/js/src/ipc/reader.ts#L216[`AsyncRecordBatchStreamReader`].
719719

720720
[source,ts]
721721
----
@@ -724,7 +724,7 @@ const reader = await client.helpers
724724
.toArrowReader()
725725
726726
// print each record as JSON
727-
for (const recordBatch of reader) {
727+
for await (const recordBatch of reader) {
728728
for (const record of recordBatch) {
729729
console.log(record.toJSON())
730730
}

src/helpers.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import assert from 'node:assert'
2525
import * as timersPromises from 'node:timers/promises'
2626
import { Readable } from 'node:stream'
2727
import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
28-
import { Table, TypeMap, tableFromIPC, RecordBatchStreamReader } from 'apache-arrow/Arrow.node'
28+
import { Table, TypeMap, tableFromIPC, AsyncRecordBatchStreamReader } from 'apache-arrow/Arrow.node'
2929
import Client from './client'
3030
import * as T from './api/types'
3131
import { Id } from './api/types'
@@ -158,7 +158,7 @@ export interface EsqlResponse {
158158
export interface EsqlHelper {
159159
toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
160160
toArrowTable: () => Promise<Table<TypeMap>>
161-
toArrowReader: () => Promise<RecordBatchStreamReader>
161+
toArrowReader: () => Promise<AsyncRecordBatchStreamReader>
162162
}
163163

164164
export interface EsqlToRecords<TDocument> {
@@ -1023,7 +1023,7 @@ export default class Helpers {
10231023
return tableFromIPC(response)
10241024
},
10251025

1026-
async toArrowReader (): Promise<RecordBatchStreamReader> {
1026+
async toArrowReader (): Promise<AsyncRecordBatchStreamReader> {
10271027
if (metaHeader !== null) {
10281028
reqOptions.headers = reqOptions.headers ?? {}
10291029
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
@@ -1032,8 +1032,9 @@ export default class Helpers {
10321032

10331033
params.format = 'arrow'
10341034

1035-
const response = await client.esql.query(params, reqOptions)
1036-
return RecordBatchStreamReader.from(response)
1035+
// @ts-expect-error response is a Readable when asStream is true
1036+
const response: Readable = await client.esql.query(params, reqOptions)
1037+
return await AsyncRecordBatchStreamReader.from(Readable.from(response))
10371038
}
10381039
}
10391040

test/unit/helpers/esql.test.ts

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -172,17 +172,28 @@ test('ES|QL helper', t => {
172172
t.end()
173173
})
174174

175-
test('toArrowReader', t => {
176-
t.test('Parses a binary response into an Arrow stream reader', async t => {
177-
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
175+
test('toArrowReader', async t => {
176+
const testRecords = [
177+
{ amount: 4.900000095367432, },
178+
{ amount: 8.199999809265137, },
179+
{ amount: 15.5, },
180+
{ amount: 9.899999618530273, },
181+
{ amount: 13.899999618530273, },
182+
]
183+
184+
// build reusable Arrow table
185+
const table = arrow.tableFromJSON(testRecords)
186+
const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
178187

188+
t.test('Parses a binary response into an Arrow stream reader', async t => {
179189
const MockConnection = connection.buildMockConnection({
180190
onRequest (_params) {
181191
return {
182-
body: Buffer.from(binaryContent, 'base64'),
192+
body: Buffer.from(rawData),
183193
statusCode: 200,
184194
headers: {
185-
'content-type': 'application/vnd.elasticsearch+arrow+stream'
195+
'content-type': 'application/vnd.elasticsearch+arrow+stream',
196+
'transfer-encoding': 'chunked'
186197
}
187198
}
188199
}
@@ -196,26 +207,28 @@ test('ES|QL helper', t => {
196207
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
197208
t.ok(result.isStream())
198209

199-
const recordBatch = result.next().value
200-
t.same(recordBatch.get(0)?.toJSON(), {
201-
amount: 4.900000095367432,
202-
date: 1729532586965,
203-
})
210+
let count = 0
211+
for await (const recordBatch of result) {
212+
for (const record of recordBatch) {
213+
t.same(record.toJSON(), testRecords[count])
214+
count++
215+
}
216+
}
217+
204218
t.end()
205219
})
206220

207221
t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
208-
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
209-
210222
const MockConnection = connection.buildMockConnection({
211223
onRequest (params) {
212224
const header = params.headers?.['x-elastic-client-meta'] ?? ''
213225
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
214226
return {
215-
body: Buffer.from(binaryContent, 'base64'),
227+
body: Buffer.from(rawData),
216228
statusCode: 200,
217229
headers: {
218-
'content-type': 'application/vnd.elasticsearch+arrow+stream'
230+
'content-type': 'application/vnd.elasticsearch+arrow+stream',
231+
'transfer-encoding': 'chunked'
219232
}
220233
}
221234
}
@@ -254,10 +267,12 @@ test('ES|QL helper', t => {
254267
new arrow.RecordBatch(schema, batch3.data),
255268
])
256269

270+
const rawData = await arrow.RecordBatchStreamWriter.writeAll(table).toUint8Array()
271+
257272
const MockConnection = connection.buildMockConnection({
258273
onRequest (_params) {
259274
return {
260-
body: Buffer.from(arrow.tableToIPC(table, "stream")),
275+
body: Buffer.from(rawData),
261276
statusCode: 200,
262277
headers: {
263278
'content-type': 'application/vnd.elasticsearch+arrow+stream'
@@ -275,7 +290,7 @@ test('ES|QL helper', t => {
275290
t.ok(result.isStream())
276291

277292
let counter = 0
278-
for (const batch of result) {
293+
for await (const batch of result) {
279294
for (const row of batch) {
280295
counter++
281296
const { id, val } = row.toJSON()

0 commit comments

Comments
 (0)