[Java] Adding Dataset ORC/IPC/CSV examples (#258)
* Adding Dataset ORC/IPC examples
* Adding ORC files + Parquet gzip compressed with row group
* Apply suggestions from code review
Co-authored-by: David Li <li.davidm96@gmail.com>
* Testing against nightly version
* Adding recipes for CSV reader files
* Rollback changes on Contributing file
* Changes to test against Java nightly packages
* Set version to 10.0.0
* Consolidate the catches
Co-authored-by: David Li <li.davidm96@gmail.com>
diff --git a/java/source/dataset.rst b/java/source/dataset.rst
index 56f868b..f7ee556 100644
--- a/java/source/dataset.rst
+++ b/java/source/dataset.rst
@@ -44,7 +44,7 @@
import java.util.stream.StreamSupport;
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
- ScanOptions options = new ScanOptions(/*batchSize*/ 100);
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
@@ -76,7 +76,7 @@
import java.util.stream.StreamSupport;
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
- ScanOptions options = new ScanOptions(/*batchSize*/ 100);
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
@@ -141,7 +141,7 @@
import org.apache.arrow.vector.types.pojo.Schema;
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
- ScanOptions options = new ScanOptions(/*batchSize*/ 1);
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
@@ -181,27 +181,20 @@
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
- import java.io.IOException;
-
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
- ScanOptions options = new ScanOptions(/*batchSize*/ 100);
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
- Scanner scanner = dataset.newScan(options)
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
) {
- scanner.scan().forEach(scanTask -> {
- try (ArrowReader reader = scanTask.execute()) {
- while (reader.loadNextBatch()) {
- try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
- System.out.print(root.contentToTSVString());
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
+ while (reader.loadNextBatch()) {
+ try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+ System.out.print(root.contentToTSVString());
}
- });
+ }
} catch (Exception e) {
e.printStackTrace();
}
@@ -213,6 +206,77 @@
2 Gladis
3 Juan
+Let's try to read a Parquet file with gzip compression and 3 row groups:
+
+.. code-block::
+
+ $ parquet-tools meta data4_3rg_gzip.parquet
+
+ file schema: schema
+ age: OPTIONAL INT64 R:0 D:1
+ name: OPTIONAL BINARY L:STRING R:0 D:1
+ row group 1: RC:4 TS:182 OFFSET:4
+ row group 2: RC:4 TS:190 OFFSET:420
+ row group 3: RC:3 TS:179 OFFSET:838
+
+.. testcode::
+
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ int totalBatchSize = 0;
+ int count = 1;
+ while (reader.loadNextBatch()) {
+ try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+ totalBatchSize += root.getRowCount();
+ System.out.println("Number of rows per batch["+ count++ +"]: " + root.getRowCount());
+ System.out.print(root.contentToTSVString());
+ }
+ }
+ System.out.println("Total batch size: " + totalBatchSize);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+.. testoutput::
+
+ Number of rows per batch[1]: 4
+ age name
+ 10 Jean
+ 10 Lu
+ 10 Kei
+ 10 Sophia
+ Number of rows per batch[2]: 4
+ age name
+ 10 Mara
+ 20 Arit
+ 20 Neil
+ 20 Jason
+ Number of rows per batch[3]: 3
+ age name
+ 20 John
+ 20 Peter
+ 20 Ismael
+ Total batch size: 11
+
Query Data Content For Directory
********************************
@@ -232,27 +296,21 @@
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
- import java.io.IOException;
-
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/";
ScanOptions options = new ScanOptions(/*batchSize*/ 100);
- try (BufferAllocator allocator = new RootAllocator();
- DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
- Dataset dataset = datasetFactory.finish();
- Scanner scanner = dataset.newScan(options)
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
) {
- scanner.scan().forEach(scanTask-> {
- final int[] count = {1};
- try (ArrowReader reader = scanTask.execute()) {
- while (reader.loadNextBatch()) {
- try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
- System.out.println("Batch: " + count[0]++ + ", RowCount: " + root.getRowCount());
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
+ int count = 1;
+ while (reader.loadNextBatch()) {
+ try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+ System.out.println("Batch: " + count++ + ", RowCount: " + root.getRowCount());
}
- });
+ }
} catch (Exception e) {
e.printStackTrace();
}
@@ -264,6 +322,9 @@
Batch: 3, RowCount: 100
Batch: 4, RowCount: 100
Batch: 5, RowCount: 50
+ Batch: 6, RowCount: 4
+ Batch: 7, RowCount: 4
+ Batch: 8, RowCount: 3
Query Data Content with Projection
**********************************
@@ -284,28 +345,21 @@
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
- import java.io.IOException;
-
String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
String[] projection = new String[] {"name"};
- ScanOptions options = new ScanOptions(/*batchSize*/ 100, Optional.of(projection));
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768, Optional.of(projection));
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
- Scanner scanner = dataset.newScan(options)
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
) {
- scanner.scan().forEach(scanTask-> {
- try (ArrowReader reader = scanTask.execute()) {
- while (reader.loadNextBatch()) {
- try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
- System.out.print(root.contentToTSVString());
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
+ while (reader.loadNextBatch()) {
+ try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+ System.out.print(root.contentToTSVString());
}
- });
+ }
} catch (Exception e) {
e.printStackTrace();
}
@@ -317,5 +371,166 @@
Gladis
Juan
+Query Arrow Files
+=================
+
+
+Query Data Content For File
+***************************
+
+Let's read an Arrow file with 3 record batches, each with 3 rows.
+
+.. testcode::
+
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ import java.io.IOException;
+
+ String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/arrowfiles/random_access.arrow";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.ARROW_IPC, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ int count = 1;
+ while (reader.loadNextBatch()) {
+ try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+ System.out.println("Number of rows per batch["+ count++ +"]: " + root.getRowCount());
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+.. testoutput::
+
+ Number of rows per batch[1]: 3
+ Number of rows per batch[2]: 3
+ Number of rows per batch[3]: 3
+
+Query ORC File
+==============
+
+Query Data Content For File
+***************************
+
+Let's read an ORC file with zlib compression, 385 stripes, each with 5000 rows.
+
+.. code-block::
+
+   $ orc-metadata data1-zlib.orc | more
+
+   { "name": "data1-zlib.orc",
+ "type": "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>",
+ "stripe count": 385,
+ "compression": "zlib", "compression block": 262144,
+ "stripes": [
+ { "stripe": 0, "rows": 5000,
+ "offset": 3, "length": 1031,
+ "index": 266, "data": 636, "footer": 129
+ },
+ ...
+
+.. testcode::
+
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/orc/data1-zlib.orc";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.ORC, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ int totalBatchSize = 0;
+ while (reader.loadNextBatch()) {
+ try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+ totalBatchSize += root.getRowCount();
+ }
+ }
+ System.out.println("Total batch size: " + totalBatchSize);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+.. testoutput::
+
+ Total batch size: 1920800
+
+Query CSV File
+==============
+
+Query Data Content For File
+***************************
+
+Let's read a CSV file.
+
+.. testcode::
+
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.VectorSchemaRoot;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+
+ String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/csv/tech_acquisitions.csv";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ int totalBatchSize = 0;
+ while (reader.loadNextBatch()) {
+ try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+ totalBatchSize += root.getRowCount();
+ System.out.print(root.contentToTSVString());
+ }
+ }
+ System.out.println("Total batch size: " + totalBatchSize);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+.. testoutput::
+
+ Acquirer Acquiree Amount in billions (USD) Date of acquisition
+ NVIDIA Mellanox 6.9 04/05/2020
+ AMD Xilinx 35.0 27/10/2020
+ Salesforce Slack 27.7 01/12/2020
+ Total batch size: 3
.. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html
\ No newline at end of file
diff --git a/java/source/flight.rst b/java/source/flight.rst
index 7dca66f..5e18167 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -277,7 +277,7 @@
flightServer.shutdown();
System.out.println("C8: Server shut down successfully");
}
- } catch (InterruptedException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
diff --git a/java/thirdpartydeps/csv/tech_acquisitions.csv b/java/thirdpartydeps/csv/tech_acquisitions.csv
new file mode 100644
index 0000000..b34e768
--- /dev/null
+++ b/java/thirdpartydeps/csv/tech_acquisitions.csv
@@ -0,0 +1,4 @@
+Acquirer,Acquiree,Amount in billions (USD),Date of acquisition
+NVIDIA,Mellanox,6.9,04/05/2020
+AMD,Xilinx,35,27/10/2020
+Salesforce,Slack,27.7,01/12/2020
\ No newline at end of file
diff --git a/java/thirdpartydeps/orc/data1-zlib.orc b/java/thirdpartydeps/orc/data1-zlib.orc
new file mode 100644
index 0000000..db0ff15
--- /dev/null
+++ b/java/thirdpartydeps/orc/data1-zlib.orc
Binary files differ
diff --git a/java/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet b/java/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet
new file mode 100644
index 0000000..3de8f34
--- /dev/null
+++ b/java/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet
Binary files differ