| #! /usr/bin/env node |
| |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| import * as fs from 'fs'; |
| import * as stream from 'stream'; |
| import { valueToString } from '../util/pretty'; |
| import { Schema, RecordBatch, RecordBatchReader, AsyncByteQueue } from '../Arrow.node'; |
| |
| /* eslint-disable @typescript-eslint/no-require-imports */ |
| |
const padLeft = require('pad-left');
// json-bignum parses JSON with arbitrary-precision integers (Arrow JSON may contain 64-bit values)
const bignumJSONParse = require('json-bignum').parse;
// Parse CLI flags; `partial: true` collects unrecognized args (bare file paths) into `argv._unknown`
const argv = require(`command-line-args`)(cliOpts(), { partial: true });
// Input files: `--file` values plus any bare positional arguments (none when --help was given)
const files = argv.help ? [] : [...(argv.file || []), ...(argv._unknown || [])].filter(Boolean);

// Mutable run state shared with the formatting transform; `closed` is flipped on EPIPE,
// `maxColWidths` tracks the widest value seen per column (row-id column starts at 10)
const state = { ...argv, closed: false, maxColWidths: [10] };
| |
// Shape of the mutable state object shared between the CLI driver and the
// batches-to-string transform stream.
type ToStringState = {
    hr: string;            // horizontal-rule character ('' disables rules)
    sep: string;           // column separator (default ' | ')
    schema: any;           // presumably the --schema column-name list passed to batch.select — TODO confirm typing as string[]
    closed: boolean;       // set true once the output pipe is closed (EPIPE)
    metadata: boolean;     // whether to print Schema metadata (--metadata flag)
    maxColWidths: number[];// running max character width per column (index 0 = row id)
};
| |
(async () => {

    // Build the list of lazy input sources: one stream factory per file,
    // plus stdin when data is being piped in (i.e. stdin is not a TTY).
    const sources = argv.help ? [] : [
        ...files.map((file) => () => fs.createReadStream(file)),
        ...(process.stdin.isTTY ? [] : [() => process.stdin])
    ].filter(Boolean) as (() => NodeJS.ReadableStream)[];

    let reader: RecordBatchReader | null;
    let hasReaders = false;

    for (const source of sources) {
        if (state.closed) { break; }
        // A single source may contain multiple Arrow streams back-to-back
        for await (reader of recordBatchReaders(source)) {
            hasReaders = true;
            const transformToString = batchesToString(state, reader.schema);
            // `end: false` keeps stdout open for subsequent readers/sources
            await pipeTo(
                reader.pipe(transformToString),
                process.stdout, { end: false }
            ).catch(() => state.closed = true); // Handle EPIPE errors
        }
        if (state.closed) { break; }
    }

    // Exit 0 if anything was printed; otherwise show the usage guide (returns 1)
    return hasReaders ? 0 : print_usage();
})()
// Success: coerce the resolved value to a numeric exit code.
// Failure: log the error (if any) and fall back to the process exit code or 1.
.then((x) => +x || 0, (err) => {
    if (err) {
        console.error(`${err && err.stack || err}`);
    }
    return process.exitCode || 1;
}).then((code) => process.exit(code));
| |
| function pipeTo(source: NodeJS.ReadableStream, sink: NodeJS.WritableStream, opts?: { end: boolean }) { |
| return new Promise((resolve, reject) => { |
| |
| source.on('end', onEnd).pipe(sink, opts).on('error', onErr); |
| |
| function onEnd() { done(undefined, resolve); } |
| function onErr(err: any) { done(err, reject); } |
| function done(e: any, cb: (e?: any) => void) { |
| source.removeListener('end', onEnd); |
| sink.removeListener('error', onErr); |
| cb(e); |
| } |
| }); |
| } |
| |
| async function *recordBatchReaders(createSourceStream: () => NodeJS.ReadableStream) { |
| |
| const json = new AsyncByteQueue(); |
| const stream = new AsyncByteQueue(); |
| const source = createSourceStream(); |
| let reader: RecordBatchReader | null = null; |
| let readers: AsyncIterable<RecordBatchReader> | null = null; |
| // tee the input source, just in case it's JSON |
| source.on('end', () => [stream, json].forEach((y) => y.close())) |
| .on('data', (x) => [stream, json].forEach((y) => y.write(x))) |
| .on('error', (e) => [stream, json].forEach((y) => y.abort(e))); |
| |
| try { |
| for await (reader of RecordBatchReader.readAll(stream)) { |
| reader && (yield reader); |
| } |
| if (reader) return; |
| } catch (e) { readers = null; } |
| |
| if (!readers) { |
| await json.closed; |
| if (source instanceof fs.ReadStream) { source.close(); } |
| // If the data in the `json` ByteQueue parses to JSON, then assume it's Arrow JSON from a file or stdin |
| try { |
| for await (reader of RecordBatchReader.readAll(bignumJSONParse(await json.toString()))) { |
| reader && (yield reader); |
| } |
| } catch (e) { readers = null; } |
| } |
| } |
| |
/**
 * Creates a Transform stream that renders incoming RecordBatches (matching
 * `schema`) as padded, column-aligned text rows on the readable side,
 * growing `state.maxColWidths` as wider values are encountered.
 */
