| .. Licensed to the Apache Software Foundation (ASF) under one |
| .. or more contributor license agreements. See the NOTICE file |
| .. distributed with this work for additional information |
| .. regarding copyright ownership. The ASF licenses this file |
| .. to you under the Apache License, Version 2.0 (the |
| .. "License"); you may not use this file except in compliance |
| .. with the License. You may obtain a copy of the License at |
| |
| .. http://www.apache.org/licenses/LICENSE-2.0 |
| |
| .. Unless required by applicable law or agreed to in writing, |
| .. software distributed under the License is distributed on an |
| .. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| .. KIND, either express or implied. See the License for the |
| .. specific language governing permissions and limitations |
| .. under the License. |
| |
| ============ |
| Arrow Flight |
| ============ |
| |
| This section contains a number of recipes for working with Arrow Flight. |
| For more detail about Flight please take a look at `Arrow Flight RPC`_. |
| |
| .. contents:: |
| |
| Simple Key-Value Storage Service with Arrow Flight |
| ================================================== |
| |
| We'll implement a service that provides a key-value store for data, using Flight to handle uploads and requests |
| and keeping the actual data in memory. |
| |
| Flight Client and Server |
| ************************ |
| |
| .. testcode:: |
| |
| import org.apache.arrow.flight.Action; |
| import org.apache.arrow.flight.AsyncPutListener; |
| import org.apache.arrow.flight.CallStatus; |
| import org.apache.arrow.flight.Criteria; |
| import org.apache.arrow.flight.FlightClient; |
| import org.apache.arrow.flight.FlightDescriptor; |
| import org.apache.arrow.flight.FlightEndpoint; |
| import org.apache.arrow.flight.FlightInfo; |
| import org.apache.arrow.flight.FlightServer; |
| import org.apache.arrow.flight.FlightStream; |
| import org.apache.arrow.flight.Location; |
| import org.apache.arrow.flight.NoOpFlightProducer; |
| import org.apache.arrow.flight.PutResult; |
| import org.apache.arrow.flight.Result; |
| import org.apache.arrow.flight.Ticket; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import org.apache.arrow.util.AutoCloseables; |
| import org.apache.arrow.vector.VarCharVector; |
| import org.apache.arrow.vector.VectorLoader; |
| import org.apache.arrow.vector.VectorSchemaRoot; |
| import org.apache.arrow.vector.VectorUnloader; |
| import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; |
| import org.apache.arrow.vector.types.pojo.ArrowType; |
| import org.apache.arrow.vector.types.pojo.Field; |
| import org.apache.arrow.vector.types.pojo.FieldType; |
| import org.apache.arrow.vector.types.pojo.Schema; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| |
| /** |
|  * Value stored by the key-value service: a list of record batches together |
|  * with the schema and the row count supplied by whoever built the dataset. |
|  */ |
| class Dataset implements AutoCloseable { |
|     private final List<ArrowRecordBatch> batches; |
|     private final Schema schema; |
|     private final long rows; |
| |
|     /** Stores the given batches, schema and row count as-is (no copies are made). */ |
|     public Dataset(List<ArrowRecordBatch> batches, Schema schema, long rows) { |
|         this.rows = rows; |
|         this.schema = schema; |
|         this.batches = batches; |
|     } |
| |
|     /** @return the schema passed to the constructor */ |
|     public Schema getSchema() { |
|         return this.schema; |
|     } |
| |
|     /** @return the row count passed to the constructor */ |
|     public long getRows() { |
|         return this.rows; |
|     } |
| |
|     /** @return the very list of batches passed to the constructor */ |
|     public List<ArrowRecordBatch> getBatches() { |
|         return this.batches; |
|     } |
| |
|     /** Closes every record batch held by this dataset. */ |
|     @Override |
|     public void close() throws Exception { |
|         AutoCloseables.close(this.batches); |
|     } |
| } |
| /** |
|  * Flight producer implementing the key-value store. Each uploaded stream is kept |
|  * in memory as a {@code Dataset}, keyed by the descriptor it was uploaded under. |
|  */ |
| class CookbookProducer extends NoOpFlightProducer implements AutoCloseable { |
|     private final BufferAllocator allocator; |
|     private final Location location; |
|     private final ConcurrentMap<FlightDescriptor, Dataset> datasets; |
| |
|     /** |
|      * @param allocator allocator used to create the roots that getStream sends from |
|      * @param location location advertised in the endpoints returned by getFlightInfo |
|      */ |
|     public CookbookProducer(BufferAllocator allocator, Location location) { |
|         this.allocator = allocator; |
|         this.location = location; |
|         this.datasets = new ConcurrentHashMap<>(); |
|     } |
| |
|     /** |
|      * Reads every batch of the uploaded stream and stores the result under the |
|      * stream's descriptor, replacing (and closing) any dataset stored there before. |
|      */ |
|     @Override |
|     public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) { |
|         return () -> { |
|             List<ArrowRecordBatch> batches = new ArrayList<>(); |
|             long rows = 0; |
|             try { |
|                 while (flightStream.next()) { |
|                     VectorUnloader unloader = new VectorUnloader(flightStream.getRoot()); |
|                     batches.add(unloader.getRecordBatch()); |
|                     rows += flightStream.getRoot().getRowCount(); |
|                 } |
|             } catch (RuntimeException e) { |
|                 // The upload failed midway: close the batches collected so far so they do not leak. |
|                 try { |
|                     AutoCloseables.close(batches); |
|                 } catch (Exception closeFailure) { |
|                     e.addSuppressed(closeFailure); |
|                 } |
|                 throw e; |
|             } |
|             Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows); |
|             Dataset previous = datasets.put(flightStream.getDescriptor(), dataset); |
|             if (previous != null) { |
|                 // A second put under the same key replaces the old dataset, which |
|                 // would otherwise never be closed. |
|                 try { |
|                     previous.close(); |
|                 } catch (Exception e) { |
|                     ackStream.onError(CallStatus.INTERNAL |
|                         .withDescription(e.toString()) |
|                         .toRuntimeException()); |
|                     return; |
|                 } |
|             } |
|             ackStream.onCompleted(); |
|         }; |
|     } |
| |
|     /** |
|      * Streams the batches of the dataset whose path is the UTF-8 decoded ticket. |
|      * |
|      * @throws RuntimeException a NOT_FOUND call status if no dataset is stored under that path |
|      */ |
|     @Override |
|     public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) { |
|         FlightDescriptor flightDescriptor = FlightDescriptor.path( |
|             new String(ticket.getBytes(), StandardCharsets.UTF_8)); |
|         // Look the dataset up once; a second lookup could return null after a concurrent DELETE. |
|         Dataset dataset = this.datasets.get(flightDescriptor); |
|         if (dataset == null) { |
|             throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException(); |
|         } |
|         try (VectorSchemaRoot root = VectorSchemaRoot.create(dataset.getSchema(), allocator)) { |
|             VectorLoader loader = new VectorLoader(root); |
|             listener.start(root); |
|             for (ArrowRecordBatch arrowRecordBatch : dataset.getBatches()) { |
|                 loader.load(arrowRecordBatch); |
|                 listener.putNext(); |
|             } |
|             listener.completed(); |
|         } |
|     } |
| |
|     /** |
|      * Handles the "DELETE" action, whose body is the UTF-8 path of the dataset to |
|      * remove. Any other action type is answered with an UNIMPLEMENTED error. |
|      */ |
|     @Override |
|     public void doAction(CallContext context, Action action, StreamListener<Result> listener) { |
|         switch (action.getType()) { |
|             case "DELETE": { |
|                 FlightDescriptor flightDescriptor = FlightDescriptor.path( |
|                     new String(action.getBody(), StandardCharsets.UTF_8)); |
|                 Dataset removed = datasets.remove(flightDescriptor); |
|                 if (removed != null) { |
|                     try { |
|                         removed.close(); |
|                     } catch (Exception e) { |
|                         listener.onError(CallStatus.INTERNAL |
|                             .withDescription(e.toString()) |
|                             .toRuntimeException()); |
|                         return; |
|                     } |
|                     Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8)); |
|                     listener.onNext(result); |
|                 } else { |
|                     Result result = new Result("Delete not completed. Reason: Key did not exist." |
|                         .getBytes(StandardCharsets.UTF_8)); |
|                     listener.onNext(result); |
|                 } |
|                 listener.onCompleted(); |
|                 break; |
|             } |
|             default: |
|                 // Without this branch the listener would never be completed for an unknown action. |
|                 listener.onError(CallStatus.UNIMPLEMENTED |
|                     .withDescription("Unknown action type: " + action.getType()) |
|                     .toRuntimeException()); |
|         } |
|     } |
| |
|     /** |
|      * Describes the dataset stored under {@code descriptor}: its schema, one endpoint |
|      * at this producer's location, an unknown byte size (-1) and its row count. |
|      * |
|      * @throws RuntimeException a NOT_FOUND call status if no dataset is stored under the descriptor |
|      */ |
|     @Override |
|     public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { |
|         Dataset dataset = datasets.get(descriptor); |
|         if (dataset == null) { |
|             throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException(); |
|         } |
|         FlightEndpoint flightEndpoint = new FlightEndpoint( |
|             new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location); |
|         return new FlightInfo( |
|             dataset.getSchema(), |
|             descriptor, |
|             Collections.singletonList(flightEndpoint), |
|             /*bytes=*/-1, |
|             dataset.getRows() |
|         ); |
|     } |
| |
|     /** Sends the FlightInfo of every stored dataset; {@code criteria} is not consulted. */ |
|     @Override |
|     public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) { |
|         datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); }); |
|         listener.onCompleted(); |
|     } |
| |
|     /** Closes every dataset still held by the store. */ |
|     @Override |
|     public void close() throws Exception { |
|         AutoCloseables.close(datasets.values()); |
|     } |
| } |
| Location location = Location.forGrpcInsecure("0.0.0.0", 33333); |
| try (BufferAllocator allocator = new RootAllocator()){ |
| // Server |
| try(final CookbookProducer producer = new CookbookProducer(allocator, location); |
| final FlightServer flightServer = FlightServer.builder(allocator, location, producer).build()) { |
| try { |
| flightServer.start(); |
| System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort()); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| |
| // Client |
| try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) { |
| System.out.println("C1: Client (Location): Connected to " + location.getUri()); |
| |
| // Populate data |
| Schema schema = new Schema(Arrays.asList( |
| new Field("name", FieldType.nullable(new ArrowType.Utf8()), null))); |
| try(VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator); |
| VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) { |
| varCharVector.allocateNew(3); |
| varCharVector.set(0, "Ronald".getBytes()); |
| varCharVector.set(1, "David".getBytes()); |
| varCharVector.set(2, "Francisco".getBytes()); |
| vectorSchemaRoot.setRowCount(3); |
| FlightClient.ClientStreamListener listener = flightClient.startPut( |
| FlightDescriptor.path("profiles"), |
| vectorSchemaRoot, new AsyncPutListener()); |
| listener.putNext(); |
| varCharVector.set(0, "Manuel".getBytes()); |
| varCharVector.set(1, "Felipe".getBytes()); |
| varCharVector.set(2, "JJ".getBytes()); |
| vectorSchemaRoot.setRowCount(3); |
| listener.putNext(); |
| listener.completed(); |
| listener.getResult(); |
| System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each"); |
| } |
| |
| // Get metadata information |
| FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles")); |
| System.out.println("C3: Client (Get Metadata): " + flightInfo); |
| |
| // Get data information |
| try(FlightStream flightStream = flightClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) { |
| int batch = 0; |
| try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) { |
| System.out.println("C4: Client (Get Stream):"); |
| while (flightStream.next()) { |
| batch++; |
| System.out.println("Client Received batch #" + batch + ", Data:"); |
| System.out.print(vectorSchemaRootReceived.contentToTSVString()); |
| } |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| // Get all metadata information |
| Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL); |
| System.out.print("C5: Client (List Flights Info): "); |
| flightInfosBefore.forEach(t -> System.out.println(t)); |
| |
| // Do delete action |
| Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE", |
| FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8))); |
| while (deleteActionResult.hasNext()) { |
| Result result = deleteActionResult.next(); |
| System.out.println("C6: Client (Do Delete Action): " + |
| new String(result.getBody(), StandardCharsets.UTF_8)); |
| } |
| |
| // Get all metadata information (to validate detele action) |
| Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL); |
| flightInfos.forEach(t -> System.out.println(t)); |
| System.out.println("C7: Client (List Flights Info): After delete - No records"); |
| |
| // Server shut down |
| flightServer.shutdown(); |
| System.out.println("C8: Server shut down successfully"); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| .. testoutput:: |
| |
| S1: Server (Location): Listening on port 33333 |
| C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333 |
| C2: Client (Populate Data): Wrote 2 batches with 3 rows each |
| C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false} |
| C4: Client (Get Stream): |
| Client Received batch #1, Data: |
| name |
| Ronald |
| David |
| Francisco |
| Client Received batch #2, Data: |
| name |
| Manuel |
| Felipe |
| JJ |
| C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false} |
| C6: Client (Do Delete Action): Delete completed |
| C7: Client (List Flights Info): After delete - No records |
| C8: Server shut down successfully |
| |
| Let's explain our code in more detail. |
| |
| Start Flight Server |
| ******************* |
| |
| First, we'll start our server: |
| |
| .. code-block:: java |
| |
| try(FlightServer flightServer = FlightServer.builder(allocator, location, |
| new CookbookProducer(allocator, location)).build()) { |
| try { |
| flightServer.start(); |
| System.out.println("S1: Server (Location): Listening on port " + flightServer.getPort()); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| |
| .. code-block:: shell |
| |
| S1: Server (Location): Listening on port 33333 |
| |
| Connect to Flight Server |
| ************************ |
| |
| We can then create a client and connect to the server: |
| |
| .. code-block:: java |
| |
| // Build a client for the same location the server was built with. |
| try (FlightClient flightClient = FlightClient.builder(allocator, location).build()) { |
| System.out.println("C1: Client (Location): Connected to " + location.getUri()); |
| |
| .. code-block:: shell |
| |
| C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333 |
| |
| Put Data |
| ******** |
| |
| First, we'll create and upload a vector schema root, which will get stored in |
| memory by the server. |
| |
| .. code-block:: java |
| |
| // Server |
| public Runnable acceptPut(CallContext context, FlightStream flightStream, StreamListener<PutResult> ackStream) { |
|     return () -> { |
|         List<ArrowRecordBatch> batches = new ArrayList<>(); |
|         long rows = 0; |
|         try { |
|             while (flightStream.next()) { |
|                 VectorUnloader unloader = new VectorUnloader(flightStream.getRoot()); |
|                 // Do not close the batch here: it is stored in the dataset and is |
|                 // closed when the dataset is closed. |
|                 batches.add(unloader.getRecordBatch()); |
|                 rows += flightStream.getRoot().getRowCount(); |
|             } |
|         } catch (RuntimeException e) { |
|             // The upload failed midway: close the batches collected so far so they do not leak. |
|             try { |
|                 AutoCloseables.close(batches); |
|             } catch (Exception closeFailure) { |
|                 e.addSuppressed(closeFailure); |
|             } |
|             throw e; |
|         } |
|         Dataset dataset = new Dataset(batches, flightStream.getSchema(), rows); |
|         Dataset previous = datasets.put(flightStream.getDescriptor(), dataset); |
|         if (previous != null) { |
|             // A second put under the same key replaces the old dataset, which |
|             // would otherwise never be closed. |
|             try { |
|                 previous.close(); |
|             } catch (Exception e) { |
|                 ackStream.onError(CallStatus.INTERNAL |
|                     .withDescription(e.toString()) |
|                     .toRuntimeException()); |
|                 return; |
|             } |
|         } |
|         ackStream.onCompleted(); |
|     }; |
| } |
| |
| // Client |
| Schema schema = new Schema(Arrays.asList( |
|     new Field("name", FieldType.nullable(new ArrowType.Utf8()), null))); |
| try(VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator); |
|     VarCharVector varCharVector = (VarCharVector) vectorSchemaRoot.getVector("name")) { |
|     varCharVector.allocateNew(3); |
|     // Encode explicitly as UTF-8 (the column type is Utf8) instead of |
|     // relying on the platform default charset. |
|     varCharVector.set(0, "Ronald".getBytes(StandardCharsets.UTF_8)); |
|     varCharVector.set(1, "David".getBytes(StandardCharsets.UTF_8)); |
|     varCharVector.set(2, "Francisco".getBytes(StandardCharsets.UTF_8)); |
|     vectorSchemaRoot.setRowCount(3); |
|     FlightClient.ClientStreamListener listener = flightClient.startPut( |
|         FlightDescriptor.path("profiles"), |
|         vectorSchemaRoot, new AsyncPutListener()); |
|     listener.putNext(); |
|     // Reuse the same root for the second batch. |
|     varCharVector.set(0, "Manuel".getBytes(StandardCharsets.UTF_8)); |
|     varCharVector.set(1, "Felipe".getBytes(StandardCharsets.UTF_8)); |
|     varCharVector.set(2, "JJ".getBytes(StandardCharsets.UTF_8)); |
|     vectorSchemaRoot.setRowCount(3); |
|     listener.putNext(); |
|     listener.completed(); |
|     listener.getResult(); |
|     System.out.println("C2: Client (Populate Data): Wrote 2 batches with 3 rows each"); |
| } |
| |
| .. code-block:: shell |
| |
| C2: Client (Populate Data): Wrote 2 batches with 3 rows each |
| |
| Get Metadata |
| ************ |
| |
| Once we do so, we can retrieve the metadata for that dataset. |
| |
| .. code-block:: java |
| |
| // Server |
| public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) { |
|     // Look the dataset up once and report an unknown key as NOT_FOUND |
|     // rather than failing with a NullPointerException. |
|     Dataset dataset = datasets.get(descriptor); |
|     if (dataset == null) { |
|         throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException(); |
|     } |
|     FlightEndpoint flightEndpoint = new FlightEndpoint( |
|         new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location); |
|     return new FlightInfo( |
|         dataset.getSchema(), |
|         descriptor, |
|         Collections.singletonList(flightEndpoint), |
|         /*bytes=*/-1, |
|         dataset.getRows() |
|     ); |
| } |
| |
| // Client |
| // Request the FlightInfo for the "profiles" path descriptor used in the upload. |
| FlightInfo flightInfo = flightClient.getInfo(FlightDescriptor.path("profiles")); |
| System.out.println("C3: Client (Get Metadata): " + flightInfo); |
| |
| .. code-block:: shell |
| |
| C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a, expirationTime=(none)}], bytes=-1, records=6, ordered=false} |
| |
| Get Data |
| ******** |
| |
| And get the data back: |
| |
| .. code-block:: java |
| |
| // Server |
| public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) { |
|     FlightDescriptor flightDescriptor = FlightDescriptor.path( |
|         new String(ticket.getBytes(), StandardCharsets.UTF_8)); |
|     // Look the dataset up once; a second lookup could return null after a concurrent DELETE. |
|     Dataset dataset = this.datasets.get(flightDescriptor); |
|     if (dataset == null) { |
|         throw CallStatus.NOT_FOUND.withDescription("Unknown descriptor").toRuntimeException(); |
|     } |
|     // The root is only a sending buffer, so close it once the stream has been written. |
|     try (VectorSchemaRoot root = VectorSchemaRoot.create(dataset.getSchema(), allocator)) { |
|         VectorLoader loader = new VectorLoader(root); |
|         listener.start(root); |
|         for (ArrowRecordBatch arrowRecordBatch : dataset.getBatches()) { |
|             // Load the stored batch directly so the dataset keeps its buffers |
|             // and can be served again. |
|             loader.load(arrowRecordBatch); |
|             listener.putNext(); |
|         } |
|         listener.completed(); |
|     } |
| } |
| |
| // Client |
| // Fetch the stream using the ticket of the first endpoint returned by getInfo. |
| try(FlightStream flightStream = flightClient.getStream(flightInfo.getEndpoints().get(0).getTicket())) { |
| int batch = 0; |
| try (VectorSchemaRoot vectorSchemaRootReceived = flightStream.getRoot()) { |
| System.out.println("C4: Client (Get Stream):"); |
| // Print the root's contents after each call to next() that returns true. |
| while (flightStream.next()) { |
| batch++; |
| System.out.println("Client Received batch #" + batch + ", Data:"); |
| System.out.print(vectorSchemaRootReceived.contentToTSVString()); |
| } |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| |
| .. code-block:: shell |
| |
| C4: Client (Get Stream): |
| Client Received batch #1, Data: |
| name |
| Ronald |
| David |
| Francisco |
| Client Received batch #2, Data: |
| name |
| Manuel |
| Felipe |
| JJ |
| |
| Delete data |
| *********** |
| |
| Then, we'll delete the dataset: |
| |
| .. code-block:: java |
| |
| // Server |
| public void doAction(CallContext context, Action action, StreamListener<Result> listener) { |
|     switch (action.getType()) { |
|         case "DELETE": { |
|             FlightDescriptor flightDescriptor = FlightDescriptor.path( |
|                 new String(action.getBody(), StandardCharsets.UTF_8)); |
|             Dataset removed = datasets.remove(flightDescriptor); |
|             if (removed != null) { |
|                 // Close the removed dataset so its record batches are not leaked. |
|                 try { |
|                     removed.close(); |
|                 } catch (Exception e) { |
|                     listener.onError(CallStatus.INTERNAL |
|                         .withDescription(e.toString()) |
|                         .toRuntimeException()); |
|                     return; |
|                 } |
|                 Result result = new Result("Delete completed".getBytes(StandardCharsets.UTF_8)); |
|                 listener.onNext(result); |
|             } else { |
|                 Result result = new Result("Delete not completed. Reason: Key did not exist." |
|                     .getBytes(StandardCharsets.UTF_8)); |
|                 listener.onNext(result); |
|             } |
|             listener.onCompleted(); |
|             break; |
|         } |
|         default: |
|             // Without this branch the listener would never be completed for an unknown action. |
|             listener.onError(CallStatus.UNIMPLEMENTED |
|                 .withDescription("Unknown action type: " + action.getType()) |
|                 .toRuntimeException()); |
|     } |
| } |
| |
| // Client |
| // The action body carries the dataset path ("profiles") encoded as UTF-8. |
| Iterator<Result> deleteActionResult = flightClient.doAction(new Action("DELETE", |
| FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8))); |
| // Print every result message returned for the action. |
| while (deleteActionResult.hasNext()) { |
| Result result = deleteActionResult.next(); |
| System.out.println("C6: Client (Do Delete Action): " + |
| new String(result.getBody(), StandardCharsets.UTF_8)); |
| } |
| |
| .. code-block:: shell |
| |
| C6: Client (Do Delete Action): Delete completed |
| |
| Validate Delete Data |
| ******************** |
| |
| And confirm that it's been deleted: |
| |
| .. code-block:: java |
| |
| // Server |
| // Sends one FlightInfo per stored dataset; the criteria argument is not consulted. |
| public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) { |
| datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k)); }); |
| listener.onCompleted(); |
| } |
| |
| // Client |
| // After the delete nothing is printed by the forEach, only the C7 line below. |
| Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL); |
| flightInfos.forEach(t -> System.out.println(t)); |
| System.out.println("C7: Client (List Flights Info): After delete - No records"); |
| |
| .. code-block:: shell |
| |
| C7: Client (List Flights Info): After delete - No records |
| |
| Stop Flight Server |
| ****************** |
| |
| .. code-block:: java |
| |
| // Server |
| // Explicitly shut the server down before reporting success. |
| flightServer.shutdown(); |
| System.out.println("C8: Server shut down successfully"); |
| |
| .. code-block:: shell |
| |
| C8: Server shut down successfully |
| |
| .. _`Arrow Flight RPC`: https://arrow.apache.org/docs/format/Flight.html |