[Java] Adding Dataset ORC/IPC/CSV examples (#258)

* Adding Dataset ORC/IPC examples

* Adding ORC files + gzip-compressed Parquet file with row groups

* Apply suggestions from code review

Co-authored-by: David Li <li.davidm96@gmail.com>

* Testing against nightly version

* Adding recipes for reading CSV files

* Roll back changes to Contributing file

* Changes to test against Java nightly packages

* Set version to 10.0.0

* Consolidate the catches

Co-authored-by: David Li <li.davidm96@gmail.com>
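
The recipes move from the ScanTask-based API (scanner.scan() followed by
ScanTask.execute()) to Scanner.scanBatches(), which hands back a single
ArrowReader. A minimal sketch of the new pattern, assuming the same imports
as the recipes below (the uri and batch size are illustrative):

    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (BufferAllocator allocator = new RootAllocator();
         DatasetFactory factory = new FileSystemDatasetFactory(
                 allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
         Dataset dataset = factory.finish();
         Scanner scanner = dataset.newScan(options);
         ArrowReader reader = scanner.scanBatches()) {
        // one reader drives all batches; no per-task readers to manage
        while (reader.loadNextBatch()) {
            try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
                System.out.print(root.contentToTSVString()); // process each batch
            }
        }
    }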
diff --git a/java/source/dataset.rst b/java/source/dataset.rst
index 56f868b..f7ee556 100644
--- a/java/source/dataset.rst
+++ b/java/source/dataset.rst
@@ -44,7 +44,7 @@
    import java.util.stream.StreamSupport;
 
    String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
-   ScanOptions options = new ScanOptions(/*batchSize*/ 100);
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
@@ -76,7 +76,7 @@
    import java.util.stream.StreamSupport;
 
    String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
-   ScanOptions options = new ScanOptions(/*batchSize*/ 100);
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
@@ -141,7 +141,7 @@
    import org.apache.arrow.vector.types.pojo.Schema;
 
    String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
-   ScanOptions options = new ScanOptions(/*batchSize*/ 1);
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
@@ -181,27 +181,20 @@
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowReader;
 
-   import java.io.IOException;
-
    String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
-   ScanOptions options = new ScanOptions(/*batchSize*/ 100);
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
        Dataset dataset = datasetFactory.finish();
-       Scanner scanner = dataset.newScan(options)
+       Scanner scanner = dataset.newScan(options);
+       ArrowReader reader = scanner.scanBatches()
    ) {
-       scanner.scan().forEach(scanTask -> {
-           try (ArrowReader reader = scanTask.execute()) {
-               while (reader.loadNextBatch()) {
-                   try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
-                       System.out.print(root.contentToTSVString());
-                   }
-               }
-           } catch (IOException e) {
-               e.printStackTrace();
+       while (reader.loadNextBatch()) {
+           try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+               System.out.print(root.contentToTSVString());
            }
-       });
+       }
    } catch (Exception e) {
        e.printStackTrace();
    }
@@ -213,6 +206,77 @@
    2    Gladis
    3    Juan
 
+Let's try to read a Parquet file with gzip compression and 3 row groups:
+
+.. code-block::
+
+   $ parquet-tools meta data4_3rg_gzip.parquet
+
+   file schema: schema
+   age:         OPTIONAL INT64 R:0 D:1
+   name:        OPTIONAL BINARY L:STRING R:0 D:1
+   row group 1: RC:4 TS:182 OFFSET:4
+   row group 2: RC:4 TS:190 OFFSET:420
+   row group 3: RC:3 TS:179 OFFSET:838
+
+.. testcode::
+
+   import org.apache.arrow.dataset.file.FileFormat;
+   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+   import org.apache.arrow.dataset.jni.NativeMemoryPool;
+   import org.apache.arrow.dataset.scanner.ScanOptions;
+   import org.apache.arrow.dataset.scanner.Scanner;
+   import org.apache.arrow.dataset.source.Dataset;
+   import org.apache.arrow.dataset.source.DatasetFactory;
+   import org.apache.arrow.memory.BufferAllocator;
+   import org.apache.arrow.memory.RootAllocator;
+   import org.apache.arrow.vector.VectorSchemaRoot;
+   import org.apache.arrow.vector.ipc.ArrowReader;
+
+   String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet";
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+   try (
+       BufferAllocator allocator = new RootAllocator();
+       DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
+       Dataset dataset = datasetFactory.finish();
+       Scanner scanner = dataset.newScan(options);
+       ArrowReader reader = scanner.scanBatches()
+   ) {
+       int totalBatchSize = 0;
+       int count = 1;
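+       // each Parquet row group surfaces as its own batch here (4, 4, and 3 rows)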
+       while (reader.loadNextBatch()) {
+           try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+               totalBatchSize += root.getRowCount();
+               System.out.println("Number of rows per batch[" + count++ + "]: " + root.getRowCount());
+               System.out.print(root.contentToTSVString());
+           }
+       }
+       System.out.println("Total batch size: " + totalBatchSize);
+   } catch (Exception e) {
+       e.printStackTrace();
+   }
+
+.. testoutput::
+
+   Number of rows per batch[1]: 4
+   age    name
+   10    Jean
+   10    Lu
+   10    Kei
+   10    Sophia
+   Number of rows per batch[2]: 4
+   age    name
+   10    Mara
+   20    Arit
+   20    Neil
+   20    Jason
+   Number of rows per batch[3]: 3
+   age    name
+   20    John
+   20    Peter
+   20    Ismael
+   Total batch size: 11
+
 Query Data Content For Directory
 ********************************
 
@@ -232,27 +296,21 @@
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowReader;
 
-   import java.io.IOException;
-
    String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/";
    ScanOptions options = new ScanOptions(/*batchSize*/ 100);
-   try (BufferAllocator allocator = new RootAllocator();
-        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
-        Dataset dataset = datasetFactory.finish();
-        Scanner scanner = dataset.newScan(options)
+   try (
+       BufferAllocator allocator = new RootAllocator();
+       DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
+       Dataset dataset = datasetFactory.finish();
+       Scanner scanner = dataset.newScan(options);
+       ArrowReader reader = scanner.scanBatches()
    ) {
-       scanner.scan().forEach(scanTask-> {
-           final int[] count = {1};
-           try (ArrowReader reader = scanTask.execute()) {
-               while (reader.loadNextBatch()) {
-                   try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
-                       System.out.println("Batch: " + count[0]++ + ", RowCount: " + root.getRowCount());
-                   }
-               }
-           } catch (IOException e) {
-               e.printStackTrace();
+       int count = 1;
+       while (reader.loadNextBatch()) {
+           try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+               System.out.println("Batch: " + count++ + ", RowCount: " + root.getRowCount());
            }
-       });
+       }
    } catch (Exception e) {
        e.printStackTrace();
    }
@@ -264,6 +322,9 @@
    Batch: 3, RowCount: 100
    Batch: 4, RowCount: 100
    Batch: 5, RowCount: 50
+   Batch: 6, RowCount: 4
+   Batch: 7, RowCount: 4
+   Batch: 8, RowCount: 3
 
 Query Data Content with Projection
 **********************************
@@ -284,28 +345,21 @@
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowReader;
 
-   import java.io.IOException;
-
    String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
    String[] projection = new String[] {"name"};
-   ScanOptions options = new ScanOptions(/*batchSize*/ 100, Optional.of(projection));
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768, Optional.of(projection));
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
        Dataset dataset = datasetFactory.finish();
-       Scanner scanner = dataset.newScan(options)
+       Scanner scanner = dataset.newScan(options);
+       ArrowReader reader = scanner.scanBatches()
    ) {
-       scanner.scan().forEach(scanTask-> {
-           try (ArrowReader reader = scanTask.execute()) {
-               while (reader.loadNextBatch()) {
-                   try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
-                       System.out.print(root.contentToTSVString());
-                   }
-               }
-           } catch (IOException e) {
-               e.printStackTrace();
+       while (reader.loadNextBatch()) {
+           try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+               System.out.print(root.contentToTSVString());
            }
-       });
+       }
    } catch (Exception e) {
        e.printStackTrace();
    }
@@ -317,5 +371,166 @@
    Gladis
    Juan
 
+Query Arrow Files
+=================
+
+
+Query Data Content For File
+***************************
+
+Let's read an Arrow file with 3 record batches, each with 3 rows.
+
+.. testcode::
+
+   import org.apache.arrow.dataset.file.FileFormat;
+   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+   import org.apache.arrow.dataset.jni.NativeMemoryPool;
+   import org.apache.arrow.dataset.scanner.ScanOptions;
+   import org.apache.arrow.dataset.scanner.Scanner;
+   import org.apache.arrow.dataset.source.Dataset;
+   import org.apache.arrow.dataset.source.DatasetFactory;
+   import org.apache.arrow.memory.BufferAllocator;
+   import org.apache.arrow.memory.RootAllocator;
+   import org.apache.arrow.vector.VectorSchemaRoot;
+   import org.apache.arrow.vector.ipc.ArrowReader;
+
+   String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/arrowfiles/random_access.arrow";
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+   try (
+       BufferAllocator allocator = new RootAllocator();
+       DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.ARROW_IPC, uri);
+       Dataset dataset = datasetFactory.finish();
+       Scanner scanner = dataset.newScan(options);
+       ArrowReader reader = scanner.scanBatches()
+   ) {
+       int count = 1;
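+       // the file was written as 3 record batches of 3 rows each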
+       while (reader.loadNextBatch()) {
+           try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+               System.out.println("Number of rows per batch[" + count++ + "]: " + root.getRowCount());
+           }
+       }
+   } catch (Exception e) {
+       e.printStackTrace();
+   }
+
+.. testoutput::
+
+   Number of rows per batch[1]: 3
+   Number of rows per batch[2]: 3
+   Number of rows per batch[3]: 3
+
+Query ORC File
+==============
+
+Query Data Content For File
+***************************
+
+Let's read an ORC file with zlib compression and 385 stripes of up to 5,000 rows each.
+
+.. code-block::
+
+   $ orc-metadata demo-11-zlib.orc | more
+
+   { "name": "demo-11-zlib.orc",
+     "type": "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>",
+     "stripe count": 385,
+     "compression": "zlib", "compression block": 262144,
+     "stripes": [
+       { "stripe": 0, "rows": 5000,
+         "offset": 3, "length": 1031,
+         "index": 266, "data": 636, "footer": 129
+       },
+   ...
+
+.. testcode::
+
+   import org.apache.arrow.dataset.file.FileFormat;
+   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+   import org.apache.arrow.dataset.jni.NativeMemoryPool;
+   import org.apache.arrow.dataset.scanner.ScanOptions;
+   import org.apache.arrow.dataset.scanner.Scanner;
+   import org.apache.arrow.dataset.source.Dataset;
+   import org.apache.arrow.dataset.source.DatasetFactory;
+   import org.apache.arrow.memory.BufferAllocator;
+   import org.apache.arrow.memory.RootAllocator;
+   import org.apache.arrow.vector.VectorSchemaRoot;
+   import org.apache.arrow.vector.ipc.ArrowReader;
+
+   String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/orc/data1-zlib.orc";
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+   try (
+       BufferAllocator allocator = new RootAllocator();
+       DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.ORC, uri);
+       Dataset dataset = datasetFactory.finish();
+       Scanner scanner = dataset.newScan(options);
+       ArrowReader reader = scanner.scanBatches()
+   ) {
+       int totalBatchSize = 0;
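+       // accumulate row counts across all batches; the file holds 1,920,800 rows in total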
+       while (reader.loadNextBatch()) {
+           try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+               totalBatchSize += root.getRowCount();
+           }
+       }
+       System.out.println("Total batch size: " + totalBatchSize);
+   } catch (Exception e) {
+       e.printStackTrace();
+   }
+
+.. testoutput::
+
+   Total batch size: 1920800
+
+Query CSV File
+==============
+
+Query Data Content For File
+***************************
+
+Let's read a CSV file.
+
+.. testcode::
+
+   import org.apache.arrow.dataset.file.FileFormat;
+   import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+   import org.apache.arrow.dataset.jni.NativeMemoryPool;
+   import org.apache.arrow.dataset.scanner.ScanOptions;
+   import org.apache.arrow.dataset.scanner.Scanner;
+   import org.apache.arrow.dataset.source.Dataset;
+   import org.apache.arrow.dataset.source.DatasetFactory;
+   import org.apache.arrow.memory.BufferAllocator;
+   import org.apache.arrow.memory.RootAllocator;
+   import org.apache.arrow.vector.VectorSchemaRoot;
+   import org.apache.arrow.vector.ipc.ArrowReader;
+
+   String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/csv/tech_acquisitions.csv";
+   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+   try (
+       BufferAllocator allocator = new RootAllocator();
+       DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, uri);
+       Dataset dataset = datasetFactory.finish();
+       Scanner scanner = dataset.newScan(options);
+       ArrowReader reader = scanner.scanBatches()
+   ) {
+       int totalBatchSize = 0;
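+       // column types are inferred from the data, so the "35" in the file prints as 35.0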
+       while (reader.loadNextBatch()) {
+           try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
+               totalBatchSize += root.getRowCount();
+               System.out.print(root.contentToTSVString());
+           }
+       }
+       System.out.println("Total batch size: " + totalBatchSize);
+   } catch (Exception e) {
+       e.printStackTrace();
+   }
+
+.. testoutput::
+
+   Acquirer    Acquiree    Amount in billions (USD)    Date of acquisition
+   NVIDIA    Mellanox    6.9    04/05/2020
+   AMD    Xilinx    35.0    27/10/2020
+   Salesforce    Slack    27.7    01/12/2020
+   Total batch size: 3
 
 .. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html
\ No newline at end of file
diff --git a/java/source/flight.rst b/java/source/flight.rst
index 7dca66f..5e18167 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -277,7 +277,7 @@
                flightServer.shutdown();
                System.out.println("C8: Server shut down successfully");
            }
-       } catch (InterruptedException e) {
+       } catch (Exception e) {
            e.printStackTrace();
        }
    }
diff --git a/java/thirdpartydeps/csv/tech_acquisitions.csv b/java/thirdpartydeps/csv/tech_acquisitions.csv
new file mode 100644
index 0000000..b34e768
--- /dev/null
+++ b/java/thirdpartydeps/csv/tech_acquisitions.csv
@@ -0,0 +1,4 @@
+Acquirer,Acquiree,Amount in billions (USD),Date of acquisition
+NVIDIA,Mellanox,6.9,04/05/2020
+AMD,Xilinx,35,27/10/2020
+Salesforce,Slack,27.7,01/12/2020
\ No newline at end of file
diff --git a/java/thirdpartydeps/orc/data1-zlib.orc b/java/thirdpartydeps/orc/data1-zlib.orc
new file mode 100644
index 0000000..db0ff15
--- /dev/null
+++ b/java/thirdpartydeps/orc/data1-zlib.orc
Binary files differ
diff --git a/java/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet b/java/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet
new file mode 100644
index 0000000..3de8f34
--- /dev/null
+++ b/java/thirdpartydeps/parquetfiles/data4_3rg_gzip.parquet
Binary files differ