// blob: 3570422271fe83dcf4b19b355f1928292488f4ab
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.computer.bulkloading;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.MutationListener;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.javatuples.Pair;
import org.javatuples.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
 * A {@link VertexProgram} that writes the vertices, vertex properties and edges it is executed on into a second
 * ("write") graph, which is opened through {@link GraphFactory} from the configuration stored under
 * {@link #WRITE_GRAPH_CFG_KEY}. The actual element creation is delegated to a pluggable {@link BulkLoader}.
 * <p>
 * The program runs in up to three iterations: vertices and their properties are written in the initial iteration,
 * edges in iteration 1 and, unless the original ids are kept, the id property is removed from the target vertices
 * in iteration 2.
 *
 * @author Daniel Kuppitz (http://gremlin.guru)
 * @deprecated As of release 3.2.10, not directly replaced - consider graph provider specific bulk loading methods
 */
@Deprecated
public class BulkLoaderVertexProgram implements VertexProgram<Tuple> {

    private static final Logger LOGGER = LoggerFactory.getLogger(BulkLoaderVertexProgram.class);

    public static final String BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.bulkLoaderVertexProgram";
    public static final String BULK_LOADER_CLASS_CFG_KEY = String.join(".", BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX, "class");
    public static final String BULK_LOADER_VERTEX_ID_CFG_KEY = String.join(".", BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX, "vertexIdProperty");
    public static final String INTERMEDIATE_BATCH_SIZE_CFG_KEY = String.join(".", BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX, "intermediateBatchSize");
    public static final String KEEP_ORIGINAL_IDS_CFG_KEY = String.join(".", BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX, "keepOriginalIds");
    public static final String USER_SUPPLIED_IDS_CFG_KEY = String.join(".", BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX, "userSuppliedIds");
    public static final String WRITE_GRAPH_CFG_KEY = String.join(".", BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX, "writeGraph");
    public static final String DEFAULT_BULK_LOADER_VERTEX_ID = "bulkLoader.vertex.id";

    private final MessageScope messageScope;
    private final Set<VertexComputeKey> elementComputeKeys;
    private Configuration configuration;
    private BulkLoader bulkLoader;
    // the target graph; opened in workerIterationStart() and closed in workerIterationEnd()
    private Graph graph;
    // traversal source over the target graph, decorated with the mutation-counting listener
    private GraphTraversalSource g;
    // number of mutations after which an intermediate commit is issued; 0 disables intermediate commits
    private long intermediateBatchSize;
    private BulkLoadingListener listener;

    private BulkLoaderVertexProgram() {
        messageScope = MessageScope.Local.of(__::inE);
        elementComputeKeys = new HashSet<>();
    }

    /**
     * Instantiates and configures the {@link BulkLoader}. If the program configuration names a loader class, that
     * class is instantiated through its public no-arg constructor, otherwise an {@link IncrementalBulkLoader} is
     * used.
     *
     * @return the configured bulk loader
     * @throws IllegalStateException if the configured class cannot be found or instantiated
     */
    private BulkLoader createBulkLoader() {
        final BulkLoader loader;
        final Configuration config = configuration.subset(BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX);
        if (config.containsKey("class")) {
            final String className = config.getString("class");
            try {
                final Class<?> bulkLoaderClass = Class.forName(className);
                loader = (BulkLoader) bulkLoaderClass.getConstructor().newInstance();
            } catch (ClassNotFoundException e) {
                LOGGER.error("Unable to find custom bulk loader class: {}", className);
                throw new IllegalStateException(e);
            } catch (Exception e) {
                LOGGER.error("Unable to create an instance of the given bulk loader class: {}", className);
                throw new IllegalStateException(e);
            }
        } else {
            loader = new IncrementalBulkLoader();
        }
        loader.configure(configuration);
        return loader;
    }

    /**
     * Eventually commits the current transaction and closes the current graph instance. commit() will be called
     * if close is set true, otherwise it will only be called if the intermediate batch size is set and reached.
     * When close is set true the graph instance is closed even if the commit fails, so that a failed commit does
     * not leak the graph instance.
     *
     * @param close Whether to close the current graph instance after calling commit() or not.
     */
    private void commit(final boolean close) {
        if (!close && (intermediateBatchSize == 0L || listener.mutations() < intermediateBatchSize)) {
            return;
        }
        if (null == graph) {
            return;
        }
        try {
            if (graph.features().graph().supportsTransactions()) {
                commitTransaction();
            }
        } finally {
            if (close) {
                closeGraph();
            }
        }
    }

    /**
     * Commits the open transaction of the target graph and resets the mutation counter. If the commit fails the
     * transaction is rolled back and the original exception is rethrown.
     */
    private void commitTransaction() {
        LOGGER.info("Committing transaction on Graph instance: {} [{} mutations]", graph, listener.mutations());
        try {
            graph.tx().commit();
            LOGGER.debug("Committed transaction on Graph instance: {}", graph);
            listener.resetCounter();
        } catch (Exception e) {
            LOGGER.error("Failed to commit transaction on Graph instance: {}", graph);
            rollbackAfterFailure(e);
            throw e;
        }
    }

    /**
     * Rolls back the open transaction of the target graph and resets the mutation counter. A failure of the
     * rollback itself is attached to {@code cause} as a suppressed exception instead of replacing it.
     *
     * @param cause the exception that made the rollback necessary
     */
    private void rollbackAfterFailure(final Exception cause) {
        try {
            graph.tx().rollback();
        } catch (Exception rollbackFailure) {
            cause.addSuppressed(rollbackFailure);
        } finally {
            if (null != listener) {
                listener.resetCounter();
            }
        }
    }

    /**
     * Closes the target graph. The {@code graph} field is only cleared if closing succeeded; a failure is logged
     * and the reference is kept, so that the next {@link #workerIterationStart(Memory)} reports the leaked instance.
     */
    private void closeGraph() {
        try {
            graph.close();
            LOGGER.info("Closed Graph instance: {}", graph);
            graph = null;
        } catch (Exception e) {
            LOGGER.warn("Failed to close Graph instance", e);
        }
    }

    @Override
    public void setup(final Memory memory) {
    }

    /**
     * Copies the given configuration, reads the intermediate batch size from it (default 0), registers the
     * transient vertex compute key and creates the {@link BulkLoader}.
     */
    @Override
    public void loadState(final Graph graph, final Configuration config) {
        configuration = new BaseConfiguration();
        if (config != null) {
            ConfigurationUtils.copy(config, configuration);
        }
        intermediateBatchSize = configuration.getLong(INTERMEDIATE_BATCH_SIZE_CFG_KEY, 0L);
        elementComputeKeys.add(VertexComputeKey.of(DEFAULT_BULK_LOADER_VERTEX_ID, true));
        bulkLoader = createBulkLoader();
    }

    @Override
    public void storeState(final Configuration config) {
        VertexProgram.super.storeState(config);
        if (configuration != null) {
            ConfigurationUtils.copy(configuration, config);
        }
    }

    /**
     * Opens the target graph and creates a traversal source that reports all mutations to a fresh
     * {@link BulkLoadingListener}. If the traversal source cannot be created the graph is closed again before the
     * exception is rethrown. If a graph instance is still present from a previous iteration, a warning is logged
     * and that instance is reused.
     */
    @Override
    public void workerIterationStart(final Memory memory) {
        if (null == graph) {
            graph = GraphFactory.open(configuration.subset(WRITE_GRAPH_CFG_KEY));
            LOGGER.info("Opened Graph instance: {}", graph);
            try {
                listener = new BulkLoadingListener();
                g = graph.traversal().withStrategies(EventStrategy.build().addListener(listener).create());
            } catch (Exception e) {
                // do not keep a reference to a graph that has just been closed
                closeGraph();
                throw e;
            }
        } else {
            LOGGER.warn("Leaked Graph instance: {}", graph);
        }
    }

    @Override
    public void workerIterationEnd(final Memory memory) {
        this.commit(true);
    }

    /**
     * Processes a single source vertex. If processing fails and the target graph supports transactions, the open
     * transaction is rolled back before the original exception is rethrown.
     */
    @Override
    public void execute(final Vertex sourceVertex, final Messenger<Tuple> messenger, final Memory memory) {
        try {
            executeInternal(sourceVertex, messenger, memory);
        } catch (Exception e) {
            // guard against a missing graph so that the original failure is not masked by a NullPointerException
            if (null != graph && graph.features().graph().supportsTransactions()) {
                rollbackAfterFailure(e);
            }
            throw e;
        }
    }

    /**
     * Performs the work of the current iteration for the given source vertex: writes the vertex and its properties
     * (initial iteration), writes its outgoing edges (iteration 1) or removes the id property from the target
     * vertex (iteration 2).
     */
    private void executeInternal(final Vertex sourceVertex, final Messenger<Tuple> messenger, final Memory memory) {
        if (memory.isInitialIteration()) {
            this.listener.resetStats();
            // get or create the vertex
            final Vertex targetVertex = bulkLoader.getOrCreateVertex(sourceVertex, graph, g);
            // write all the properties of the source vertex to the target vertex; the listener tells whether the
            // target vertex was just created, in which case the properties can be created without a lookup
            final Iterator<VertexProperty<Object>> vpi = sourceVertex.properties();
            if (this.listener.isNewVertex()) {
                vpi.forEachRemaining(vp -> bulkLoader.createVertexProperty(vp, targetVertex, graph, g));
            } else {
                vpi.forEachRemaining(vp -> bulkLoader.getOrCreateVertexProperty(vp, targetVertex, graph, g));
            }
            this.commit(false);
            if (!bulkLoader.useUserSuppliedIds()) {
                // create an id pair and send it to all the vertex's incoming adjacent vertices
                sourceVertex.property(DEFAULT_BULK_LOADER_VERTEX_ID, targetVertex.id());
                messenger.sendMessage(messageScope, Pair.with(sourceVertex.id(), targetVertex.id()));
            }
        } else if (memory.getIteration() == 1) {
            if (bulkLoader.useUserSuppliedIds()) {
                final Vertex outV = bulkLoader.getVertex(sourceVertex, graph, g);
                // if the target vertex already has outgoing edges, edges must be looked up before they are created
                final boolean incremental = outV.edges(Direction.OUT).hasNext();
                sourceVertex.edges(Direction.OUT).forEachRemaining(edge -> {
                    final Vertex inV = bulkLoader.getVertex(edge.inVertex(), graph, g);
                    if (incremental) {
                        bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
                    } else {
                        bulkLoader.createEdge(edge, outV, inV, graph, g);
                    }
                    this.commit(false);
                });
            } else {
                // create an id map (source vertex id -> target vertex id) from all the incoming messages
                final Map<Object, Object> idPairs = new HashMap<>();
                final Iterator<Tuple> idi = messenger.receiveMessages();
                while (idi.hasNext()) {
                    final Tuple idPair = idi.next();
                    idPairs.put(idPair.getValue(0), idPair.getValue(1));
                }
                // get the target vertex using the id stored in the dummy id property
                final Object outVId = sourceVertex.value(DEFAULT_BULK_LOADER_VERTEX_ID);
                final Vertex outV = bulkLoader.getVertexById(outVId, graph, g);
                // for all the outgoing edges of the vertex, get the adjacent target vertex and write the edge
                sourceVertex.edges(Direction.OUT).forEachRemaining(edge -> {
                    final Object inVId = idPairs.get(edge.inVertex().id());
                    final Vertex inV = bulkLoader.getVertexById(inVId, graph, g);
                    bulkLoader.getOrCreateEdge(edge, outV, inV, graph, g);
                    this.commit(false);
                });
            }
        } else if (memory.getIteration() == 2) {
            // remove the property that holds the original vertex id from the target vertex
            final Object vertexId = sourceVertex.value(DEFAULT_BULK_LOADER_VERTEX_ID);
            bulkLoader.getVertexById(vertexId, graph, g)
                    .property(bulkLoader.getVertexIdProperty()).remove();
            this.commit(false);
        }
    }

    /**
     * Terminates after iteration 1 if the original ids are kept or no vertex id property is configured, otherwise
     * after iteration 2.
     */
    @Override
    public boolean terminate(final Memory memory) {
        switch (memory.getIteration()) {
            case 1:
                return bulkLoader.keepOriginalIds() || bulkLoader.getVertexIdProperty() == null;
            case 2:
                return true;
            default:
                return false;
        }
    }

    @Override
    public Set<VertexComputeKey> getVertexComputeKeys() {
        return elementComputeKeys;
    }

    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        return Collections.singleton(messageScope);
    }

    /**
     * Returns this instance itself rather than a copy.
     */
    @SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException", "CloneDoesntCallSuperClone"})
    @Override
    public VertexProgram<Tuple> clone() {
        return this;
    }

    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.ORIGINAL;
    }

    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.NOTHING;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder();
        if (bulkLoader != null) {
            sb.append("bulkLoader=").append(bulkLoader.getClass().getSimpleName()).append(", ");
            sb.append("vertexIdProperty=").append(bulkLoader.getVertexIdProperty()).append(", ");
            sb.append("userSuppliedIds=").append(bulkLoader.useUserSuppliedIds()).append(", ");
            sb.append("keepOriginalIds=").append(bulkLoader.keepOriginalIds()).append(", ");
        } else {
            sb.append("bulkLoader=").append(bulkLoader).append(", ");
        }
        sb.append("batchSize=").append(intermediateBatchSize);
        return StringFactory.vertexProgramString(this, sb.toString());
    }

    public static Builder build() {
        return new Builder();
    }

    /**
     * Fluent builder that collects the configuration of a {@link BulkLoaderVertexProgram}.
     */
    public static class Builder extends AbstractVertexProgramBuilder<Builder> {

        private Builder() {
            super(BulkLoaderVertexProgram.class);
        }

        @SuppressWarnings("unchecked")
        @Override
        public BulkLoaderVertexProgram create(final Graph graph) {
            ConfigurationUtils.append(graph.configuration().subset(BULK_LOADER_VERTEX_PROGRAM_CFG_PREFIX), configuration);
            return (BulkLoaderVertexProgram) VertexProgram.createVertexProgram(graph, configuration);
        }

        private void setGraphConfigurationProperty(final String key, final Object value) {
            configuration.setProperty(String.join(".", WRITE_GRAPH_CFG_KEY, key), value);
        }

        /**
         * Sets the class name of the BulkLoader implementation to be used. The name is resolved with
         * {@link Class#forName(String)}, hence it has to be the binary name of the class.
         */
        public Builder bulkLoader(final String className) {
            configuration.setProperty(BULK_LOADER_CLASS_CFG_KEY, className);
            return this;
        }

        /**
         * Sets the class of the BulkLoader implementation to be used.
         */
        public Builder bulkLoader(final Class<? extends BulkLoader> clazz) {
            // the binary name is required, as the canonical name of a nested class cannot be resolved by
            // Class.forName() and the canonical name of anonymous and local classes is null
            return bulkLoader(clazz.getName());
        }

        /**
         * Sets the name of the property that is used to store the original vertex identifiers in the target graph.
         */
        public Builder vertexIdProperty(final String name) {
            configuration.setProperty(BULK_LOADER_VERTEX_ID_CFG_KEY, name);
            return this;
        }

        /**
         * Specifies whether user supplied identifiers should be used when the bulk loader creates vertices in the
         * target graph.
         */
        public Builder userSuppliedIds(final boolean useUserSuppliedIds) {
            configuration.setProperty(USER_SUPPLIED_IDS_CFG_KEY, useUserSuppliedIds);
            return this;
        }

        /**
         * Specifies whether the original vertex identifiers should be kept in the target graph or not. In case of false
         * BulkLoaderVertexProgram will add another iteration to remove the properties and it won't be possible to use
         * the data for further incremental bulk loads.
         */
        public Builder keepOriginalIds(final boolean keepOriginalIds) {
            configuration.setProperty(KEEP_ORIGINAL_IDS_CFG_KEY, keepOriginalIds);
            return this;
        }

        /**
         * The batch size for a single transaction (number of vertices in the vertex loading stage; number of edges in
         * the edge loading stage). A value of 0, which is also the default, disables intermediate commits.
         */
        public Builder intermediateBatchSize(final int batchSize) {
            configuration.setProperty(INTERMEDIATE_BATCH_SIZE_CFG_KEY, batchSize);
            return this;
        }

        /**
         * A configuration for the target graph that can be passed to GraphFactory.open().
         *
         * @param configurationFile the properties file to load the target graph configuration from
         * @throws ConfigurationException if the properties file cannot be loaded
         */
        public Builder writeGraph(final String configurationFile) throws ConfigurationException {
            return writeGraph(new PropertiesConfiguration(configurationFile));
        }

        /**
         * A configuration for the target graph that can be passed to GraphFactory.open(). Every key of the given
         * configuration is stored under the {@link #WRITE_GRAPH_CFG_KEY} prefix.
         */
        public Builder writeGraph(final Configuration configuration) {
            configuration.getKeys().forEachRemaining(key -> setGraphConfigurationProperty(key, configuration.getProperty(key)));
            return this;
        }
    }

    @Override
    public Features getFeatures() {
        return new Features() {
            @Override
            public boolean requiresLocalMessageScopes() {
                return true;
            }

            @Override
            public boolean requiresVertexPropertyAddition() {
                return true;
            }
        };
    }

    /**
     * A {@link MutationListener} that counts every reported mutation and remembers whether a vertex has been added
     * since the last call of {@link #resetStats()}.
     */
    static class BulkLoadingListener implements MutationListener {

        private long counter;
        private boolean isNewVertex;

        public BulkLoadingListener() {
            this.counter = 0L;
            this.isNewVertex = false;
        }

        /**
         * @return whether a vertex has been added since the last call of {@link #resetStats()}
         */
        public boolean isNewVertex() {
            return this.isNewVertex;
        }

        /**
         * @return the number of mutations counted since the last call of {@link #resetCounter()}
         */
        public long mutations() {
            return this.counter;
        }

        /**
         * Clears the new-vertex flag; the mutation counter is left untouched.
         */
        public void resetStats() {
            this.isNewVertex = false;
        }

        /**
         * Sets the mutation counter back to zero; the new-vertex flag is left untouched.
         */
        public void resetCounter() {
            this.counter = 0L;
        }

        @Override
        public void vertexAdded(final Vertex vertex) {
            this.isNewVertex = true;
            this.counter++;
        }

        @Override
        public void vertexRemoved(final Vertex vertex) {
            this.counter++;
        }

        @Override
        public void vertexPropertyChanged(final Vertex element, final VertexProperty oldValue, final Object setValue,
                                          final Object... vertexPropertyKeyValues) {
            this.counter++;
        }

        @Override
        public void vertexPropertyRemoved(final VertexProperty vertexProperty) {
            this.counter++;
        }

        @Override
        public void edgeAdded(final Edge edge) {
            this.counter++;
        }

        @Override
        public void edgeRemoved(final Edge edge) {
            this.counter++;
        }

        @Override
        public void edgePropertyChanged(final Edge element, final Property oldValue, final Object setValue) {
            this.counter++;
        }

        @Override
        public void edgePropertyRemoved(final Edge element, final Property property) {
            this.counter++;
        }

        @Override
        public void vertexPropertyPropertyChanged(final VertexProperty element, final Property oldValue, final Object setValue) {
            this.counter++;
        }

        @Override
        public void vertexPropertyPropertyRemoved(final VertexProperty element, final Property property) {
            this.counter++;
        }
    }
}