| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.tinkerpop.gremlin.structure.io.gryo; |
| |
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
import org.apache.tinkerpop.gremlin.structure.io.Mapper;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphGryoSerializer;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.tinkerpop.shaded.kryo.Kryo;
import org.apache.tinkerpop.shaded.kryo.io.Input;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
| |
| /** |
| * The {@link GraphReader} for the Gremlin Structure serialization format based on Kryo. The format is meant to be |
| * non-lossy in terms of Gremlin Structure to Gremlin Structure migrations (assuming both structure implementations |
| * support the same graph features). |
| * <p/> |
| * This implementation is not thread-safe. Have one {@code GryoReader} instance per thread. |
| * |
| * @author Stephen Mallette (http://stephen.genoprime.com) |
| * @author Marko A. Rodriguez (http://markorodriguez.com) |
| */ |
| public final class GryoReader implements GraphReader { |
| private final Kryo kryo; |
| private final Map<GraphFilter, StarGraphGryoSerializer> graphFilterCache = new HashMap<>(); |
| |
| private final long batchSize; |
| |
| private GryoReader(final long batchSize, final Mapper<Kryo> gryoMapper) { |
| this.kryo = gryoMapper.createMapper(); |
| this.batchSize = batchSize; |
| } |
| |
| /** |
| * Read data into a {@link Graph} from output generated by any of the {@link GryoWriter} {@code writeVertex} or |
| * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}. |
| * |
| * @param inputStream a stream containing an entire graph of vertices and edges as defined by the accompanying |
| * {@link GraphWriter#writeGraph(OutputStream, Graph)}. |
| * @param graphToWriteTo the graph to write to when reading from the stream. |
| * @throws IOException |
| */ |
| @Override |
| public void readGraph(final InputStream inputStream, final Graph graphToWriteTo) throws IOException { |
| // dual pass - create all vertices and store to cache the ids. then create edges. as long as we don't |
| // have vertex labels in the output we can't do this single pass |
| final Map<StarGraph.StarVertex, Vertex> cache = new HashMap<>(); |
| final AtomicLong counter = new AtomicLong(0); |
| |
| final Graph.Features.EdgeFeatures edgeFeatures = graphToWriteTo.features().edge(); |
| final boolean supportsTx = graphToWriteTo.features().graph().supportsTransactions(); |
| |
| IteratorUtils.iterate(new VertexInputIterator(new Input(inputStream), attachable -> { |
| final Vertex v = cache.put((StarGraph.StarVertex) attachable.get(), attachable.attach(Attachable.Method.create(graphToWriteTo))); |
| if (supportsTx && counter.incrementAndGet() % batchSize == 0) |
| graphToWriteTo.tx().commit(); |
| return v; |
| }, null, null)); |
| cache.entrySet().forEach(kv -> kv.getKey().edges(Direction.IN).forEachRemaining(e -> { |
| // can't use a standard Attachable attach method here because we have to use the cache for those |
| // graphs that don't support userSuppliedIds on edges. note that outVertex/inVertex methods return |
| // StarAdjacentVertex whose equality should match StarVertex. |
| final Vertex cachedOutV = cache.get(e.outVertex()); |
| final Vertex cachedInV = cache.get(e.inVertex()); |
| final Edge newEdge = edgeFeatures.willAllowId(e.id()) ? cachedOutV.addEdge(e.label(), cachedInV, T.id, e.id()) : cachedOutV.addEdge(e.label(), cachedInV); |
| e.properties().forEachRemaining(p -> newEdge.property(p.key(), p.value())); |
| if (supportsTx && counter.incrementAndGet() % batchSize == 0) |
| graphToWriteTo.tx().commit(); |
| })); |
| |
| if (supportsTx) graphToWriteTo.tx().commit(); |
| } |
| |
| @Override |
| public Optional<Vertex> readVertex(final InputStream inputStream, final GraphFilter graphFilter) throws IOException { |
| StarGraphGryoSerializer serializer = this.graphFilterCache.get(graphFilter); |
| if (null == serializer) { |
| serializer = StarGraphGryoSerializer.withGraphFilter(graphFilter); |
| this.graphFilterCache.put(graphFilter, serializer); |
| } |
| final Input input = new Input(inputStream); |
| this.readHeader(input); |
| final StarGraph starGraph = this.kryo.readObject(input, StarGraph.class, serializer); |
| // read the terminator |
| this.kryo.readClassAndObject(input); |
| return Optional.ofNullable(starGraph == null ? null : starGraph.getStarVertex()); |
| } |
| |
| /** |
| * Read {@link Vertex} objects from output generated by any of the {@link GryoWriter} {@code writeVertex} or |
| * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}. |
| * |
| * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying |
| * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or |
| * {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods. |
| * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object. |
| * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object. |
| * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}. |
| */ |
| @Override |
| public Iterator<Vertex> readVertices(final InputStream inputStream, |
| final Function<Attachable<Vertex>, Vertex> vertexAttachMethod, |
| final Function<Attachable<Edge>, Edge> edgeAttachMethod, |
| final Direction attachEdgesOfThisDirection) throws IOException { |
| return new VertexInputIterator(new Input(inputStream), vertexAttachMethod, attachEdgesOfThisDirection, edgeAttachMethod); |
| } |
| |
| /** |
| * Read a {@link Vertex} from output generated by any of the {@link GryoWriter} {@code writeVertex} or |
| * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}. |
| * |
| * @param inputStream a stream containing at least a single vertex as defined by the accompanying |
| * {@link GraphWriter#writeVertex(OutputStream, Vertex)}. |
| * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object. |
| */ |
| @Override |
| public Vertex readVertex(final InputStream inputStream, final Function<Attachable<Vertex>, Vertex> vertexAttachMethod) throws IOException { |
| return readVertex(inputStream, vertexAttachMethod, null, null); |
| } |
| |
| /** |
| * Read a {@link Vertex} from output generated by any of the {@link GryoWriter} {@code writeVertex} or |
| * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}. |
| * |
| * @param inputStream a stream containing at least one {@link Vertex} as defined by the accompanying |
| * {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method. |
| * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object. |
| * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object. |
| * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}. |
| */ |
| @Override |
| public Vertex readVertex(final InputStream inputStream, |
| final Function<Attachable<Vertex>, Vertex> vertexAttachMethod, |
| final Function<Attachable<Edge>, Edge> edgeAttachMethod, |
| final Direction attachEdgesOfThisDirection) throws IOException { |
| final Input input = new Input(inputStream); |
| return readVertexInternal(vertexAttachMethod, edgeAttachMethod, attachEdgesOfThisDirection, input); |
| } |
| |
| /** |
| * Read an {@link Edge} from output generated by {@link GryoWriter#writeEdge(OutputStream, Edge)} or via |
| * an {@link Edge} passed to {@link GryoWriter#writeObject(OutputStream, Object)}. |
| * |
| * @param inputStream a stream containing at least one {@link Edge} as defined by the accompanying |
| * {@link GraphWriter#writeEdge(OutputStream, Edge)} method. |
| * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object. |
| */ |
| @Override |
| public Edge readEdge(final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod) throws IOException { |
| final Input input = new Input(inputStream); |
| readHeader(input); |
| final Attachable<Edge> attachable = kryo.readObject(input, DetachedEdge.class); |
| return edgeAttachMethod.apply(attachable); |
| } |
| |
| /** |
| * Read a {@link VertexProperty} from output generated by |
| * {@link GryoWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed |
| * to {@link GryoWriter#writeObject(OutputStream, Object)}. |
| * |
| * @param inputStream a stream containing at least one {@link VertexProperty} as written by the accompanying |
| * {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method. |
| * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a |
| * {@link Host} object. |
| */ |
| @Override |
| public VertexProperty readVertexProperty(final InputStream inputStream, |
| final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod) throws IOException { |
| final Input input = new Input(inputStream); |
| readHeader(input); |
| final Attachable<VertexProperty> attachable = kryo.readObject(input, DetachedVertexProperty.class); |
| return vertexPropertyAttachMethod.apply(attachable); |
| } |
| |
| /** |
| * Read a {@link Property} from output generated by {@link GryoWriter#writeProperty(OutputStream, Property)} or |
| * via an {@link Property} passed to {@link GryoWriter#writeObject(OutputStream, Object)}. |
| * |
| * @param inputStream a stream containing at least one {@link Property} as written by the accompanying |
| * {@link GraphWriter#writeProperty(OutputStream, Property)} method. |
| * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object. |
| */ |
| @Override |
| public Property readProperty(final InputStream inputStream, |
| final Function<Attachable<Property>, Property> propertyAttachMethod) throws IOException { |
| final Input input = new Input(inputStream); |
| readHeader(input); |
| final Attachable<Property> attachable = kryo.readObject(input, DetachedProperty.class); |
| return propertyAttachMethod.apply(attachable); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz) throws IOException { |
| return clazz.cast(this.kryo.readClassAndObject(new Input(inputStream))); |
| } |
| |
| private Vertex readVertexInternal(final Function<Attachable<Vertex>, Vertex> vertexMaker, |
| final Function<Attachable<Edge>, Edge> edgeMaker, |
| final Direction d, |
| final Input input) throws IOException { |
| readHeader(input); |
| final StarGraph starGraph = kryo.readObject(input, StarGraph.class); |
| |
| // read the terminator |
| kryo.readClassAndObject(input); |
| |
| final Vertex v = vertexMaker.apply(starGraph.getStarVertex()); |
| if (edgeMaker != null) |
| starGraph.getStarVertex().edges(d).forEachRemaining(e -> edgeMaker.apply((Attachable<Edge>) e)); |
| return v; |
| } |
| |
| private void readHeader(final Input input) throws IOException { |
| if (!Arrays.equals(GryoMapper.GIO, input.readBytes(3))) |
| throw new IOException("Invalid format - first three bytes of header do not match expected value"); |
| |
| // skip the next 13 bytes - for future use |
| input.readBytes(13); |
| } |
| |
| public static Builder build() { |
| return new Builder(); |
| } |
| |
| public final static class Builder implements ReaderBuilder<GryoReader> { |
| |
| private long batchSize = 10000; |
| /** |
| * Always use the most recent gryo version by default |
| */ |
| private Mapper<Kryo> gryoMapper = GryoMapper.build().create(); |
| |
| private Builder() { |
| } |
| |
| /** |
| * Number of mutations to perform before a commit is executed when using |
| * {@link GryoReader#readGraph(InputStream, Graph)}. |
| */ |
| public Builder batchSize(final long batchSize) { |
| this.batchSize = batchSize; |
| return this; |
| } |
| |
| /** |
| * Supply a mapper {@link GryoMapper} instance to use as the serializer for the {@code KryoWriter}. |
| */ |
| public Builder mapper(final Mapper<Kryo> gryoMapper) { |
| this.gryoMapper = gryoMapper; |
| return this; |
| } |
| |
| public GryoReader create() { |
| return new GryoReader(batchSize, this.gryoMapper); |
| } |
| |
| } |
| |
| private class VertexInputIterator implements Iterator<Vertex> { |
| private final Input input; |
| private final Function<Attachable<Vertex>, Vertex> vertexMaker; |
| private final Direction d; |
| private final Function<Attachable<Edge>, Edge> edgeMaker; |
| |
| public VertexInputIterator(final Input input, |
| final Function<Attachable<Vertex>, Vertex> vertexMaker, |
| final Direction d, |
| final Function<Attachable<Edge>, Edge> edgeMaker) { |
| this.input = input; |
| this.d = d; |
| this.edgeMaker = edgeMaker; |
| this.vertexMaker = vertexMaker; |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return !input.eof(); |
| } |
| |
| @Override |
| public Vertex next() { |
| try { |
| return readVertexInternal(vertexMaker, edgeMaker, d, input); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| } |
| } |