Skip to content

Commit 71fbda3

Browse files
committed
Apache Arrow for DuckDBClient
1 parent 990c240 commit 71fbda3

File tree

5 files changed

+67
-27
lines changed

5 files changed

+67
-27
lines changed

src/arrow.mjs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
// Returns true if the vaue is an Apache Arrow table. This uses a “duck” test
2+
// (instead of strict instanceof) because we want it to work with a range of
3+
// Apache Arrow versions at least 7.0.0 or above.
4+
// https://arrow.apache.org/docs/7.0/js/classes/Arrow_dom.Table.html
5+
export function isArrowTable(value) {
6+
return (
7+
value &&
8+
typeof value.getChild === "function" &&
9+
typeof value.toArray === "function" &&
10+
value.schema &&
11+
Array.isArray(value.schema.fields)
12+
);
13+
}
14+
115
export function getArrowTableSchema(table) {
216
return table.schema.fields.map(getArrowFieldSchema);
317
}

src/duckdb.mjs

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
import {getArrowTableSchema} from "./arrow.mjs";
1+
import {getArrowTableSchema, isArrowTable} from "./arrow.mjs";
22
import {arrow9 as arrow, duckdb} from "./dependencies.mjs";
33
import {FileAttachment} from "./fileAttachment.mjs";
4+
import {cdn} from "./require.mjs";
45

56
// Adapted from https://observablehq.com/@cmudig/duckdb-client
67
// Copyright 2021 CMU Data Interaction Group
@@ -31,9 +32,6 @@ import {FileAttachment} from "./fileAttachment.mjs";
3132
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
3233
// POSSIBILITY OF SUCH DAMAGE.
3334

