// blob: 213d5f1ecaf5c1683a7fc46d477977f16160951c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinRequestEncoder;
import org.apache.tinkerpop.gremlin.driver.handler.NioGremlinResponseDecoder;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder;
import org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinResponseDecoder;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import org.apache.tinkerpop.gremlin.driver.simple.WebSocketClient;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* Client-side channel initializer interface. It is responsible for constructing the Netty {@code ChannelPipeline}
* used by the client to connect and send messages to Gremlin Server.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public interface Channelizer extends ChannelHandler {
/**
 * Initializes the {@code Channelizer}. Called just after construction.
 *
 * @param connection the {@link Connection} this {@code Channelizer} is initialized with
 */
public void init(final Connection connection);
/**
 * Called on {@link Connection#close()} to perform any {@code Channelizer} specific functions. Note that the
 * {@link Connection} already calls {@code Channel.close()} so there is no need to call that method here.
 * An implementation will typically use this method to send a {@code Channelizer} specific message to the
 * server to notify it of a shutdown coming from the client side (e.g. a "close" websocket frame).
 *
 * @param channel the channel that is being closed
 */
public void close(final Channel channel);
/**
 * Create a message for the driver to use as a "keep-alive" for the connection. This method will only be used if
 * {@link #supportsKeepAlive()} is {@code true}.
 *
 * @return the keep-alive message; this default implementation returns {@code null}
 */
public default Object createKeepAliveMessage() {
return null;
}
/**
 * Determines if the channelizer supports a method for keeping the connection to the server alive.
 *
 * @return {@code true} if keep-alive is supported; this default implementation returns {@code false}
 */
public default boolean supportsKeepAlive() {
return false;
}
/**
 * Called after the channel connects. The {@code Channelizer} may need to perform some functions, such as a
 * handshake. This default implementation does nothing.
 */
public default void connected() {
}
/**
 * Skeleton for client side {@link Channelizer} implementations. It owns the parts of the pipeline that are
 * assembled the same way for every channelizer (optional SSL handler first, SASL and response handlers last)
 * and leaves the protocol specific handlers to {@link #configure(ChannelPipeline)}.
 */
abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> implements Channelizer {
    protected static final String PIPELINE_GREMLIN_SASL_HANDLER = "gremlin-sasl-handler";
    protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";

    protected Connection connection;
    protected Cluster cluster;
    private ConcurrentMap<UUID, ResultQueue> pending;

    /**
     * Reports whether an SSL handler should be placed at the head of the pipeline. This implementation reads
     * the {@code enableSsl} flag of the cluster's connection pool settings.
     */
    public boolean supportsSsl() {
        return cluster.connectionPoolSettings().enableSsl;
    }

    /**
     * Adds the protocol specific handlers to the pipeline. Invoked by {@link #initChannel(SocketChannel)} after
     * the optional SSL handler and before the SASL and response handlers are added.
     *
     * @param pipeline the pipeline of the channel being initialized
     */
    public abstract void configure(final ChannelPipeline pipeline);

    /**
     * Extension hook that does nothing in this base class.
     * NOTE(review): {@link #initChannel(SocketChannel)} does not invoke it - confirm where it is expected to be
     * called.
     *
     * @param pipeline the pipeline of the channel
     */
    public void finalize(final ChannelPipeline pipeline) {
        // intentionally empty
    }

    /**
     * No-op; subclasses override this when they need to notify the server of a client side close.
     */
    @Override
    public void close(final Channel channel) {
        // intentionally empty
    }

    /**
     * Captures the {@link Connection} along with its cluster and its map of pending results.
     */
    @Override
    public void init(final Connection connection) {
        this.connection = connection;
        this.cluster = connection.getCluster();
        this.pending = connection.getPending();
    }

    /**
     * Builds the pipeline for a new channel: SSL handler (only when {@link #supportsSsl()} is {@code true}),
     * then whatever {@link #configure(ChannelPipeline)} adds, then the SASL authentication handler and finally
     * the response handler.
     */
    @Override
    protected void initChannel(final SocketChannel socketChannel) throws Exception {
        final ChannelPipeline pipeline = socketChannel.pipeline();

        sslContextIfEnabled().ifPresent(ctx -> pipeline.addLast(
                ctx.newHandler(socketChannel.alloc(), connection.getUri().getHost(), connection.getUri().getPort())));

        configure(pipeline);

        pipeline.addLast(PIPELINE_GREMLIN_SASL_HANDLER, new Handler.GremlinSaslAuthenticationHandler(cluster.authProperties()));
        pipeline.addLast(PIPELINE_GREMLIN_HANDLER, new Handler.GremlinResponseHandler(pending));
    }

    /**
     * Creates the SSL context from the cluster when SSL is supported, otherwise yields an empty {@code Optional}.
     * Any failure while creating the context is rethrown wrapped in a {@code RuntimeException}.
     */
    private Optional<SslContext> sslContextIfEnabled() {
        if (!supportsSsl()) {
            return Optional.empty();
        }

        try {
            return Optional.of(cluster.createSSLContext());
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}
/**
 * WebSocket {@link Channelizer} implementation.
 */
public final class WebSocketChannelizer extends AbstractChannelizer {
    /**
     * Maximum time in milliseconds that {@link #connected()} blocks waiting for the websocket handshake.
     */
    private static final long HANDSHAKE_TIMEOUT_MILLIS = 15000;

    private WebSocketClientHandler handler;
    private WebSocketGremlinRequestEncoder webSocketGremlinRequestEncoder;
    private WebSocketGremlinResponseDecoder webSocketGremlinResponseDecoder;

    /**
     * Initializes the base state and creates the request encoder and response decoder from the cluster's
     * serializer. The same encoder/decoder instances are added to the pipeline by
     * {@link #configure(ChannelPipeline)}.
     */
    @Override
    public void init(final Connection connection) {
        super.init(connection);
        webSocketGremlinRequestEncoder = new WebSocketGremlinRequestEncoder(true, cluster.getSerializer());
        webSocketGremlinResponseDecoder = new WebSocketGremlinResponseDecoder(cluster.getSerializer());
    }

    /**
     * Keep-alive is supported through the ping/pong websocket protocol.
     *
     * @see <a href="https://tools.ietf.org/html/rfc6455#section-5.5.2">IETF RFC 6455</a>
     */
    @Override
    public boolean supportsKeepAlive() {
        return true;
    }

    /**
     * Creates a new {@code PingWebSocketFrame} to be used as the keep-alive message.
     */
    @Override
    public Object createKeepAliveMessage() {
        return new PingWebSocketFrame();
    }

    /**
     * Sends a {@code CloseWebSocketFrame} to the server for the specified channel if that channel is still open.
     */
    @Override
    public void close(final Channel channel) {
        if (channel.isOpen()) channel.writeAndFlush(new CloseWebSocketFrame());
    }

    /**
     * SSL is used if, and only if, the scheme of the connection URI is {@code wss} (compared case-insensitively).
     */
    @Override
    public boolean supportsSsl() {
        final String scheme = connection.getUri().getScheme();
        return "wss".equalsIgnoreCase(scheme);
    }

    /**
     * Adds the websocket handlers to the pipeline in this order: HTTP codec, HTTP aggregator, websocket client
     * handler, Gremlin request encoder, Gremlin response decoder. The {@code maxContentLength} connection pool
     * setting is passed to both the handshaker and the aggregator.
     *
     * @throws IllegalStateException if the scheme of the connection URI is neither {@code ws} nor {@code wss}
     */
    @Override
    public void configure(final ChannelPipeline pipeline) {
        final String scheme = connection.getUri().getScheme();
        if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme))
            throw new IllegalStateException("Unsupported scheme (only ws: or wss: supported): " + scheme);

        // a former "wss requires enableSsl" guard was removed from here: supportsSsl() in this class is defined
        // as the scheme being "wss", so the guard's condition could never be true

        final int maxContentLength = cluster.connectionPoolSettings().maxContentLength;
        handler = new WebSocketClientHandler(
                WebSocketClientHandshakerFactory.newHandshaker(
                        connection.getUri(), WebSocketVersion.V13, null, false, HttpHeaders.EMPTY_HEADERS, maxContentLength));

        pipeline.addLast("http-codec", new HttpClientCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(maxContentLength));
        pipeline.addLast("ws-handler", handler);
        pipeline.addLast("gremlin-encoder", webSocketGremlinRequestEncoder);
        pipeline.addLast("gremlin-decoder", webSocketGremlinResponseDecoder);
    }

    /**
     * Blocks until the websocket handshake completes or the handshake timeout elapses.
     *
     * @throws RuntimeException wrapping a {@link ConnectionException} if the handshake fails, times out or the
     * waiting thread is interrupted; in the interrupted case the thread's interrupt status is restored before
     * throwing
     */
    @Override
    public void connected() {
        try {
            // block for a few seconds - if the handshake takes longer then there's gotta be issues with that
            // server. more than likely, SSL is enabled on the server, but the client forgot to enable it or
            // perhaps the server is not configured for websockets.
            handler.handshakeFuture().get(HANDSHAKE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (Exception ex) {
            // the wrapping below hides the InterruptedException from callers, so put the interrupt flag back
            // to let code further up the stack observe that the thread was interrupted
            if (ex instanceof InterruptedException) Thread.currentThread().interrupt();

            throw new RuntimeException(new ConnectionException(connection.getUri(),
                    "Could not complete websocket handshake - ensure that client protocol matches server", ex));
        }
    }
}
/**
 * {@link Channelizer} implementation for plain NIO that adds only a Gremlin response decoder and a Gremlin
 * request encoder to the pipeline. Initialization is inherited unchanged from {@link AbstractChannelizer}.
 *
 * @deprecated As of release 3.3.10, not replaced, use {@link WebSocketClient}.
 */
@Deprecated
public final class NioChannelizer extends AbstractChannelizer {
    /**
     * Adds the response decoder followed by the request encoder, each built from the cluster's serializer.
     */
    @Override
    public void configure(final ChannelPipeline pipeline) {
        final NioGremlinResponseDecoder responseDecoder = new NioGremlinResponseDecoder(cluster.getSerializer());
        pipeline.addLast("gremlin-decoder", responseDecoder);

        final NioGremlinRequestEncoder requestEncoder = new NioGremlinRequestEncoder(true, cluster.getSerializer());
        pipeline.addLast("gremlin-encoder", requestEncoder);
    }
}
}