| .. Licensed to the Apache Software Foundation (ASF) under one |
| .. or more contributor license agreements. See the NOTICE file |
| .. distributed with this work for additional information |
| .. regarding copyright ownership. The ASF licenses this file |
| .. to you under the Apache License, Version 2.0 (the |
| .. "License"); you may not use this file except in compliance |
| .. with the License. You may obtain a copy of the License at |
| |
| .. http://www.apache.org/licenses/LICENSE-2.0 |
| |
| .. Unless required by applicable law or agreed to in writing, |
| .. software distributed under the License is distributed on an |
| .. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| .. KIND, either express or implied. See the License for the |
| .. specific language governing permissions and limitations |
| .. under the License. |
| |
| .. _arrow-dataset: |
| |
| ======= |
| Dataset |
| ======= |
| |
| * `Arrow Java Dataset`_: Java implementation of the Arrow Datasets library. It implements the Dataset Java API via JNI bindings to the C++ implementation. |
| |
| .. contents:: |
| |
| Constructing Datasets |
| ===================== |
| |
| We can construct a dataset with an auto-inferred schema. |
| |
| .. testcode:: |
| |
| import org.apache.arrow.dataset.file.FileFormat; |
| import org.apache.arrow.dataset.file.FileSystemDatasetFactory; |
| import org.apache.arrow.dataset.jni.NativeMemoryPool; |
| import org.apache.arrow.dataset.scanner.ScanOptions; |
| import org.apache.arrow.dataset.scanner.Scanner; |
| import org.apache.arrow.dataset.source.Dataset; |
| import org.apache.arrow.dataset.source.DatasetFactory; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import java.util.stream.StreamSupport; |
| |
| // Sample Parquet file shipped with the cookbook's third-party test data. |
| String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet"; |
| ScanOptions options = new ScanOptions(/*batchSize*/ 100); |
| try ( |
| BufferAllocator allocator = new RootAllocator(); |
| DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri); |
| // finish() with no arguments builds the Dataset using the auto-inferred schema. |
| Dataset dataset = datasetFactory.finish(); |
| Scanner scanner = dataset.newScan(options) |
| ) { |
| // Count the elements produced by scanner.scan() (one scan task here). |
| System.out.println(StreamSupport.stream(scanner.scan().spliterator(), false).count()); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| .. testoutput:: |
| |
| 1 |
| |
| Let's construct our dataset with a predefined schema. |
| |
| .. testcode:: |
| |
| import org.apache.arrow.dataset.file.FileFormat; |
| import org.apache.arrow.dataset.file.FileSystemDatasetFactory; |
| import org.apache.arrow.dataset.jni.NativeMemoryPool; |
| import org.apache.arrow.dataset.scanner.ScanOptions; |
| import org.apache.arrow.dataset.scanner.Scanner; |
| import org.apache.arrow.dataset.source.Dataset; |
| import org.apache.arrow.dataset.source.DatasetFactory; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import java.util.stream.StreamSupport; |
| |
| String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet"; |
| ScanOptions options = new ScanOptions(/*batchSize*/ 100); |
| try ( |
| BufferAllocator allocator = new RootAllocator(); |
| DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri); |
| // finish(schema) builds the Dataset with an explicitly supplied schema; |
| // here we reuse the schema the factory itself inspected from the file. |
| Dataset dataset = datasetFactory.finish(datasetFactory.inspect()); |
| Scanner scanner = dataset.newScan(options) |
| ) { |
| // Count the elements produced by scanner.scan() (one scan task here). |
| System.out.println(StreamSupport.stream(scanner.scan().spliterator(), false).count()); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| .. testoutput:: |
| |
| 1 |
| |
| Getting the Schema |
| ================== |
| |
| During Dataset Construction |
| *************************** |
| |
| .. testcode:: |
| |
| import org.apache.arrow.dataset.file.FileFormat; |
| import org.apache.arrow.dataset.file.FileSystemDatasetFactory; |
| import org.apache.arrow.dataset.jni.NativeMemoryPool; |
| import org.apache.arrow.dataset.source.DatasetFactory; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import org.apache.arrow.vector.types.pojo.Schema; |
| |
| String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet"; |
| try ( |
| BufferAllocator allocator = new RootAllocator(); |
| DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri) |
| ) { |
| // inspect() returns the schema the factory discovered for the source, |
| // without needing to build a Dataset first. |
| Schema schema = datasetFactory.inspect(); |
| |
| System.out.println(schema); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| .. testoutput:: |
| |
| Schema<id: Int(32, true), name: Utf8>(metadata: {parquet.avro.schema={"type":"record","name":"User","namespace":"org.apache.arrow.dataset","fields":[{"name":"id","type":["int","null"]},{"name":"name","type":["string","null"]}]}, writer.model.name=avro}) |
| |
| From a Dataset |
| ************** |
| |
| .. testcode:: |
| |
| import org.apache.arrow.dataset.file.FileFormat; |
| import org.apache.arrow.dataset.file.FileSystemDatasetFactory; |
| import org.apache.arrow.dataset.jni.NativeMemoryPool; |
| import org.apache.arrow.dataset.scanner.ScanOptions; |
| import org.apache.arrow.dataset.scanner.Scanner; |
| import org.apache.arrow.dataset.source.Dataset; |
| import org.apache.arrow.dataset.source.DatasetFactory; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import org.apache.arrow.vector.types.pojo.Schema; |
| |
| String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet"; |
| ScanOptions options = new ScanOptions(/*batchSize*/ 1); |
| try ( |
| BufferAllocator allocator = new RootAllocator(); |
| DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri); |
| Dataset dataset = datasetFactory.finish(); |
| Scanner scanner = dataset.newScan(options) |
| ) { |
| // Read the schema from an already-created scanner instead of the factory. |
| Schema schema = scanner.schema(); |
| |
| System.out.println(schema); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| .. testoutput:: |
| |
| Schema<id: Int(32, true), name: Utf8>(metadata: {parquet.avro.schema={"type":"record","name":"User","namespace":"org.apache.arrow.dataset","fields":[{"name":"id","type":["int","null"]},{"name":"name","type":["string","null"]}]}, writer.model.name=avro}) |
| |
| Query Parquet File |
| ================== |
| |
| Let's query information for a Parquet file. |
| |
| Query Data Content For File |
| *************************** |
| |
| .. testcode:: |
| |
| import org.apache.arrow.dataset.file.FileFormat; |
| import org.apache.arrow.dataset.file.FileSystemDatasetFactory; |
| import org.apache.arrow.dataset.jni.NativeMemoryPool; |
| import org.apache.arrow.dataset.scanner.ScanOptions; |
| import org.apache.arrow.dataset.scanner.Scanner; |
| import org.apache.arrow.dataset.source.Dataset; |
| import org.apache.arrow.dataset.source.DatasetFactory; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import org.apache.arrow.vector.VectorSchemaRoot; |
| import org.apache.arrow.vector.ipc.ArrowReader; |
| |
| import java.io.IOException; |
| |
| // Read every record batch of the file and print its contents as TSV. |
| String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet"; |
| ScanOptions options = new ScanOptions(/*batchSize*/ 100); |
| try ( |
| BufferAllocator allocator = new RootAllocator(); |
| DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri); |
| Dataset dataset = datasetFactory.finish(); |
| Scanner scanner = dataset.newScan(options) |
| ) { |
| scanner.scan().forEach(scanTask -> { |
| // Each scan task yields an ArrowReader; iterate its batches until exhausted. |
| try (ArrowReader reader = scanTask.execute()) { |
| while (reader.loadNextBatch()) { |
| // The root holds the current batch; close it before loading the next one. |
| try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) { |
| System.out.print(root.contentToTSVString()); |
| } |
| } |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| }); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| .. testoutput:: |
| |
| id name |
| 1 David |
| 2 Gladis |
| 3 Juan |
| |
| Query Data Content For Directory |
| ******************************** |
| |
| Consider that we have these files: data1 with 3 rows, data2 with 3 rows, and data3 with 250 rows. |
| |
| .. testcode:: |
| |
| import org.apache.arrow.dataset.file.FileFormat; |
| import org.apache.arrow.dataset.file.FileSystemDatasetFactory; |
| import org.apache.arrow.dataset.jni.NativeMemoryPool; |
| import org.apache.arrow.dataset.scanner.ScanOptions; |
| import org.apache.arrow.dataset.scanner.Scanner; |
| import org.apache.arrow.dataset.source.Dataset; |
| import org.apache.arrow.dataset.source.DatasetFactory; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import org.apache.arrow.vector.VectorSchemaRoot; |
| import org.apache.arrow.vector.ipc.ArrowReader; |
| |
| import java.io.IOException; |
| |
| // Scan every Parquet file under the directory URI and print each batch's row count. |
| String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/"; |
| ScanOptions options = new ScanOptions(/*batchSize*/ 100); |
| try (BufferAllocator allocator = new RootAllocator(); |
| DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri); |
| Dataset dataset = datasetFactory.finish(); |
| Scanner scanner = dataset.newScan(options) |
| ) { |
| // Single-element array so the lambda can mutate the counter; declared OUTSIDE |
| // the forEach so batch numbering stays continuous across all scan tasks. |
| final int[] count = {1}; |
| scanner.scan().forEach(scanTask-> { |
| try (ArrowReader reader = scanTask.execute()) { |
| while (reader.loadNextBatch()) { |
| try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) { |
| System.out.println("Batch: " + count[0]++ + ", RowCount: " + root.getRowCount()); |
| } |
| } |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| }); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| .. testoutput:: |
| |
| Batch: 1, RowCount: 3 |
| Batch: 2, RowCount: 3 |
| Batch: 3, RowCount: 100 |
| Batch: 4, RowCount: 100 |
| Batch: 5, RowCount: 50 |
| |
| Query Data Content with Projection |
| ********************************** |
| |
| In case we need to project only certain columns, we can configure ScanOptions with the projections needed. |
| |
| .. testcode:: |
| |
| import org.apache.arrow.dataset.file.FileFormat; |
| import org.apache.arrow.dataset.file.FileSystemDatasetFactory; |
| import org.apache.arrow.dataset.jni.NativeMemoryPool; |
| import org.apache.arrow.dataset.scanner.ScanOptions; |
| import org.apache.arrow.dataset.scanner.Scanner; |
| import org.apache.arrow.dataset.source.Dataset; |
| import org.apache.arrow.dataset.source.DatasetFactory; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import org.apache.arrow.vector.VectorSchemaRoot; |
| import org.apache.arrow.vector.ipc.ArrowReader; |
| |
| import java.io.IOException; |
| import java.util.Optional; |
| |
| // Project only the "name" column by passing the column list to ScanOptions. |
| String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet"; |
| String[] projection = new String[] {"name"}; |
| ScanOptions options = new ScanOptions(/*batchSize*/ 100, Optional.of(projection)); |
| try ( |
| BufferAllocator allocator = new RootAllocator(); |
| DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri); |
| Dataset dataset = datasetFactory.finish(); |
| Scanner scanner = dataset.newScan(options) |
| ) { |
| scanner.scan().forEach(scanTask-> { |
| try (ArrowReader reader = scanTask.execute()) { |
| while (reader.loadNextBatch()) { |
| try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) { |
| System.out.print(root.contentToTSVString()); |
| } |
| } |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| }); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| .. testoutput:: |
| |
| name |
| David |
| Gladis |
| Juan |
| |
| |
| .. _Arrow Java Dataset: https://arrow.apache.org/docs/dev/java/dataset.html |