34-
// TODO Allow this to be overridden using the Library’s resolver.
35-
const cdn = "https://cdn.observableusercontent.com/npm/";
36-
3735
export class DuckDBClient {
3836
constructor(db) {
3937
Object.defineProperties(this, {
@@ -125,16 +123,22 @@ export class DuckDBClient {
125123
await db.open(config);
126124
await Promise.all(
127125
Object.entries(sources).map(async ([name, source]) => {
128-
if ("array" in source) { // array + options
129-
const {array, ...options} = source;
130-
await insertArray(db, name, array, options);
126+
if (source instanceof FileAttachment) { // bare file
127+
await insertFile(db, name, source);
128+
} else if (isArrowTable(source)) { // bare arrow table
129+
await insertArrowTable(db, name, source);
130+
} else if (Array.isArray(source)) { // bare array of objects
131+
await insertArray(db, name, source);
132+
} else if ("data" in source) { // data + options
133+
const {data, ...options} = source;
134+
if (isArrowTable(data)) {
135+
await insertArrowTable(db, name, data, options);
136+
} else {
137+
await insertArray(db, name, data, options);
138+
}
131139
} else if ("file" in source) { // file + options
132140
const {file, ...options} = source;
133141
await insertFile(db, name, file, options);
134-
} else if (source instanceof FileAttachment) { // bare file
135-
await insertFile(db, name, source);
136-
} else if (Array.isArray(source)) { // bare data
137-
await insertArray(db, name, source);
138142
} else {
139143
throw new Error(`invalid source: ${source}`);
140144
}
@@ -156,36 +160,40 @@ async function insertFile(database, name, file, options) {
156160
try {
157161
switch (file.mimeType) {
158162
case "text/csv":
159-
await connection.insertCSVFromPath(file.name, {
163+
return await connection.insertCSVFromPath(file.name, {
160164
name,
161165
schema: "main",
162166
...options
163167
});
164-
break;
165168
case "application/json":
166-
await connection.insertJSONFromPath(file.name, {
169+
return await connection.insertJSONFromPath(file.name, {
167170
name,
168171
schema: "main",
169172
...options
170173
});
171-
break;
172174
default:
173-
if (file.name.endsWith(".parquet")) {
174-
await connection.query(
175+
if (/\.arrow$/i.test(file.name)) {
176+
const buffer = new Uint8Array(await file.arrayBuffer());
177+
return await connection.insertArrowFromIPCStream(buffer, {
178+
name,
179+
schema: "main",
180+
...options
181+
});
182+
}
183+
if (/\.parquet$/i.test(file.name)) {
184+
return await connection.query(
175185
`CREATE VIEW '${name}' AS SELECT * FROM parquet_scan('${file.name}')`
176186
);
177-
} else {
178-
throw new Error(`unknown file type: ${file.mimeType}`);
179187
}
188+
throw new Error(`unknown file type: ${file.mimeType}`);
180189
}
181190
} finally {
182191
await connection.close();
183192
}
184193
}
185194

186-
async function insertArray(database, name, array, options) {
195+
async function insertArrowTable(database, name, table, options) {
187196
const arrow = await loadArrow();
188-
const table = arrow.tableFromJSON(array);
189197
const buffer = arrow.tableToIPC(table);
190198
const connection = await database.connect();
191199
try {
@@ -199,6 +207,12 @@ async function insertArray(database, name, array, options) {
199207
}
200208
}
201209

210+
async function insertArray(database, name, array, options) {
211+
const arrow = await loadArrow();
212+
const table = arrow.tableFromJSON(array);
213+
return await insertArrowTable(database, name, table, options);
214+
}
215+
202216
async function createDuckDB() {
203217
const duck = await import(`${cdn}${duckdb.resolve()}`);
204218
const bundle = await duck.selectBundle({

src/fileAttachment.mjs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {autoType, csvParse, csvParseRows, tsvParse, tsvParseRows} from "d3-dsv";
2-
import {arrow4, jszip, exceljs} from "./dependencies.mjs";
3-
import {requireDefault} from "./require.mjs";
2+
import {arrow4, arrow9, jszip, exceljs, arrow10} from "./dependencies.mjs";
3+
import {cdn, requireDefault} from "./require.mjs";
44
import {SQLiteDatabaseClient} from "./sqlite.mjs";
55
import {Workbook} from "./xlsx.mjs";
66

@@ -56,9 +56,18 @@ export class AbstractFile {
5656
i.src = url;
5757
});
5858
}
59-
async arrow() {
60-
const [Arrow, response] = await Promise.all([requireDefault(arrow4.resolve()), remote_fetch(this)]); // TODO upgrade to apache-arrow@9
61-
return Arrow.Table.from(response);
59+
async arrow({version = 4} = {}) {
60+
switch (version) {
61+
case 4: {
62+
const [Arrow, response] = await Promise.all([requireDefault(arrow4.resolve()), remote_fetch(this)]);
63+
return Arrow.Table.from(response);
64+
}
65+
case 9: {
66+
const [Arrow, response] = await Promise.all([import(`${cdn}${arrow9.resolve()}`), remote_fetch(this)]);
67+
return Arrow.tableFromIPC(response);
68+
}
69+
default: throw new Error(`unsupported arrow version: ${version}`);
70+
}
6271
}
6372
async sqlite() {
6473
return SQLiteDatabaseClient.open(remote_fetch(this));

src/index.mjs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
export {default as FileAttachments, AbstractFile} from "./fileAttachment.mjs";
22
export {default as Library} from "./library.mjs";
3-
export {makeQueryTemplate, loadDataSource, arrayIsPrimitive, isDataArray, isDatabaseClient} from "./table.mjs";
3+
export {makeQueryTemplate, loadDataSource, arrayIsPrimitive, isArrowTable, isDataArray, isDatabaseClient} from "./table.mjs";

src/require.mjs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import {require as initialRequire, requireFrom} from "d3-require";
22

3+
// TODO Allow this to be overridden using the Library’s resolver.
4+
export const cdn = "https://cdn.observableusercontent.com/npm/";
5+
36
export let requireDefault = initialRequire;
47

58
export function setDefaultRequire(require) {

0 commit comments

Comments
 (0)