blob: 7b1cd8b62cd2ba085fd921874c817ced3173801f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.router.impl;
import org.apache.ignite.*;
import org.apache.ignite.internal.client.*;
import org.apache.ignite.internal.client.router.*;
import org.apache.ignite.internal.client.ssl.*;
import org.apache.ignite.internal.processors.rest.client.message.*;
import org.apache.ignite.internal.util.nio.*;
import org.apache.ignite.internal.util.nio.ssl.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lifecycle.*;
import org.apache.ignite.logger.java.*;
import org.jetbrains.annotations.*;
import javax.management.*;
import javax.net.ssl.*;
import java.lang.management.*;
import java.lang.reflect.*;
import java.net.*;
import java.nio.*;
import java.util.*;
/**
* Wrapper class for router process.
*/
public class GridTcpRouterImpl implements GridTcpRouter, GridTcpRouterMBean, LifecycleAware {
/** */
private static final String ENT_NIO_LSNR_CLS = "org.apache.ignite.client.router.impl.GridTcpRouterNioListenerEntImpl";
/** Id. */
private final UUID id = UUID.randomUUID();
/** Configuration. */
private final GridTcpRouterConfiguration cfg;
/** Logger. */
private final IgniteLogger log;
/** Server. */
private GridNioServer<GridClientMessage> srv;
/** Client. */
private GridRouterClientImpl client;
/** MBean name. */
private ObjectName mbeanName;
/** NIO parser. */
private volatile GridTcpRouterNioParser parser;
/** Host where server was actually bound. */
private volatile String bindHost;
/** Port where server was actually bound. */
private volatile int bindPort;
/**
* Creates new router instance.
*
* @param cfg Router configuration.
*/
public GridTcpRouterImpl(GridTcpRouterConfiguration cfg) {
this.cfg = cfg;
log = cfg.getLogger() != null ?
cfg.getLogger().getLogger(getClass()) : new JavaLogger().getLogger(getClass());
}
/**
* Starts router.
*
* @throws IgniteException If failed.
*/
@Override public void start() throws IgniteException {
try {
client = createClient(cfg);
}
catch (GridClientException e) {
throw new IgniteException("Failed to initialise embedded client.", e);
}
GridNioServerListener<GridClientMessage> lsnr;
try {
Class<?> cls = Class.forName(ENT_NIO_LSNR_CLS);
Constructor<?> cons = cls.getDeclaredConstructor(IgniteLogger.class, GridRouterClientImpl.class);
cons.setAccessible(true);
lsnr = (GridNioServerListener<GridClientMessage>)cons.newInstance(log, client);
}
catch (ClassNotFoundException ignored) {
lsnr = new GridTcpRouterNioListenerOsImpl(log, client);
}
catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
throw new IgniteException("Failed to create NIO listener.", e);
}
parser = new GridTcpRouterNioParser();
final InetAddress hostAddr;
try {
hostAddr = InetAddress.getByName(cfg.getHost());
}
catch (UnknownHostException e) {
throw new IgniteException("Failed to resolve grid address for configured host: " + cfg.getHost(), e);
}
SSLContext sslCtx;
try {
GridSslContextFactory sslCtxFactory = cfg.getSslContextFactory();
sslCtx = sslCtxFactory == null ? null : sslCtxFactory.createSslContext();
}
catch (SSLException e) {
throw new IgniteException("Failed to create SSL context.", e);
}
for (int port = cfg.getPort(), last = port + cfg.getPortRange(); port <= last; port++) {
if (startTcpServer(hostAddr, port, lsnr, parser, cfg.isNoDelay(), sslCtx, cfg.isSslClientAuth(),
cfg.isSslClientAuth())) {
if (log.isInfoEnabled())
log.info("TCP router successfully started for endpoint: " + hostAddr.getHostAddress() + ":" + port);
bindPort = port;
bindHost = hostAddr.getHostName();
break;
}
else
U.warn(log, "TCP REST router failed to start on endpoint: " + hostAddr.getHostAddress() + ":" + port +
". Will try next port within allowed port range.");
}
if (bindPort == 0)
throw new IgniteException("Failed to bind TCP router server (possibly all ports in range " +
"are in use) [firstPort=" + cfg.getPort() + ", lastPort=" + (cfg.getPort() + cfg.getPortRange()) +
", addr=" + hostAddr + ']');
try {
ObjectName objName = U.registerMBean(
ManagementFactory.getPlatformMBeanServer(),
"Router",
"TCP Router " + id,
getClass().getSimpleName(),
this,
GridTcpRouterMBean.class);
if (log.isDebugEnabled())
log.debug("Registered MBean: " + objName);
mbeanName = objName;
}
catch (JMException e) {
U.error(log, "Failed to register MBean.", e);
}
}
/**
* Stops this router.
*/
@Override public void stop() {
if (srv != null)
srv.stop();
if (client != null)
client.stop(true);
if (mbeanName != null)
try {
ManagementFactory.getPlatformMBeanServer().unregisterMBean(mbeanName);
if (log.isDebugEnabled())
log.debug("Unregistered MBean: " + mbeanName);
}
catch (JMException e) {
U.error(log, "Failed to unregister MBean.", e);
}
if (log.isInfoEnabled())
log.info("TCP router successfully stopped.");
}
/**
* Tries to start server with given parameters.
*
* @param hostAddr Host on which server should be bound.
* @param port Port on which server should be bound.
* @param lsnr Server message listener.
* @param parser Server message parser.
* @param tcpNoDelay Flag indicating whether TCP_NODELAY flag should be set for accepted connections.
* @param sslCtx SSL context in case if SSL is enabled.
* @param wantClientAuth Whether client will be requested for authentication.
* @param needClientAuth Whether client is required to be authenticated.
* @return {@code True} if server successfully started, {@code false} if port is used and
* server was unable to start.
*/
private boolean startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<GridClientMessage> lsnr,
GridNioParser parser, boolean tcpNoDelay, @Nullable SSLContext sslCtx, boolean wantClientAuth,
boolean needClientAuth) {
try {
GridNioFilter codec = new GridNioCodecFilter(parser, log, false);
// This name is required to be unique in order to avoid collisions with
// ThreadWorkerGroups running in the same JVM by other routers/nodes.
String gridName = "router-" + id;
GridNioFilter[] filters;
if (sslCtx != null) {
GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, false, ByteOrder.nativeOrder(), log);
sslFilter.wantClientAuth(wantClientAuth);
sslFilter.needClientAuth(needClientAuth);
filters = new GridNioFilter[] { codec, sslFilter };
}
else
filters = new GridNioFilter[] { codec };
srv = GridNioServer.<GridClientMessage>builder()
.address(hostAddr)
.port(port)
.listener(lsnr)
.logger(log)
.selectorCount(Runtime.getRuntime().availableProcessors())
.gridName(gridName)
.tcpNoDelay(tcpNoDelay)
.directBuffer(false)
.byteOrder(ByteOrder.nativeOrder())
.socketSendBufferSize(0)
.socketReceiveBufferSize(0)
.sendQueueLimit(0)
.filters(filters)
.idleTimeout(cfg.getIdleTimeout())
.build();
srv.start();
return true;
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to start TCP router protocol on port " + port + ": " + e.getMessage());
srv = null;
return false;
}
}
/**
* Creates a client for forwarding messages to the grid.
*
* @param routerCfg Router configuration.
* @return Client instance.
* @throws GridClientException If client creation failed.
*/
private GridRouterClientImpl createClient(GridTcpRouterConfiguration routerCfg) throws GridClientException {
UUID clientId = UUID.randomUUID();
return new GridRouterClientImpl(clientId, routerCfg);
}
/** {@inheritDoc} */
@Override public String getHost() {
return bindHost;
}
/** {@inheritDoc} */
@Override public int getPort() {
return bindPort;
}
/** {@inheritDoc} */
@Override public boolean isSslEnabled() {
return cfg.getSslContextFactory() != null;
}
/** {@inheritDoc} */
@Override public boolean isSslClientAuth() {
return cfg.isSslClientAuth();
}
/** {@inheritDoc} */
@Override public Collection<String> getServers() {
return cfg.getServers();
}
/** {@inheritDoc} */
@Override public UUID id() {
return id;
}
/** {@inheritDoc} */
@Override public GridTcpRouterConfiguration configuration() {
return cfg;
}
/** {@inheritDoc} */
@Override public long getReceivedCount() {
return parser != null ? parser.getReceivedCount() : 0;
}
/** {@inheritDoc} */
@Override public long getSendCount() {
return parser != null ? parser.getSendCount() : 0;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return id.hashCode();
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (o == this)
return true;
if (o == null || getClass() != o.getClass())
return false;
GridTcpRouterImpl that = (GridTcpRouterImpl)o;
return id.equals(that.id);
}
}