function batchesToString(state: ToStringState, schema: Schema) {

    let rowId = 0;
    let batchId = -1;       // -1 until the first batch arrives; lets `final` detect an empty stream
    let maxColWidths = [10];
    const { hr, sep } = state;

    // Header cells: the row-id column followed by one stringified field per column
    const header = ['row_id', ...schema.fields.map((f) => `${f}`)].map(valueToString);

    // Seed the shared widths with the header's own cell widths
    state.maxColWidths = header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length));

    return new stream.Transform({
        encoding: 'utf8',
        writableObjectMode: true,
        readableObjectMode: false,
        final(cb: (error?: Error | null) => void) {
            // if there were no batches, then print the Schema, and metadata
            if (batchId === -1) {
                hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
                this.push(`${formatRow(header, maxColWidths, sep)}\n`);
                if (state.metadata && schema.metadata.size > 0) {
                    this.push(`metadata:\n${formatMetadata(schema.metadata)}\n`);
                }
            }
            // Closing horizontal rule (only when a rule character was configured)
            hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
            cb();
        },
        transform(batch: RecordBatch, _enc: string, cb: (error?: Error, data?: any) => void) {

            // Narrow the batch to the user-selected columns, if any were given
            batch = !(state.schema && state.schema.length) ? batch : batch.select(...state.schema);

            // Output was closed (e.g. EPIPE on stdout) -- drop remaining batches
            if (state.closed) { return cb(undefined, null); }

            // Pass one to convert to strings and count max column widths
            state.maxColWidths = measureColumnWidths(rowId, batch, header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length)));

            // If this is the first batch in a stream, print a top horizontal rule, schema metadata, and
            if (++batchId === 0) {
                hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
                if (state.metadata && batch.schema.metadata.size > 0) {
                    this.push(`metadata:\n${formatMetadata(batch.schema.metadata)}\n`);
                    hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
                }
                if (batch.length <= 0 || batch.numCols <= 0) {
                    this.push(`${formatRow(header, maxColWidths = state.maxColWidths, sep)}\n`);
                }
            }

            if (batch.length > 0 && batch.numCols > 0) {
                // If any of the column widths changed, print the header again
                if (rowId % 350 !== 0 && JSON.stringify(state.maxColWidths) !== JSON.stringify(maxColWidths)) {
                    this.push(`${formatRow(header, state.maxColWidths, sep)}\n`);
                }
                maxColWidths = state.maxColWidths;
                for (const row of batch) {
                    if (state.closed) { break; } else if (!row) { continue; }
                    // Re-print the header every 350 rows so it stays visible in long output
                    if (rowId++ % 350 === 0) {
                        this.push(`${formatRow(header, maxColWidths, sep)}\n`);
                    }
                    this.push(`${formatRow([rowId, ...row.toArray()].map(valueToString), maxColWidths, sep)}\n`);
                }
            }
            cb();
        }
    });
}
| |
| function horizontalRule(maxColWidths: number[], hr = '', sep = ' | ') { |
| return ` ${padLeft('', maxColWidths.reduce((x, y) => x + y, -2 + maxColWidths.length * sep.length), hr)}`; |
| } |
| |
| function formatRow(row: string[] = [], maxColWidths: number[] = [], sep = ' | ') { |
| return `${row.map((x, j) => padLeft(x, maxColWidths[j])).join(sep)}`; |
| } |
| |
/**
 * Renders schema metadata entries as indented `key: value` lines joined with
 * commas; JSON-parseable values are pretty-printed with 2-space indentation.
 */
function formatMetadata(metadata: Map<string, string>) {

    return [...metadata].map(([key, val]) =>
        `  ${key}: ${formatMetadataValue(val)}`
    ).join(', \n');

    // Pretty-print JSON values; fall back to the raw string when parsing fails,
    // then re-indent continuation lines to sit under the key
    function formatMetadataValue(value = '') {
        let parsed = value;
        try {
            parsed = JSON.stringify(JSON.parse(value), null, 2);
        } catch (e) { parsed = value; }
        return valueToString(parsed).split('\n').join('\n ');
    }
}
| |
/**
 * Computes the running maximum character width of each column (index 0 is
 * the row-id column) over every row in `batch`, starting from the widths
 * already present in `maxColWidths`. Returns the (mutated) `maxColWidths`.
 */
