blob: e026856034d0b7dac9612816c68a0cff9f068774 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.repository.graphdb.janus.migration;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.graphdb.janus.migration.JsonNodeParsers.ParseElement;
import org.apache.atlas.repository.graphdb.janus.migration.JsonNodeProcessManager.WorkItemManager;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.apache.tinkerpop.gremlin.structure.io.graphson.TypeInfo;
import org.apache.tinkerpop.shaded.jackson.core.JsonFactory;
import org.apache.tinkerpop.shaded.jackson.core.JsonParser;
import org.apache.tinkerpop.shaded.jackson.core.JsonToken;
import org.apache.tinkerpop.shaded.jackson.databind.JsonNode;
import org.apache.tinkerpop.shaded.jackson.databind.ObjectMapper;
import org.janusgraph.core.JanusGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Streams a legacy GraphSON document (which must have been written in
 * {@code EXTENDED} mode) and hands its vertices and edges to worker pools that
 * load them into the target graph(s), followed by a post-processing pass over
 * every vertex of the bulk-load graph.
 *
 * <p>Progress is tracked with a single running {@link #counter} that is
 * recorded through {@link ReaderStatusManager}; a non-zero start index causes
 * every item whose counter value is less than or equal to that index to be
 * skipped.
 *
 * <p>Instances are created through {@link #build()}. The per-run state
 * ({@code counter}, {@code readerStatusManager}) is re-initialised on every
 * {@link #readGraph(InputStream)} call and is not synchronised, so a single
 * instance should not run {@code readGraph} concurrently.
 */
public final class AtlasGraphSONReader {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasGraphSONReader.class);

    // Application-property keys read by Builder#setDefaults().
    private static final String APPLICATION_PROPERTY_MIGRATION_START_INDEX       = "atlas.migration.mode.start.index";
    private static final String APPLICATION_PROPERTY_MIGRATION_NUMBER_OF_WORKERS = "atlas.migration.mode.workers";
    private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE        = "atlas.migration.mode.batch.size";

    private final ObjectMapper      mapper;
    private final ElementProcessors relationshipCache;
    private final Graph             graph;
    private final Graph             bulkLoadGraph;
    private final int               numWorkers;
    private final int               batchSize;
    private final long              suppliedStartIndex;
    private final GraphSONUtility   graphSONUtility;

    // Per-run state, (re)created at the start of readGraph().
    private ReaderStatusManager readerStatusManager;
    private AtomicLong          counter;

    private AtlasGraphSONReader(ObjectMapper mapper, ElementProcessors relationshipLookup, Graph graph,
                                Graph bulkLoadGraph, int numWorkers, int batchSize, long suppliedStartIndex) {
        this.mapper             = mapper;
        this.relationshipCache  = relationshipLookup;
        this.graph              = graph;
        this.bulkLoadGraph      = bulkLoadGraph;
        this.numWorkers         = numWorkers;
        this.batchSize          = batchSize;
        this.suppliedStartIndex = suppliedStartIndex;
        this.graphSONUtility    = new GraphSONUtility(relationshipCache);
    }

    /**
     * Parses the GraphSON document from {@code inputStream} and imports it.
     *
     * <p>The document must start with a JSON object. Its fields are dispatched
     * by name: the mode field is validated, the vertex and edge arrays are
     * imported via {@link #processElement}, and the vertex/edge count fields
     * are only logged. Any other field name is rejected. After the object has
     * been read, {@link #postProcess} runs and the final status is recorded.
     *
     * @param inputStream source of the GraphSON document
     * @throws IOException wrapping any exception raised while reading or
     *                     importing; the status is recorded as failed first.
     *                     If the cause is an {@link InterruptedException} the
     *                     calling thread's interrupt status is restored before
     *                     this exception is thrown.
     */
    public void readGraph(final InputStream inputStream) throws IOException {
        counter = new AtomicLong(0);

        final long        startIndex = initStatusManager();
        final JsonFactory factory    = mapper.getFactory();

        LOG.info("AtlasGraphSONReader.readGraph: numWorkers: {}: batchSize: {}: startIndex: {}", numWorkers, batchSize, startIndex);

        try (JsonParser parser = factory.createParser(inputStream)) {
            if (parser.nextToken() != JsonToken.START_OBJECT) {
                throw new IOException("Expected data to start with an Object");
            }

            readerStatusManager.update(bulkLoadGraph, counter.get(), ReaderStatusManager.STATUS_IN_PROGRESS);

            while (parser.nextToken() != JsonToken.END_OBJECT) {
                final String fieldName = parser.getCurrentName() == null ? "" : parser.getCurrentName();

                switch (fieldName) {
                    case GraphSONTokensTP2.MODE:
                        parser.nextToken();

                        final String mode = parser.getText();

                        // Constant-first comparison so a missing text value is reported
                        // as an invalid mode rather than a NullPointerException.
                        if (!"EXTENDED".equals(mode)) {
                            throw new IllegalStateException("The legacy GraphSON must be generated with GraphSONMode.EXTENDED");
                        }

                        // The mode field itself counts as one item in the running index.
                        counter.getAndIncrement();
                        break;

                    case GraphSONTokensTP2.VERTICES:
                        processElement(parser, new JsonNodeParsers.ParseVertex(), startIndex);
                        break;

                    case GraphSONTokensTP2.EDGES:
                        processElement(parser, new JsonNodeParsers.ParseEdge(), startIndex);
                        break;

                    case GraphSONTokensTP2.VERTEX_COUNT:
                        parser.nextToken();
                        LOG.info("Vertex count: {}", parser.getLongValue());
                        break;

                    case GraphSONTokensTP2.EDGE_COUNT:
                        parser.nextToken();
                        LOG.info("Edge count: {}", parser.getLongValue());
                        break;

                    default:
                        throw new IllegalStateException(String.format("Unexpected token in GraphSON - %s", fieldName));
                }
            }

            postProcess(startIndex);

            readerStatusManager.end(bulkLoadGraph, counter.get(), ReaderStatusManager.STATUS_SUCCESS);
        } catch (Exception ex) {
            readerStatusManager.end(bulkLoadGraph, counter.get(), ReaderStatusManager.STATUS_FAILED);

            // handleInterrupt() clears the interrupt flag via Thread.interrupted();
            // restore it so callers further up the stack can still observe the interrupt.
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }

            throw new IOException(ex);
        } finally {
            LOG.info("AtlasGraphSONReader.readGraph: Done!: {}", counter.get());
        }
    }

    /**
     * Creates the status manager for this run and resolves the start index.
     *
     * @return the explicitly supplied start index when it is non-zero,
     *         otherwise the start index reported by the status manager
     */
    private long initStatusManager() {
        readerStatusManager = new ReaderStatusManager(graph, bulkLoadGraph);

        return (this.suppliedStartIndex == 0) ? readerStatusManager.getStartIndex() : this.suppliedStartIndex;
    }

    /**
     * Reads one JSON array of elements from {@code parser} and hands each
     * element that should be imported to a work-item manager.
     *
     * <p>Elements are skipped when their counter value is at or below
     * {@code startIndex}, or when {@code parseElement} identifies them as type
     * nodes. Failures other than interruption are logged and not propagated.
     *
     * @param parser       parser positioned on the field whose value is the element array
     * @param parseElement element-kind specific parser (vertex or edge)
     * @param startIndex   resume index; 0 disables skipping
     * @throws InterruptedException if the current thread was interrupted while reading
     */
    private void processElement(JsonParser parser, ParseElement parseElement, long startIndex) throws InterruptedException {
        LOG.info("processElement: {}: Starting... : counter at: {}", parseElement.getMessage(), counter.get());

        try {
            // NOTE(review): this update targets 'graph' while every other status update in
            // this class targets 'bulkLoadGraph' - confirm this is intentional.
            readerStatusManager.update(graph, counter.get(), true);

            parseElement.setContext(graphSONUtility);

            WorkItemManager wim = JsonNodeProcessManager.create(graph, bulkLoadGraph, parseElement,
                                                                numWorkers, batchSize, shouldSkip(startIndex, counter.get()));

            // Step past the current token before iterating (presumably onto the array start - verify).
            parser.nextToken();

            while (parser.nextToken() != JsonToken.END_ARRAY) {
                handleInterrupt(bulkLoadGraph, counter.incrementAndGet());

                // The node is always read so the parser advances even for skipped elements.
                final JsonNode node = parser.readValueAsTree();

                if (shouldSkip(startIndex, counter.get()) || parseElement.isTypeNode(node)) {
                    continue;
                }

                updateStatusConditionally(bulkLoadGraph, counter.get());

                wim.produce(node);
            }

            wim.shutdown();
        } catch (InterruptedException ex) {
            throw ex;
        } catch (Exception ex) {
            LOG.error("processElement: {}: failed!", parseElement.getMessage(), ex);
        } finally {
            LOG.info("processElement: {}: Done! : [{}]", parseElement.getMessage(), counter.get());

            readerStatusManager.update(bulkLoadGraph, counter.get(), true);
        }
    }

    /**
     * Iterates over every vertex of the bulk-load graph and submits its id for
     * post-processing of the properties reported by the relationship cache.
     *
     * <p>Vertices whose counter value is at or below {@code startIndex} are
     * consumed from the traversal but not submitted. Failures other than
     * interruption are logged and not propagated.
     *
     * @param startIndex resume index; 0 disables skipping
     * @throws InterruptedException if the current thread was interrupted, so that
     *                              the caller records the run as failed instead
     *                              of successful
     */
    private void postProcess(long startIndex) throws InterruptedException {
        LOG.info("postProcess: Starting... : counter at: {}", counter.get());

        try {
            PostProcessManager.WorkItemsManager wim   = PostProcessManager.create(bulkLoadGraph, relationshipCache.getPropertiesToPostProcess(), batchSize, numWorkers);
            GraphTraversal<Vertex, Vertex>      query = bulkLoadGraph.traversal().V();

            while (query.hasNext()) {
                handleInterrupt(bulkLoadGraph, counter.incrementAndGet());

                // Consume the vertex before the skip check: skipping without advancing the
                // traversal would only inflate the counter and never actually skip a vertex.
                final Vertex v = query.next();

                if (shouldSkip(startIndex, counter.get())) {
                    continue;
                }

                wim.produce(v.id());

                updateStatusConditionally(bulkLoadGraph, counter.get());
            }

            wim.shutdown();
        } catch (InterruptedException ex) {
            // Propagate, consistent with processElement(); swallowing it here would let
            // readGraph() report success for an interrupted run.
            throw ex;
        } catch (Exception ex) {
            LOG.error("postProcess: failed!", ex);
        } finally {
            LOG.info("postProcess: Done! : [{}]", counter.get());

            readerStatusManager.update(bulkLoadGraph, counter.get(), true);
        }
    }

    /**
     * @return {@code true} when resuming ({@code startIndex != 0}) and
     *         {@code index} has not yet passed {@code startIndex}
     */
    private boolean shouldSkip(long startIndex, long index) {
        return (startIndex != 0) && (index <= startIndex);
    }

    /**
     * Checks (and clears) the current thread's interrupt flag; when it was set,
     * records the current counter and aborts.
     *
     * @throws InterruptedException if the current thread had been interrupted
     */
    private void handleInterrupt(Graph graph, long counter) throws InterruptedException {
        if (!Thread.interrupted()) {
            return;
        }

        readerStatusManager.update(graph, counter, false);

        LOG.error("Thread interrupted: {}", counter);

        throw new InterruptedException();
    }

    /**
     * Records the counter once every {@code batchSize} items. A non-positive
     * batch size disables these periodic updates instead of failing with a
     * division by zero.
     */
    private void updateStatusConditionally(Graph graph, long counter) {
        if (batchSize > 0 && counter % batchSize == 0) {
            readerStatusManager.update(graph, counter, false);
        }
    }

    /**
     * @return a new builder for {@link AtlasGraphSONReader}
     */
    public static Builder build() {
        return new Builder();
    }

    /**
     * Fluent builder for {@link AtlasGraphSONReader}.
     *
     * <p>Note that {@link #create()} loads start index, worker count and batch
     * size from the application properties and, when those properties can be
     * loaded, replaces any values previously passed to {@link #startIndex(long)},
     * {@link #numWorkers(int)} and {@link #batchSize(int)}.
     */
    public final static class Builder {
        private int               batchSize = 500;
        private ElementProcessors relationshipCache;
        private Graph             graph;
        private Graph             bulkLoadGraph;
        private int               numWorkers;
        private long              suppliedStartIndex;

        private Builder() {
        }

        /**
         * Applies start index, worker count and batch size from the application
         * properties (falling back to 0, 4 and 3000 respectively when a key is
         * absent). If the properties cannot be loaded the failure is logged and
         * the builder's current values are kept.
         */
        private void setDefaults() {
            try {
                this.startIndex(ApplicationProperties.get().getLong(APPLICATION_PROPERTY_MIGRATION_START_INDEX, 0L))
                    .numWorkers(ApplicationProperties.get().getInt(APPLICATION_PROPERTY_MIGRATION_NUMBER_OF_WORKERS, 4))
                    .batchSize(ApplicationProperties.get().getInt(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, 3000));
            } catch (AtlasException ex) {
                LOG.error("setDefaults: failed!", ex);
            }
        }

        /**
         * Builds the reader.
         *
         * <p>Property-driven defaults are applied first. Because that happens
         * before the bulk-load graph falls back to the schema graph, a builder
         * without an explicit bulk-loading graph ends up with a single worker.
         *
         * @return a new reader configured from this builder
         */
        public AtlasGraphSONReader create() {
            setDefaults();

            if (bulkLoadGraph == null) {
                bulkLoadGraph = graph;
            }

            final GraphSONMapper.Builder builder = GraphSONMapper.build();
            final GraphSONMapper         mapper  = builder.create();

            return new AtlasGraphSONReader(mapper.createMapper(), relationshipCache, graph, bulkLoadGraph,
                                           numWorkers, batchSize, suppliedStartIndex);
        }

        /** Sets the element processors used for relationship lookup and post-processing. */
        public Builder relationshipCache(ElementProcessors relationshipCache) {
            this.relationshipCache = relationshipCache;

            return this;
        }

        /** Sets the graph passed to the status manager and work-item manager as the primary graph. */
        public Builder schemaDB(JanusGraph graph) {
            this.graph = graph;

            return this;
        }

        /** Sets the graph used for bulk loading; when unset, the schema graph is used. */
        public Builder bulkLoadingDB(Graph graph) {
            this.bulkLoadGraph = graph;

            return this;
        }

        /**
         * Sets the worker count. The value is forced to 1 when either graph has
         * not been set on this builder at the time of the call.
         */
        public Builder numWorkers(int numWorkers) {
            if (bulkLoadGraph == null || graph == null) {
                this.numWorkers = 1;

                LOG.info("numWorkers: {}, since one of the 2 graphs is null.", this.numWorkers);
            } else {
                this.numWorkers = numWorkers;
            }

            return this;
        }

        /** Sets the number of items between periodic status updates and per work batch. */
        public Builder batchSize(int batchSize) {
            this.batchSize = batchSize;

            return this;
        }

        /** Sets the resume index; 0 means "use the index stored by the status manager". */
        public Builder startIndex(long suppliedStartIndex) {
            this.suppliedStartIndex = suppliedStartIndex;

            return this;
        }
    }
}