| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.tinkerpop.gremlin.driver; |
| |
| import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; |
| import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; |
| import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; |
| import org.apache.tinkerpop.gremlin.structure.Graph; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.stream.Collectors; |
| |
| /** |
| * A {@code Client} is constructed from a {@link Cluster} and represents a way to send messages to Gremlin Server. |
| * This class itself is a base class as there are different implementations that provide differing kinds of |
| * functionality. See the implementations for specifics on their individual usage. |
| * <p/> |
| * The {@code Client} is designed to be re-used and shared across threads. |
| * |
| * @author Stephen Mallette (http://stephen.genoprime.com) |
| */ |
| public abstract class Client { |
| |
| private static final Logger logger = LoggerFactory.getLogger(Client.class); |
| |
| protected final Cluster cluster; |
| protected volatile boolean initialized; |
| |
| Client(final Cluster cluster) { |
| this.cluster = cluster; |
| } |
| |
| /** |
| * Makes any final changes to the builder and returns the constructed {@link RequestMessage}. Implementers |
| * may choose to override this message to append data to the request before sending. By default, this method |
| * will simply return the {@code builder} passed in by the caller. |
| */ |
| public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) { |
| return builder; |
| } |
| |
| /** |
| * Called in the {@link #init} method. |
| */ |
| protected abstract void initializeImplementation(); |
| |
| /** |
| * Chooses a {@link Connection} to write the message to. |
| */ |
| protected abstract Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException; |
| |
| /** |
| * Asynchronous close of the {@code Client}. |
| */ |
| public abstract CompletableFuture<Void> closeAsync(); |
| |
| /** |
| * Create a new {@code Client} that aliases the specified {@link Graph} or {@link TraversalSource} name on the |
| * server to a variable called "g" for the context of the requests made through that {@code Client}. |
| * |
| * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g" |
| * @deprecated As of release 3.1.0, replaced by {@link #alias(String)} |
| */ |
| @Deprecated |
| public Client rebind(final String graphOrTraversalSource) { |
| return alias(graphOrTraversalSource); |
| } |
| |
| /** |
| * Create a new {@code Client} that aliases the specified {@link Graph} or {@link TraversalSource} name on the |
| * server to a variable called "g" for the context of the requests made through that {@code Client}. |
| * |
| * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g" |
| */ |
| public Client alias(String graphOrTraversalSource) { |
| return new AliasClusteredClient(this, graphOrTraversalSource); |
| } |
| |
| /** |
| * Creates a {@code Client} that supplies the specified set of aliases, thus allowing the user to re-name |
| * one or more globally defined {@link Graph} or {@link TraversalSource} server bindings for the context of |
| * the created {@code Client}. |
| */ |
| @Deprecated |
| public Client rebind(final Map<String,String> rebindings) { |
| return alias(rebindings); |
| } |
| |
| /** |
| * Creates a {@code Client} that supplies the specified set of aliases, thus allowing the user to re-name |
| * one or more globally defined {@link Graph} or {@link TraversalSource} server bindings for the context of |
| * the created {@code Client}. |
| */ |
| public Client alias(final Map<String,String> aliases) { |
| return new AliasClusteredClient(this, aliases); |
| } |
| |
| /** |
| * Initializes the client which typically means that a connection is established to the server. Depending on the |
| * implementation and configuration this blocking call may take some time. This method will be called |
| * automatically if it is not called directly and multiple calls will not have effect. |
| */ |
| public synchronized Client init() { |
| if (initialized) |
| return this; |
| |
| logger.debug("Initializing client on cluster [{}]", cluster); |
| |
| cluster.init(); |
| initializeImplementation(); |
| |
| initialized = true; |
| return this; |
| } |
| |
| /** |
| * Submits a Gremlin script to the server and returns a {@link ResultSet} once the write of the request is |
| * complete. |
| * |
| * @param gremlin the gremlin script to execute |
| */ |
| public ResultSet submit(final String gremlin) { |
| return submit(gremlin, null); |
| } |
| |
| /** |
| * Submits a Gremlin script and bound parameters to the server and returns a {@link ResultSet} once the write of |
| * the request is complete. If a script is to be executed repeatedly with slightly different arguments, prefer |
| * this method to concatenating a Gremlin script from dynamically produced strings and sending it to |
| * {@link #submit(String)}. Parameterized scripts will perform better. |
| * |
| * @param gremlin the gremlin script to execute |
| * @param parameters a map of parameters that will be bound to the script on execution |
| */ |
| public ResultSet submit(final String gremlin, final Map<String, Object> parameters) { |
| try { |
| return submitAsync(gremlin, parameters).get(); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| /** |
| * The asynchronous version of {@link #submit(String)} where the returned future will complete when the |
| * write of the request completes. |
| * |
| * @param gremlin the gremlin script to execute |
| */ |
| public CompletableFuture<ResultSet> submitAsync(final String gremlin) { |
| return submitAsync(gremlin, null); |
| } |
| |
| /** |
| * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the |
| * write of the request completes. |
| * |
| * @param gremlin the gremlin script to execute |
| * @param parameters a map of parameters that will be bound to the script on execution |
| */ |
| public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String, Object> parameters) { |
| final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL) |
| .add(Tokens.ARGS_GREMLIN, gremlin) |
| .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize); |
| |
| Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters)); |
| |
| return submitAsync(buildMessage(request).create()); |
| } |
| |
| /** |
| * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the |
| * write of the request completes. |
| * |
| * @param gremlin the gremlin script to execute |
| * @param parameters a map of parameters that will be bound to the script on execution |
| * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g" |
| */ |
| public CompletableFuture<ResultSet> submitAsync(final String gremlin, final String graphOrTraversalSource, |
| final Map<String, Object> parameters) { |
| final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL) |
| .add(Tokens.ARGS_GREMLIN, gremlin) |
| .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize); |
| |
| Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters)); |
| |
| if (graphOrTraversalSource != null && !graphOrTraversalSource.isEmpty()) |
| request.addArg(Tokens.ARGS_ALIASES, makeAliases(graphOrTraversalSource)); |
| |
| return submitAsync(buildMessage(request).create()); |
| } |
| |
| /** |
| * The asynchronous version of {@link #submit(String, Map)}} where the returned future will complete when the |
| * write of the request completes. |
| * |
| * @param gremlin the gremlin script to execute |
| * @param parameters a map of parameters that will be bound to the script on execution |
| * @param aliases aliases the specified global Gremlin Server variable some other name that then be used in the |
| * script where the key is the alias name and the value represents the global variable on the |
| * server |
| */ |
| public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Map<String,String> aliases, |
| final Map<String, Object> parameters) { |
| final RequestMessage.Builder request = RequestMessage.build(Tokens.OPS_EVAL) |
| .add(Tokens.ARGS_GREMLIN, gremlin) |
| .add(Tokens.ARGS_BATCH_SIZE, cluster.connectionPoolSettings().resultIterationBatchSize); |
| |
| Optional.ofNullable(parameters).ifPresent(params -> request.addArg(Tokens.ARGS_BINDINGS, parameters)); |
| |
| if (aliases != null && !aliases.isEmpty()) |
| request.addArg(Tokens.ARGS_ALIASES, aliases); |
| |
| return submitAsync(buildMessage(request).create()); |
| } |
| |
| /** |
| * A low-level method that allows the submission of a manually constructed {@link RequestMessage}. |
| */ |
| public CompletableFuture<ResultSet> submitAsync(final RequestMessage msg) { |
| if (!initialized) |
| init(); |
| |
| final CompletableFuture<ResultSet> future = new CompletableFuture<>(); |
| Connection connection = null; |
| try { |
| // the connection is returned to the pool once the response has been completed...see Connection.write() |
| // the connection may be returned to the pool with the host being marked as "unavailable" |
| connection = chooseConnection(msg); |
| connection.write(msg, future); |
| return future; |
| } catch (TimeoutException toe) { |
| // there was a timeout borrowing a connection |
| throw new RuntimeException(toe); |
| } catch (ConnectionException ce) { |
| throw new RuntimeException(ce); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } finally { |
| if (logger.isDebugEnabled()) |
| logger.debug("Submitted {} to - {}", msg, null == connection ? "connection not initialized" : connection.toString()); |
| } |
| } |
| |
| /** |
| * Closes the client by making a synchronous call to {@link #closeAsync()}. |
| */ |
| public void close() { |
| closeAsync().join(); |
| } |
| |
| private Map<String,String> makeAliases(final String graphOrTraversalSource) { |
| final Map<String,String> aliases = new HashMap<>(); |
| aliases.put("g", graphOrTraversalSource); |
| return aliases; |
| } |
| |
| /** |
| * A {@code Client} implementation that does not operate in a session. Requests are sent to multiple servers |
| * given a {@link LoadBalancingStrategy}. Transactions are automatically committed |
| * (or rolled-back on error) after each request. |
| */ |
| public final static class ClusteredClient extends Client { |
| |
| private ConcurrentMap<Host, ConnectionPool> hostConnectionPools = new ConcurrentHashMap<>(); |
| |
| ClusteredClient(final Cluster cluster) { |
| super(cluster); |
| } |
| |
| /** |
| * Submits a Gremlin script to the server and returns a {@link ResultSet} once the write of the request is |
| * complete. |
| * |
| * @param gremlin the gremlin script to execute |
| */ |
| public ResultSet submit(final String gremlin, final String graphOrTraversalSource) { |
| return submit(gremlin, graphOrTraversalSource, null); |
| } |
| |
| /** |
| * Submits a Gremlin script and bound parameters to the server and returns a {@link ResultSet} once the write of |
| * the request is complete. If a script is to be executed repeatedly with slightly different arguments, prefer |
| * this method to concatenating a Gremlin script from dynamically produced strings and sending it to |
| * {@link #submit(String)}. Parameterized scripts will perform better. |
| * |
| * @param gremlin the gremlin script to execute |
| * @param parameters a map of parameters that will be bound to the script on execution |
| * @param graphOrTraversalSource rebinds the specified global Gremlin Server variable to "g" |
| */ |
| public ResultSet submit(final String gremlin, final String graphOrTraversalSource, final Map<String, Object> parameters) { |
| try { |
| return submitAsync(gremlin, graphOrTraversalSource, parameters).get(); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| /** |
| * Uses a {@link LoadBalancingStrategy} to choose the best {@link Host} and then selects the best connection |
| * from that host's connection pool. |
| */ |
| @Override |
| protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException { |
| final Iterator<Host> possibleHosts = this.cluster.loadBalancingStrategy().select(msg); |
| |
| // you can get no possible hosts in more than a few situations. perhaps the servers are just all down. |
| // or perhaps the client is not configured properly (disables ssl when ssl is enabled on the server). |
| if (!possibleHosts.hasNext()) |
| throw new TimeoutException("Timed out while waiting for an available host - check the client configuration and connectivity to the server if this message persists"); |
| |
| final Host bestHost = possibleHosts.next(); |
| final ConnectionPool pool = hostConnectionPools.get(bestHost); |
| return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); |
| } |
| |
| /** |
| * Initializes the connection pools on all hosts. |
| */ |
| @Override |
| protected void initializeImplementation() { |
| cluster.allHosts().forEach(host -> { |
| try { |
| // hosts that don't initialize connection pools will come up as a dead host |
| hostConnectionPools.put(host, new ConnectionPool(host, this)); |
| |
| // added a new host to the cluster so let the load-balancer know |
| this.cluster.loadBalancingStrategy().onNew(host); |
| } catch (Exception ex) { |
| // catch connection errors and prevent them from failing the creation |
| logger.warn("Could not initialize connection pool for {} - will try later", host); |
| } |
| }); |
| } |
| |
| /** |
| * Closes all the connection pools on all hosts. |
| */ |
| @Override |
| public CompletableFuture<Void> closeAsync() { |
| final CompletableFuture[] poolCloseFutures = new CompletableFuture[hostConnectionPools.size()]; |
| hostConnectionPools.values().stream().map(ConnectionPool::closeAsync).collect(Collectors.toList()).toArray(poolCloseFutures); |
| return CompletableFuture.allOf(poolCloseFutures); |
| } |
| } |
| |
| /** |
| * Uses a {@link org.apache.tinkerpop.gremlin.driver.Client.ClusteredClient} that rebinds requests to a |
| * specified {@link Graph} or {@link TraversalSource} instances on the server-side. |
| */ |
| public final static class AliasClusteredClient extends ReboundClusteredClient { |
| public AliasClusteredClient(Client clusteredClient, String graphOrTraversalSource) { |
| super(clusteredClient, graphOrTraversalSource); |
| } |
| |
| public AliasClusteredClient(Client clusteredClient, Map<String, String> rebindings) { |
| super(clusteredClient, rebindings); |
| } |
| } |
| |
| /** |
| * Uses a {@link org.apache.tinkerpop.gremlin.driver.Client.ClusteredClient} that rebinds requests to a |
| * specified {@link Graph} or {@link TraversalSource} instances on the server-side. |
| * |
| * @deprecated As of release 3.1.1-incubating, replaced by {@link AliasClusteredClient}. |
| */ |
| @Deprecated |
| public static class ReboundClusteredClient extends Client { |
| private final Client client; |
| private final Map<String,String> aliases = new HashMap<>(); |
| final CompletableFuture<Void> close = new CompletableFuture<>(); |
| |
| ReboundClusteredClient(final Client client, final String graphOrTraversalSource) { |
| super(client.cluster); |
| this.client = client; |
| aliases.put("g", graphOrTraversalSource); |
| } |
| |
| ReboundClusteredClient(final Client client, final Map<String,String> rebindings) { |
| super(client.cluster); |
| this.client = client; |
| this.aliases.putAll(rebindings); |
| } |
| |
| @Override |
| public synchronized Client init() { |
| if (close.isDone()) throw new IllegalStateException("Client is closed"); |
| |
| // the underlying client may not have been init'd |
| client.init(); |
| |
| return this; |
| } |
| |
| @Override |
| public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) { |
| if (close.isDone()) throw new IllegalStateException("Client is closed"); |
| if (!aliases.isEmpty()) |
| builder.addArg(Tokens.ARGS_ALIASES, aliases); |
| |
| return client.buildMessage(builder); |
| } |
| |
| @Override |
| protected void initializeImplementation() { |
| // no init required |
| if (close.isDone()) throw new IllegalStateException("Client is closed"); |
| } |
| |
| /** |
| * Delegates to the underlying {@link org.apache.tinkerpop.gremlin.driver.Client.ClusteredClient}. |
| */ |
| @Override |
| protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException { |
| if (close.isDone()) throw new IllegalStateException("Client is closed"); |
| return client.chooseConnection(msg); |
| } |
| |
| /** |
| * Prevents messages from being sent from this {@code Client}. Note that calling this method does not call |
| * close on the {@code Client} that created it. |
| */ |
| @Override |
| public CompletableFuture<Void> closeAsync() { |
| close.complete(null); |
| return close; |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| @Deprecated |
| public Client rebind(final String graphOrTraversalSource) { |
| return alias(graphOrTraversalSource); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public Client alias(String graphOrTraversalSource) { |
| if (close.isDone()) throw new IllegalStateException("Client is closed"); |
| return new AliasClusteredClient(client, graphOrTraversalSource); |
| } |
| } |
| |
| /** |
| * A {@code Client} implementation that operates in the context of a session. Requests are sent to a single |
| * server, where each request is bound to the same thread with the same set of bindings across requests. |
| * Transaction are not automatically committed. It is up the client to issue commit/rollback commands. |
| */ |
| public final static class SessionedClient extends Client { |
| private final String sessionId; |
| private final boolean manageTransactions; |
| |
| private ConnectionPool connectionPool; |
| |
| SessionedClient(final Cluster cluster, final String sessionId, final boolean manageTransactions) { |
| super(cluster); |
| this.sessionId = sessionId; |
| this.manageTransactions = manageTransactions; |
| } |
| |
| String getSessionId() { |
| return sessionId; |
| } |
| |
| /** |
| * Adds the {@link Tokens#ARGS_SESSION} value to every {@link RequestMessage}. |
| */ |
| @Override |
| public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) { |
| builder.processor("session"); |
| builder.addArg(Tokens.ARGS_SESSION, sessionId); |
| builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions); |
| return builder; |
| } |
| |
| /** |
| * Since the session is bound to a single host, simply borrow a connection from that pool. |
| */ |
| @Override |
| protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException { |
| return connectionPool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS); |
| } |
| |
| /** |
| * Randomly choose an available {@link Host} to bind the session too and initialize the {@link ConnectionPool}. |
| */ |
| @Override |
| protected void initializeImplementation() { |
| // chooses an available host at random |
| final List<Host> hosts = cluster.allHosts() |
| .stream().filter(Host::isAvailable).collect(Collectors.toList()); |
| Collections.shuffle(hosts); |
| final Host host = hosts.get(0); |
| connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1)); |
| } |
| |
| /** |
| * Close the bound {@link ConnectionPool}. |
| */ |
| @Override |
| public CompletableFuture<Void> closeAsync() { |
| return connectionPool.closeAsync(); |
| } |
| } |
| } |