blob: c05d8f89f448076e1c024ac5de1b431689319b49 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.traversal.algorithm;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.EdgesQueryIterator;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.iterator.FilterIterator;
import org.apache.hugegraph.iterator.MapperIterator;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.traversal.algorithm.steps.Steps;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.util.Consumers;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import com.google.common.base.Objects;
public abstract class OltpTraverser extends HugeTraverser
implements AutoCloseable {
private static final String EXECUTOR_NAME = "oltp";
private static Consumers.ExecutorPool executors;
protected OltpTraverser(HugeGraph graph) {
super(graph);
if (executors != null) {
return;
}
synchronized (OltpTraverser.class) {
if (executors != null) {
return;
}
int workers = this.graph()
.option(CoreOptions.OLTP_CONCURRENT_THREADS);
if (workers > 0) {
executors = new Consumers.ExecutorPool(EXECUTOR_NAME, workers);
}
}
}
@Override
public void close() {
// pass
}
public static void destroy() {
synchronized (OltpTraverser.class) {
if (executors != null) {
executors.destroy();
executors = null;
}
}
}
protected long traversePairs(Iterator<Pair<Id, Id>> pairs,
Consumer<Pair<Id, Id>> consumer) {
return this.traverseByOne(pairs, consumer, "traverse-pairs");
}
protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer,
boolean concurrent) {
if (concurrent) {
return this.traverseIds(ids, consumer);
} else {
long count = 0L;
while (ids.hasNext()) {
count++;
consumer.accept(ids.next());
}
return count;
}
}
protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer) {
return this.traverseByOne(ids, consumer, "traverse-ids");
}
protected <K> long traverseByOne(Iterator<K> iterator,
Consumer<K> consumer,
String taskName) {
if (!iterator.hasNext()) {
return 0L;
}
Consumers<K> consumers = new Consumers<>(executors.getExecutor(),
consumer, null);
consumers.start(taskName);
long total = 0L;
try {
while (iterator.hasNext()) {
total++;
K v = iterator.next();
consumers.provide(v);
}
} catch (Consumers.StopExecution e) {
// pass
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
try {
consumers.await();
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
executors.returnExecutor(consumers.executor());
CloseableIterator.closeIterator(iterator);
}
}
return total;
}
protected void traverseIdsByBfs(Iterator<Id> vertices,
Directions dir,
Id label,
long degree,
long capacity,
Consumer<EdgeId> consumer) {
List<Id> labels = label == null ? Collections.emptyList() :
Collections.singletonList(label);
OneStepEdgeIterConsumer edgeIterConsumer = new OneStepEdgeIterConsumer(consumer, capacity);
EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, degree);
// parallel out-of-order execution
this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-step", 1);
}
protected void traverseIdsByBfs(Iterator<Id> vertices,
Steps steps,
long capacity,
Consumer<Edge> consumer) {
StepsEdgeIterConsumer edgeIterConsumer =
new StepsEdgeIterConsumer(consumer, capacity, steps);
EdgesQueryIterator queryIterator = new EdgesQueryIterator(vertices,
steps.direction(),
steps.edgeLabels(),
steps.degree());
// get Iterator<Iterator<edges>> from Iterator<Query>
EdgesIterator edgeIter = new EdgesIterator(queryIterator);
// parallel out-of-order execution
this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-steps", 1);
}
protected <K> long traverseByBatch(Iterator<Iterator<K>> sources,
Consumer<Iterator<K>> consumer,
String taskName, int concurrentWorkers) {
if (!sources.hasNext()) {
return 0L;
}
AtomicBoolean done = new AtomicBoolean(false);
Consumers<Iterator<K>> consumers = null;
try {
consumers = buildConsumers(consumer, concurrentWorkers, done,
executors.getExecutor());
return startConsumers(sources, taskName, done, consumers);
} finally {
assert consumers != null;
executors.returnExecutor(consumers.executor());
}
}
private <K> long startConsumers(Iterator<Iterator<K>> sources,
String taskName,
AtomicBoolean done,
Consumers<Iterator<K>> consumers) {
long total = 0L;
try {
consumers.start(taskName);
while (sources.hasNext() && !done.get()) {
total++;
Iterator<K> v = sources.next();
consumers.provide(v);
}
} catch (Consumers.StopExecution e) {
// pass
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
try {
consumers.await();
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
CloseableIterator.closeIterator(sources);
}
}
return total;
}
private <K> Consumers<Iterator<K>> buildConsumers(Consumer<Iterator<K>> consumer,
int queueSizePerWorker,
AtomicBoolean done,
ExecutorService executor) {
return new Consumers<>(executor,
consumer,
null,
e -> done.set(true),
queueSizePerWorker);
}
protected Iterator<Vertex> filter(Iterator<Vertex> vertices,
String key, Object value) {
return new FilterIterator<>(vertices, vertex -> match(vertex, key, value));
}
protected boolean match(Element elem, String key, Object value) {
// check property key exists
this.graph().propertyKey(key);
// return true if property value exists & equals to specified value
Property<Object> p = elem.property(key);
return p.isPresent() && Objects.equal(p.value(), value);
}
public static class ConcurrentMultiValuedMap<K, V>
extends ConcurrentHashMap<K, List<V>> {
private static final long serialVersionUID = -7249946839643493614L;
public ConcurrentMultiValuedMap() {
super();
}
public void add(K key, V value) {
List<V> values = this.getValues(key);
values.add(value);
}
public void addAll(K key, List<V> value) {
List<V> values = this.getValues(key);
values.addAll(value);
}
public List<V> getValues(K key) {
List<V> values = this.get(key);
if (values == null) {
values = new CopyOnWriteArrayList<>();
List<V> old = this.putIfAbsent(key, values);
if (old != null) {
values = old;
}
}
return values;
}
}
public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> {
private final Id sourceV;
private final Set<Id> excluded;
private final Set<Id> neighbors;
private final long limit;
private final AtomicInteger count;
public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long limit,
Set<Id> neighbors) {
this.sourceV = sourceV;
this.excluded = excluded;
this.limit = limit;
this.neighbors = neighbors;
this.count = new AtomicInteger(0);
}
@Override
public void accept(EdgeId edgeId) {
if (this.limit != NO_LIMIT && count.get() >= this.limit) {
throw new Consumers.StopExecution("reach limit");
}
Id targetV = edgeId.otherVertexId();
if (this.sourceV.equals(targetV)) {
return;
}
if (this.excluded != null && this.excluded.contains(targetV)) {
return;
}
if (this.neighbors.add(targetV)) {
if (this.limit != NO_LIMIT) {
this.count.getAndIncrement();
}
}
}
}
public abstract class EdgesConsumer<T, E> implements Consumer<Iterator<T>> {
private final Consumer<E> consumer;
private final long capacity;
public EdgesConsumer(Consumer<E> consumer, long capacity) {
this.consumer = consumer;
this.capacity = capacity;
}
protected abstract Iterator<E> prepare(Iterator<T> iter);
@Override
public void accept(Iterator<T> edgeIter) {
Iterator<E> ids = prepare(edgeIter);
long counter = 0;
while (ids.hasNext()) {
if (Thread.currentThread().isInterrupted()) {
LOG.warn("Consumer is Interrupted");
break;
}
counter++;
this.consumer.accept(ids.next());
}
long total = edgeIterCounter.addAndGet(counter);
// traverse by batch & improve performance
if (this.capacity != NO_LIMIT && total >= this.capacity) {
throw new Consumers.StopExecution("reach capacity");
}
}
}
public class OneStepEdgeIterConsumer extends EdgesConsumer<Edge, EdgeId> {
public OneStepEdgeIterConsumer(Consumer<EdgeId> consumer, long capacity) {
super(consumer, capacity);
}
@Override
protected Iterator<EdgeId> prepare(Iterator<Edge> edgeIter) {
return new MapperIterator<>(edgeIter, (e) -> ((HugeEdge) e).id());
}
}
public class StepsEdgeIterConsumer extends EdgesConsumer<Edge, Edge> {
private final Steps steps;
public StepsEdgeIterConsumer(Consumer<Edge> consumer, long capacity,
Steps steps) {
super(consumer, capacity);
this.steps = steps;
}
@Override
protected Iterator<Edge> prepare(Iterator<Edge> edgeIter) {
return edgesOfVertexStep(edgeIter, this.steps);
}
}
}