blob: efc09b0fb9b7dfb2910503ee20627d3c123bb39b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.service.Service;
import org.apache.tinkerpop.gremlin.structure.service.ServiceRegistry;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.gremlin.util.function.TriFunction;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
* TinkerGraph services are currently just "toy" services, used to demonstrate and to test.
* @author Mike Personick (
public class TinkerServiceRegistry extends ServiceRegistry {
private final TinkerGraph graph;
public TinkerServiceRegistry(final TinkerGraph graph) {
this.graph = graph;
public TinkerServiceFactory registerService(final TinkerServiceFactory service) {
return service;
public <I, R> LambdaServiceFactory<I, R> registerLambdaService(final String name) {
return (LambdaServiceFactory) registerService(new LambdaServiceFactory(graph, name));
public abstract static class TinkerServiceFactory<I, R> implements ServiceFactory<I, R> {
protected final TinkerGraph graph;
protected final String name;
protected final Map describeParams = new LinkedHashMap();
protected final Map<Type,Set<TraverserRequirement>> requirements = new LinkedHashMap<>();
protected TinkerServiceFactory(final TinkerGraph graph, final String name) {
this.graph = graph; = name;
public String getName() {
return name;
public TinkerServiceFactory addDescribeParams(final Map describeParams) {
return this;
public TinkerServiceFactory addRequirements(final Type type, final TraverserRequirement... requirements) {
final Set<TraverserRequirement> typeRequirements = this.requirements.computeIfAbsent(type, x -> new LinkedHashSet<>());
for (TraverserRequirement requirement : requirements) typeRequirements.add(requirement);
return this;
public Set<TraverserRequirement> getRequirements(final Type type) {
return requirements.getOrDefault(type, Collections.emptySet());
public Map describeParams() {
return describeParams;
public abstract static class TinkerService<I, R> implements Service<I, R> {
protected final TinkerServiceFactory<I, R> serviceFactory;
protected TinkerService(final TinkerServiceFactory<I, R> serviceFactory) {
this.serviceFactory = serviceFactory;
public TinkerService addRequirements(final TraverserRequirement... requirements) {
serviceFactory.addRequirements(getType(), requirements);
return this;
public Set<TraverserRequirement> getRequirements() {
return serviceFactory.getRequirements(getType());
public static class LambdaServiceFactory<I, R> extends TinkerServiceFactory<I, R> {
public interface Options {
* Dynamic configuration of execution type.
String TYPE = "Type";
Type DEFAULT_TYPE = Type.Streaming;
* Dynamic configuration of chunk size for barriers.
String CHUNK_SIZE = "ChunkSize";
private Map<Type, Service<I, R>> lambdas = new LinkedHashMap<>();
public LambdaServiceFactory(final TinkerGraph graph, final String name) {
super(graph, name);
public Set<Type> getSupportedTypes() {
return lambdas.keySet();
public LambdaServiceFactory addDescribeParams(final Map describeParams) {
return (LambdaServiceFactory) super.addDescribeParams(describeParams);
public LambdaStartService addStartLambda(final BiFunction<ServiceCallContext, Map, Iterator<R>> lambda) {
final LambdaStartService<I, R> service = new LambdaStartService<>(this, lambda);
lambdas.put(Type.Start, service);
return service;
public LambdaStreamingService addStreamingLambda(final TriFunction<ServiceCallContext, Traverser.Admin<I>, Map, Iterator<R>> lambda) {
final LambdaStreamingService<I, R> service = new LambdaStreamingService<>(this, lambda);
lambdas.put(Type.Streaming, service);
return service;
public LambdaBarrierService addBarrierLambda(final TriFunction<ServiceCallContext, TraverserSet<I>, Map, Iterator<R>> lambda) {
final LambdaBarrierService<I, R> service = new LambdaBarrierService<>(this, lambda);
lambdas.put(Type.Barrier, service);
return service;
public LambdaBarrierService addBarrierLambda(final TriFunction<ServiceCallContext, TraverserSet<I>, Map, Iterator<R>> lambda, final int maxChunkSize) {
final LambdaBarrierService<I, R> service = new LambdaBarrierService<>(this, lambda, maxChunkSize);
lambdas.put(Type.Barrier, service);
return service;
public Service<I, R> createService(final boolean isStart, final Map params) {
if (isStart) {
if (supports(Type.Start)) {
return getService(Type.Start, params);
} else {
throw new UnsupportedOperationException(Service.Exceptions.cannotStartTraversal);
} else {
if (supports(Type.Streaming, Type.Barrier)) {
* This service supports both barrier and streaming execution. Look to the parameters for guidance.
final Type type = (Type) params.getOrDefault(Options.TYPE, Options.DEFAULT_TYPE);
return getService(type, params);
} else if (supports(Type.Streaming)) {
return getService(Type.Streaming, params);
} else if (supports(Type.Barrier)) {
return getService(Type.Barrier, params);
} else {
throw new UnsupportedOperationException(Service.Exceptions.cannotUseMidTraversal);
private Service<I, R> getService(final Type type, final Map params) {
if (type == Type.Streaming || type == Type.Start) {
return lambdas.get(type);
} else {
final LambdaBarrierService service = (LambdaBarrierService) lambdas.get(Type.Barrier);
// check for dynamic chunk size
if (params.containsKey(Options.CHUNK_SIZE))
return service.clone((int) params.get(Options.CHUNK_SIZE));
return service;
* Does this service support all of the specified types.
private boolean supports(final Type... types) {
for (Type type : types) {
if (!lambdas.containsKey(type)) {
return false;
return true;
public static class LambdaStartService<I, R> extends TinkerService<I, R> {
private final BiFunction<ServiceCallContext, Map, Iterator<R>> lambda;
public LambdaStartService(final LambdaServiceFactory<I, R> factory,
final BiFunction<ServiceCallContext, Map, Iterator<R>> lambda) {
this.lambda = lambda;
public Type getType() {
return Type.Start;
public CloseableIterator<R> execute(final ServiceCallContext ctx, final Map params) {
return CloseableIterator.of(lambda.apply(ctx, params));
public static class LambdaStreamingService<I, R> extends TinkerService<I, R> {
private final TriFunction<ServiceCallContext, Traverser.Admin<I>, Map, Iterator<R>> lambda;
public LambdaStreamingService(final LambdaServiceFactory<I, R> factory,
final TriFunction<ServiceCallContext, Traverser.Admin<I>, Map, Iterator<R>> lambda) {
this.lambda = lambda;
public Type getType() {
return Type.Streaming;
public CloseableIterator<R> execute(final ServiceCallContext ctx, final Traverser.Admin<I> in, final Map params) {
final Object result = lambda.apply(ctx, in, params);
return CloseableIterator.of(result instanceof Iterator ? (Iterator<R>) result : IteratorUtils.of((R) result));
public static class LambdaBarrierService<I, R> extends TinkerService<I, R> {
private final TriFunction<ServiceCallContext, TraverserSet<I>, Map, Iterator<R>> lambda;
private final int maxChunkSize;
public LambdaBarrierService(final LambdaServiceFactory<I, R> factory,
final TriFunction<ServiceCallContext, TraverserSet<I>, Map, Iterator<R>> lambda) {
this(factory, lambda, Integer.MAX_VALUE);
public LambdaBarrierService(final LambdaServiceFactory<I, R> factory,
final TriFunction<ServiceCallContext, TraverserSet<I>, Map, Iterator<R>> lambda,
final int maxChunkSize) {
this.lambda = lambda;
this.maxChunkSize = maxChunkSize;
public LambdaBarrierService clone(final int maxChunkSize) {
return new LambdaBarrierService((LambdaServiceFactory) serviceFactory, lambda, maxChunkSize);
public Type getType() {
return Type.Barrier;
public int getMaxBarrierSize() {
return maxChunkSize;
public CloseableIterator<R> execute(final ServiceCallContext ctx, final TraverserSet<I> in, final Map params) {
final Object result = lambda.apply(ctx, in, params);
return CloseableIterator.of(result instanceof Iterator ? (Iterator<R>) result : IteratorUtils.of((R) result));