blob: a103f6af5e50a762ea3834bd13bad1fb8cba9838 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.remote.traversal.step.map;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnectionException;
import org.apache.tinkerpop.gremlin.process.remote.traversal.RemoteTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Sends a {@link Traversal} to a {@link RemoteConnection} and iterates back the results.
 * <p>
 * The bytecode of the owning traversal is handed to {@link RemoteConnection#submitAsync(Bytecode)} the first
 * time {@link #promise()} is called (directly, or indirectly through {@link #processNextStart()}). The future
 * produced by that submission is cached and returned to every later caller, and the {@link RemoteTraversal}
 * it yields is what {@link #processNextStart()} drains.
 *
 * @param <S> first type parameter passed through to {@link AbstractStep}
 * @param <E> type of the objects carried by the traversers this step emits
 *
 * @author Stephen Mallette (http://stephen.genoprime.com)
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class RemoteStep<S, E> extends AbstractStep<S, E> {

    private transient RemoteConnection remoteConnection;

    /** Result of the remote submission; stays {@code null} until the submission future has completed. */
    private RemoteTraversal<?, E> remoteTraversal;

    /**
     * Holds the future of the one accepted submission. The holder doubles as the monitor that guards the
     * check-then-submit sequence in {@link #promise()}.
     */
    private final AtomicReference<CompletableFuture<Traversal<?, E>>> traversalFuture = new AtomicReference<>(null);

    /**
     * Creates the step.
     *
     * @param traversal        the traversal this step belongs to and whose bytecode is submitted
     * @param remoteConnection the connection the bytecode is submitted to
     */
    public RemoteStep(final Traversal.Admin traversal, final RemoteConnection remoteConnection) {
        super(traversal);
        this.remoteConnection = remoteConnection;
    }

    @Override
    public String toString() {
        return StringFactory.stepString(this, this.remoteConnection);
    }

    /**
     * Returns the next traverser of the remote result, submitting the traversal and blocking until the
     * submission completes if no remote result is available yet.
     *
     * @return the next traverser supplied by the remote traversal
     * @throws NoSuchElementException declared by the step contract for exhausted results
     * @throws IllegalStateException  if the submission failed with a checked exception; unchecked exceptions
     *                                and errors raised by the submission are rethrown as they are
     */
    @Override
    protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
        if (null == this.remoteTraversal) {
            try {
                promise().join();
            } catch (CompletionException e) {
                final Throwable cause = e.getCause();
                // If the underlying future failed, join() will throw a CompletionException, for consistency
                // with previous behavior:
                // - Throw underlying exception if it was unchecked (RuntimeException or Error).
                // - Wrap in IllegalStateException otherwise.
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException) cause;
                } else if (cause instanceof Error) {
                    throw (Error) cause;
                }
                // A CompletionException without a cause is wrapped itself so the failure is never lost.
                throw new IllegalStateException(null == cause ? e : cause);
            }
        }
        return this.remoteTraversal.nextTraverser();
    }

    /**
     * Submits the traversal asynchronously to a "remote" using {@link RemoteConnection#submitAsync(Bytecode)}.
     * <p>
     * Once a submission has been accepted its future is cached, and every subsequent call returns that same
     * future without submitting again. The check and the submission happen under a monitor so that
     * concurrent callers cannot both observe "not yet submitted" and send the bytecode twice. If
     * {@code submitAsync} throws, nothing is cached and a later call will attempt the submission again.
     *
     * @return a future that completes with the owning traversal after the remote result and its side effects
     *         have been attached to this step
     * @throws IllegalStateException if the connection rejects the submission with a
     *                               {@link RemoteConnectionException}
     */
    public CompletableFuture<Traversal<?, E>> promise() {
        // Fast path: a published future is immutable state from the point of view of this method.
        final CompletableFuture<Traversal<?, E>> alreadySubmitted = this.traversalFuture.get();
        if (null != alreadySubmitted) {
            return alreadySubmitted;
        }

        synchronized (this.traversalFuture) {
            try {
                // Re-check under the monitor: another caller may have submitted in the meantime.
                if (null == this.traversalFuture.get()) {
                    this.traversalFuture.set(this.remoteConnection.submitAsync(this.traversal.getBytecode()).<Traversal<?, E>>thenApply(t -> {
                        this.remoteTraversal = (RemoteTraversal<?, E>) t;
                        this.traversal.setSideEffects(this.remoteTraversal.getSideEffects());
                        return traversal;
                    }));
                }
                return this.traversalFuture.get();
            } catch (RemoteConnectionException rce) {
                throw new IllegalStateException(rce);
            }
        }
    }
}