function measureColumnWidths(rowId: number, batch: RecordBatch, maxColWidths: number[] = []) {
    let val: any, j = 0;
    for (const row of batch) {
        if (!row) { continue; }
        // Column 0 holds the row id; `j = 0` also resets the column index for this row
        maxColWidths[j = 0] = Math.max(maxColWidths[0] || 0, (`${rowId++}`).length);
        for (val of row) {
            if (val && typedArrayElementWidths.has(val.constructor) && (typeof val[Symbol.toPrimitive] !== 'function')) {
                // If we're printing a column of TypedArrays, ensure the column is wide enough to accommodate
                // the widest possible element for a given byte size, since JS omits leading zeroes. For example:
                // 1 |  [1137743649,2170567488,244696391,2122556476]
                // 2 |                                          null
                // 3 |   [637174007,2142281880,961736230,2912449282]
                // 4 |    [1035112265,21832886,412842672,2207710517]
                // 5 |                                          null
                // 6 |                                          null
                // 7 |     [2755142991,4192423256,2994359,467878370]
                const elementWidth = typedArrayElementWidths.get(val.constructor)!;

                // Column width = brackets + commas + the widest stringified element per slot
                maxColWidths[j + 1] = Math.max(maxColWidths[j + 1] || 0,
                    2 + // brackets on each end
                    (val.length - 1) + // commas between elements
                    (val.length * elementWidth) // width of stringified 2^N-1
                );
            } else {
                // Scalar (or to-primitive-capable) value: measure its string form directly
                maxColWidths[j + 1] = Math.max(maxColWidths[j + 1] || 0, valueToString(val).length);
            }
            ++j;
        }
    }
    return maxColWidths;
}
| |
| // Measure the stringified representation of 2^N-1 for each TypedArray variant |
| const typedArrayElementWidths = (() => { |
| const maxElementWidth = (ArrayType: any) => { |
| const octets = Array.from({ length: ArrayType.BYTES_PER_ELEMENT - 1 }, _ => 255); |
| return `${new ArrayType(new Uint8Array([...octets, 254]).buffer)[0]}`.length; |
| }; |
| return new Map<any, number>([ |
| [Int8Array, maxElementWidth(Int8Array)], |
| [Int16Array, maxElementWidth(Int16Array)], |
| [Int32Array, maxElementWidth(Int32Array)], |
| [Uint8Array, maxElementWidth(Uint8Array)], |
| [Uint16Array, maxElementWidth(Uint16Array)], |
| [Uint32Array, maxElementWidth(Uint32Array)], |
| [Float32Array, maxElementWidth(Float32Array)], |
| [Float64Array, maxElementWidth(Float64Array)], |
| [Uint8ClampedArray, maxElementWidth(Uint8ClampedArray)] |
| ]); |
| })(); |
| |
| function cliOpts() { |
| return [ |
| { |
| type: String, |
| name: 'schema', alias: 's', |
| optional: true, multiple: true, |
| typeLabel: '{underline columns}', |
| description: 'A space-delimited list of column names' |
| }, |
| { |
| type: String, |
| name: 'file', alias: 'f', |
| optional: true, multiple: true, |
| description: 'The Arrow file to read' |
| }, |
| { |
| type: String, |
| name: 'sep', optional: true, default: ' | ', |
| description: 'The column separator character (default: " | ")' |
| }, |
| { |
| type: String, |
| name: 'hr', optional: true, default: '', |
| description: 'The horizontal border character (default: "")' |
| }, |
| { |
| type: Boolean, |
| name: 'metadata', alias: 'm', |
| optional: true, default: false, |
| description: 'Flag to print Schema metadata (default: false)' |
| }, |
| { |
| type: Boolean, |
| name: 'help', optional: true, default: false, |
| description: 'Print this usage guide.' |
| } |
| ]; |
| } |
| |
| function print_usage() { |
| console.log(require('command-line-usage')([ |
| { |
| header: 'arrow2csv', |
| content: 'Print a CSV from an Arrow file' |
| }, |
| { |
| header: 'Synopsis', |
| content: [ |
| '$ arrow2csv {underline file.arrow} [{bold --schema} column_name ...]', |
| '$ arrow2csv [{bold --schema} column_name ...] [{bold --file} {underline file.arrow}]', |
| '$ arrow2csv {bold -s} column_1 {bold -s} column_2 [{bold -f} {underline file.arrow}]', |
| '$ arrow2csv [{bold --help}]' |
| ] |
| }, |
| { |
| header: 'Options', |
| optionList: cliOpts() |
| }, |
| { |
| header: 'Example', |
| content: [ |
| '$ arrow2csv --schema foo baz --sep " , " -f simple.arrow', |
| '> "row_id", "foo: Int32", "baz: Utf8"', |
| '> 0, 1, "aa"', |
| '> 1, null, null', |
| '> 2, 3, null', |
| '> 3, 4, "bbb"', |
| '> 4, 5, "cccc"', |
| ] |
| } |
| ])); |
| return 1; |
| } |