blob: b41b531d822fdeea6e2c9d022850f51e7309c22a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.impl;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.client.GridClient;
import org.apache.ignite.internal.client.GridClientBeforeNodeStart;
import org.apache.ignite.internal.client.GridClientCacheMode;
import org.apache.ignite.internal.client.GridClientClosedException;
import org.apache.ignite.internal.client.GridClientClusterState;
import org.apache.ignite.internal.client.GridClientCompute;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.client.GridClientData;
import org.apache.ignite.internal.client.GridClientDataAffinity;
import org.apache.ignite.internal.client.GridClientDataConfiguration;
import org.apache.ignite.internal.client.GridClientDisconnectedException;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.client.GridClientFactory;
import org.apache.ignite.internal.client.GridClientNode;
import org.apache.ignite.internal.client.GridClientNodeStateBeforeStart;
import org.apache.ignite.internal.client.GridClientPartitionAffinity;
import org.apache.ignite.internal.client.GridClientPredicate;
import org.apache.ignite.internal.client.GridClientTopologyListener;
import org.apache.ignite.internal.client.GridServerUnreachableException;
import org.apache.ignite.internal.client.balancer.GridClientLoadBalancer;
import org.apache.ignite.internal.client.balancer.GridClientRandomBalancer;
import org.apache.ignite.internal.client.impl.connection.GridClientConnection;
import org.apache.ignite.internal.client.impl.connection.GridClientConnectionManager;
import org.apache.ignite.internal.client.impl.connection.GridClientConnectionManagerOsImpl;
import org.apache.ignite.internal.client.impl.connection.GridClientTopology;
import org.apache.ignite.internal.client.ssl.GridSslContextFactory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.CycleThread;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
/**
* Client implementation.
*/
public class GridClientImpl implements GridClient, GridClientBeforeNodeStart {
/**
 * Placeholder key used in {@link #dataMap} for the {@code null} cache name
 * ({@link ConcurrentHashMap} does not accept {@code null} keys). See {@link #maskNull(String)}.
 */
private static final Object NULL_MASK = new Object();
/** {@code java.util.logging} logger named after this class. */
private static final Logger log = Logger.getLogger(GridClientImpl.class.getName());
/*
 * Turns logging of this class off unless the IGNITE_GRID_CLIENT_LOG_ENABLED
 * system property is enabled (the default passed here is false).
 */
static {
if (!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_GRID_CLIENT_LOG_ENABLED, false))
log.setLevel(Level.OFF);
}
/** Client ID, as passed to the constructor. */
private final UUID id;
/** Client configuration, built in the constructor from the configuration passed by the caller. */
protected final GridClientConfiguration cfg;
/** SSL context if ssl enabled, {@code null} when no SSL context factory is configured. */
private final SSLContext sslCtx;
/** Main compute projection, {@code null} in "before node start" mode. */
@Nullable private final GridClientComputeImpl compute;
/** Cluster state projection, {@code null} in "before node start" mode. */
@Nullable private final GridClientClusterStateImpl clusterState;
/** Data projections, keyed by cache name ({@link #NULL_MASK} stands in for the {@code null} name). */
private final ConcurrentMap<Object, GridClientDataImpl> dataMap = new ConcurrentHashMap<>();
/** Topology. */
protected final GridClientTopology top;
/** Topology updater thread, {@code null} in "before node start" mode. */
@Nullable private final Thread topUpdateThread;
/** Closed flag, flipped exactly once by {@link #stop(boolean)}. */
private final AtomicBoolean closed = new AtomicBoolean();
/** Connection manager. */
protected final GridClientConnectionManager connMgr;
/** Routers, parsed from the configured router address strings. */
private final Collection<InetSocketAddress> routers;
/** Servers, parsed from the configured server address strings. */
private final Collection<InetSocketAddress> srvs;
/**
 * Projection of node state before its start. Non-{@code null} only in "before node start" mode,
 * which is why {@link #checkBeforeNodeStartMode()} uses it as the mode flag.
 */
@Nullable private final GridClientNodeStateBeforeStart beforeStartState;
/**
 * Creates a new client based on a given configuration.
 * <p/>
 * If {@code beforeNodeStart == true}, topology will not be received/updated,
 * and there will also be errors when trying to work with topology, compute, state and cache.
 * <p/>
 * If construction fails part-way, {@link #stop(boolean) stop(false)} is invoked before the exception
 * propagates. A {@link GridClientException} thrown by the initial connection attempt does not fail
 * construction: it is recorded on the topology and a warning is logged.
 *
 * @param id Client identifier.
 * @param cfg0 Client configuration.
 * @param routerClient Router client flag.
 * @param beforeNodeStart Connecting to a node before it starts.
 * @throws GridClientException If client configuration is incorrect or the startup was interrupted.
 * @throws GridServerUnreachableException If none of the servers specified in configuration can be reached.
 */
@SuppressWarnings("CallToThreadStartDuringObjectConstruction")
public GridClientImpl(
UUID id,
GridClientConfiguration cfg0,
boolean routerClient,
boolean beforeNodeStart
) throws GridClientException {
this.id = id;
// Work on a configuration object of our own (presumably a copy of cfg0 - the copy constructor is not visible here).
cfg = new GridClientConfiguration(cfg0);
// Stays false if anything below throws; the finally block then releases whatever was already created.
boolean success = false;
try {
top = new GridClientTopology(cfg);
if (!beforeNodeStart) {
for (GridClientDataConfiguration dataCfg : cfg.getDataConfigurations()) {
GridClientDataAffinity aff = dataCfg.getAffinity();
// beforeStartState has not been assigned yet and still holds its default null,
// so the mode check inside addTopologyListener passes here.
if (aff instanceof GridClientTopologyListener)
addTopologyListener((GridClientTopologyListener)aff);
}
}
if (!beforeNodeStart && cfg.getBalancer() instanceof GridClientTopologyListener)
top.addTopologyListener((GridClientTopologyListener)cfg.getBalancer());
GridSslContextFactory factory = cfg.getSslContextFactory();
if (factory != null) {
try {
sslCtx = factory.createSslContext();
}
catch (SSLException e) {
throw new GridClientException("Failed to create client (unable to create SSL context, " +
"check ssl context factory configuration): " + e.getMessage(), e);
}
}
else
sslCtx = null;
if (cfg.isAutoFetchMetrics() && !cfg.isEnableMetricsCache())
log.warning("Auto-fetch for metrics is enabled without enabling caching for them.");
if (cfg.isAutoFetchAttributes() && !cfg.isEnableAttributesCache())
log.warning(
"Auto-fetch for node attributes is enabled without enabling caching for them.");
srvs = parseAddresses(cfg.getServers());
routers = parseAddresses(cfg.getRouters());
// Exactly one of the two address lists must be configured.
if (srvs.isEmpty() && routers.isEmpty())
throw new GridClientException("Servers addresses and routers addresses cannot both be empty " +
"for client (please fix configuration and restart): " + this);
if (!srvs.isEmpty() && !routers.isEmpty())
throw new GridClientException("Servers addresses and routers addresses cannot both be provided " +
"for client (please fix configuration and restart): " + this);
connMgr = createConnectionManager(id, sslCtx, cfg, routers, top, null, routerClient, beforeNodeStart);
try {
// Init connection manager.
tryInit();
}
catch (GridClientException e) {
// A failed first connection is not fatal: the error is stored on the topology.
// NOTE(review): the background retry is done by TopologyUpdaterThread, which is only
// started below when beforeNodeStart is false.
top.fail(e);
log.warning("Failed to initialize topology on client start. Will retry in background.");
}
catch (InterruptedException e) {
// Restore the interrupt flag before converting to the checked client exception.
Thread.currentThread().interrupt();
throw new GridClientException("Client startup was interrupted.", e);
}
if (!beforeNodeStart) {
beforeStartState = null;
topUpdateThread = new TopologyUpdaterThread();
topUpdateThread.setDaemon(true);
topUpdateThread.start();
compute = new GridClientComputeImpl(this, null, null, cfg.getBalancer());
clusterState = new GridClientClusterStateImpl(this, null, null, cfg.getBalancer());
}
else {
// "Before node start" mode: no topology refresh and no compute/state projections.
topUpdateThread = null;
compute = null;
clusterState = null;
beforeStartState = new GridClientNodeStateBeforeStartImpl(this);
}
if (log.isLoggable(Level.INFO))
log.info("Client started [id=" + id + ", protocol=" + cfg.getProtocol() + ']');
success = true;
}
finally {
// Construction failed: shut down without waiting for pending requests.
if (!success)
stop(false);
}
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the identifier that was passed to the constructor.
 */
@Override public UUID id() {
return id;
}
/**
 * Closes client.
 * <p>
 * Only the first invocation has any effect; subsequent calls return immediately. The method is also
 * called from the constructor when construction fails, so every resource is checked for {@code null}
 * before it is released.
 *
 * @param waitCompletion If {@code true} will wait for all pending requests to be proceeded.
 */
public void stop(boolean waitCompletion) {
    if (!closed.compareAndSet(false, true))
        return;

    // Shutdown the topology refresh thread.
    if (topUpdateThread != null)
        topUpdateThread.interrupt();

    // Shutdown listener notification.
    if (top != null)
        top.shutdown();

    if (connMgr != null)
        connMgr.stop(waitCompletion);

    // Affinity listeners are registered by the constructor only outside "before node start" mode, and
    // removeTopologyListener() throws in that mode. The topology may also be missing if construction
    // failed before it was created. In both cases there is nothing to unregister, and attempting it
    // would make stop() fail (or mask the original construction error).
    if (top != null && beforeStartState == null) {
        for (GridClientDataConfiguration dataCfg : cfg.getDataConfigurations()) {
            GridClientDataAffinity aff = dataCfg.getAffinity();

            if (aff instanceof GridClientTopologyListener)
                removeTopologyListener((GridClientTopologyListener)aff);
        }
    }

    if (log.isLoggable(Level.INFO))
        log.info("Client stopped [id=" + id + ", waitCompletion=" + waitCompletion + ']');
}
/**
 * {@inheritDoc}
 * <p>
 * Projections are created lazily, one per cache name, and cached in {@link #dataMap}.
 */
@Override public GridClientData data(@Nullable final String cacheName) throws GridClientException {
    checkClosed();
    checkBeforeNodeStartMode();

    Object mapKey = maskNull(cacheName);

    GridClientDataImpl cached = dataMap.get(mapKey);

    if (cached != null)
        return cached;

    GridClientDataConfiguration dataCfg = cfg.getDataConfiguration(cacheName);

    // Only the default (null-named) cache may be used without an explicit data configuration.
    if (cacheName != null && dataCfg == null)
        throw new GridClientException("Data configuration for given cache name was not provided: " +
            cacheName);

    GridClientLoadBalancer balancer;

    if (dataCfg == null)
        balancer = new GridClientRandomBalancer();
    else
        balancer = dataCfg.getPinnedBalancer();

    // Restricts the projection to nodes that report the requested cache.
    GridClientPredicate<GridClientNode> hasCache = new GridClientPredicate<GridClientNode>() {
        @Override public boolean apply(GridClientNode e) {
            return e.caches().containsKey(cacheName);
        }

        @Override public String toString() {
            return "GridClientHasCacheFilter [cacheName=" + cacheName + "]";
        }
    };

    GridClientDataImpl created = new GridClientDataImpl(
        cacheName, this, null, hasCache, balancer, null, cfg.isEnableMetricsCache());

    // If another thread registered a projection first, hand out that one so all callers share an instance.
    GridClientDataImpl winner = dataMap.putIfAbsent(mapKey, created);

    return winner != null ? winner : created;
}
/**
 * {@inheritDoc}
 *
 * @throws IgniteException If this client works in "before node start" mode.
 */
@Override public GridClientCompute compute() {
// The compute projection is not created in "before node start" mode, so fail instead of returning null.
checkBeforeNodeStartMode();
return compute;
}
/**
 * {@inheritDoc}
 *
 * @throws IgniteException If this client works in "before node start" mode.
 */
@Override public GridClientClusterState state() {
// The cluster state projection is not created in "before node start" mode, so fail instead of returning null.
checkBeforeNodeStartMode();
return clusterState;
}
/**
 * {@inheritDoc}
 *
 * @throws IgniteException If this client works in "before node start" mode.
 */
@Override public void addTopologyListener(GridClientTopologyListener lsnr) {
checkBeforeNodeStartMode();
// Registration is delegated to the topology object.
top.addTopologyListener(lsnr);
}
/**
 * {@inheritDoc}
 *
 * @throws IgniteException If this client works in "before node start" mode.
 */
@Override public void removeTopologyListener(GridClientTopologyListener lsnr) {
checkBeforeNodeStartMode();
// Removal is delegated to the topology object.
top.removeTopologyListener(lsnr);
}
/**
 * {@inheritDoc}
 *
 * @throws IgniteException If this client works in "before node start" mode.
 */
@Override public Collection<GridClientTopologyListener> topologyListeners() {
checkBeforeNodeStartMode();
// Returned as provided by the topology object.
return top.topologyListeners();
}
/**
 * {@inheritDoc}
 * <p>
 * The client is reported as connected while the topology is not marked as failed;
 * the closed flag is not consulted here.
 */
@Override public boolean connected() {
return !top.failed();
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to {@link GridClientFactory#stop(UUID)} with this client's identifier.
 */
@Override public void close() {
// NOTE(review): presumably the factory ends up calling stop(boolean) on this instance - confirm in GridClientFactory.
GridClientFactory.stop(id);
}
/**
 * Gets topology instance.
 *
 * @return Topology instance.
 * @throws IgniteException If this client works in "before node start" mode.
 */
public GridClientTopology topology() {
checkBeforeNodeStartMode();
return top;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns whatever the topology reports as its last error.
 */
@Override public GridClientException checkLastError() {
return top.lastError();
}
/**
 * {@inheritDoc}
 * <p>
 * Returns {@code null} unless the client was created in "before node start" mode.
 */
@Override @Nullable public GridClientNodeStateBeforeStart beforeStartState() {
return beforeStartState;
}
/**
 * Gets the connection manager created by the constructor.
 *
 * @return Connection manager.
 */
public GridClientConnectionManager connectionManager() {
return connMgr;
}
/**
 * Gets data affinity for a given cache name.
 *
 * @param cacheName Name of cache for which affinity is obtained.
 * @return Affinity taken from the data configuration of the given cache, or {@code null} if no data
 *      configuration is registered under that name.
 */
GridClientDataAffinity affinity(String cacheName) {
    GridClientDataConfiguration dataCfg = cfg.getDataConfiguration(cacheName);

    if (dataCfg == null)
        return null;

    return dataCfg.getAffinity();
}
/**
 * Verifies that the client is still open.
 *
 * @throws GridClientClosedException If the closed flag has already been set.
 */
private void checkClosed() throws GridClientClosedException {
    boolean alreadyClosed = closed.get();

    if (!alreadyClosed)
        return;

    throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore).");
}
/**
 * Rejects the call when the client works in "before node start" mode, i.e. when the
 * before-start state projection is present.
 *
 * @throws IgniteException If mode is "before node start".
 */
private void checkBeforeNodeStartMode() throws IgniteException {
    if (beforeStartState == null)
        return;

    throw new IgniteException("It is possible to work with a node only before it starts.");
}
/**
 * Converts a cache name into a key usable in the data projection map.
 *
 * @param cacheName Name to be masked.
 * @return The name itself, or the shared null-mask object if the name is {@code null}.
 */
private Object maskNull(String cacheName) {
    if (cacheName != null)
        return cacheName;

    return NULL_MASK;
}
/**
 * Converts {@code host:port} strings into socket addresses.
 *
 * @param cfgAddrs Collection of string representations of addresses.
 * @return Unmodifiable collection of {@code InetSocketAddress}es, in the iteration order of the input.
 * @throws GridClientException If any string cannot be parsed into a host and a valid port.
 */
private static Collection<InetSocketAddress> parseAddresses(Collection<String> cfgAddrs)
    throws GridClientException {
    List<InetSocketAddress> parsed = new ArrayList<>(cfgAddrs.size());

    for (String addrStr : cfgAddrs) {
        InetSocketAddress sockAddr;

        try {
            String[] hostAndPort = addrStr.split(":");

            String host = hostAndPort[0];
            int port = Integer.parseInt(hostAndPort[1]);

            sockAddr = new InetSocketAddress(host, port);
        }
        catch (RuntimeException e) {
            // Covers a missing port, a non-numeric or out-of-range port and a null entry alike.
            throw new GridClientException("Failed to create client (invalid server address specified): " +
                addrStr, e);
        }

        parsed.add(sockAddr);
    }

    return Collections.unmodifiableCollection(parsed);
}
/**
 * Creates one more connection manager that shares this client's ID, SSL context, configuration,
 * routers and topology.
 *
 * @param marshId Marshaller ID handed over to the new manager, may be {@code null}.
 * @param routerClient Router client flag.
 * @return New connection manager based on current client settings.
 * @throws GridClientException If failed to start connection server.
 */
public GridClientConnectionManager newConnectionManager(
    @Nullable Byte marshId,
    boolean routerClient
) throws GridClientException {
    // The mode is derived from the presence of the before-start state projection.
    boolean beforeNodeStart = beforeStartState != null;

    return createConnectionManager(id, sslCtx, cfg, routers, top, marshId, routerClient, beforeNodeStart);
}
/**
 * Instantiates the connection manager implementation used by this client.
 *
 * @param clientId Client ID.
 * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one.
 * @param cfg Client configuration.
 * @param routers Routers or empty collection to use endpoints from topology info.
 * @param top Topology.
 * @param marshId Marshaller ID, may be {@code null}.
 * @param routerClient Router client flag.
 * @param beforeNodeStart Connecting to a node before starting it without getting/updating topology.
 * @return Newly created connection manager.
 * @throws GridClientException In case of error.
 */
private GridClientConnectionManager createConnectionManager(UUID clientId, SSLContext sslCtx,
    GridClientConfiguration cfg, Collection<InetSocketAddress> routers, GridClientTopology top,
    @Nullable Byte marshId, boolean routerClient, boolean beforeNodeStart) throws GridClientException {
    GridClientConnectionManager mgr = new GridClientConnectionManagerOsImpl(
        clientId, sslCtx, cfg, routers, top, marshId, routerClient, beforeNodeStart);

    return mgr;
}
/**
 * Initialises the connection manager with the current set of connection addresses and then warns about
 * every cache whose configured affinity is {@link GridClientPartitionAffinity} although the nodes report
 * a cache mode other than {@code PARTITIONED} for it.
 *
 * @throws GridClientException If initialisation failed.
 * @throws InterruptedException If initialisation was interrupted.
 */
private void tryInit() throws GridClientException, InterruptedException {
    Collection<InetSocketAddress> initAddrs = addresses();

    connMgr.init(initAddrs);

    // Merge the cache name -> mode maps reported by all known nodes.
    Map<String, GridClientCacheMode> modesByCache = new HashMap<>();

    for (GridClientNodeImpl n : top.nodes())
        modesByCache.putAll(n.caches());

    for (Map.Entry<String, GridClientCacheMode> e : modesByCache.entrySet()) {
        String cacheName = e.getKey();
        GridClientCacheMode mode = e.getValue();

        GridClientDataAffinity aff = affinity(cacheName);

        if (!(aff instanceof GridClientPartitionAffinity) || mode == GridClientCacheMode.PARTITIONED)
            continue;

        log.warning(GridClientPartitionAffinity.class.getSimpleName() + " is used for a cache configured " +
            "for non-partitioned mode [cacheName=" + e.getKey() + ", cacheMode=" + e.getValue() + ']');
    }
}
/**
 * Obtains a client connection to one of the current connection addresses, without relying on
 * topology projections.
 *
 * @return Client connection.
 * @throws GridClientException If failed.
 * @throws InterruptedException If the calling thread was interrupted while obtaining the connection.
 */
public GridClientConnection connection() throws GridClientException, InterruptedException {
    GridClientConnectionManager mgr = connectionManager();

    return mgr.connection(addresses());
}
/**
 * Return addresses for connection.
 * <p>
 * When routers are configured, the router addresses are returned as is. Otherwise the result starts
 * with the configured server addresses, followed by the resolved endpoints of every node in the last
 * known topology snapshot (duplicates are dropped, insertion order is kept). For nodes that do not
 * appear to run on the local host, loopback endpoints are skipped.
 *
 * @return Addresses for connection.
 * @throws GridClientException If failed.
 */
private Collection<InetSocketAddress> addresses() throws GridClientException {
    // Routers, when configured, are the only endpoints to connect to.
    if (!routers.isEmpty())
        return routers;

    Collection<InetSocketAddress> connSrvs = new LinkedHashSet<>(srvs);

    // Add REST endpoints for all nodes from previous topology snapshot.
    try {
        for (GridClientNodeImpl node : top.nodes()) {
            Collection<InetSocketAddress> endpoints = node.availableAddresses(cfg.getProtocol(), true);

            List<InetSocketAddress> resolvedEndpoints = new ArrayList<>(endpoints.size());

            for (InetSocketAddress endpoint : endpoints) {
                if (!endpoint.isUnresolved())
                    resolvedEndpoints.add(endpoint);
            }

            // The MACs attribute is looked up only when the node has attributes at all. A node whose
            // attributes lack it is handled like a node without attributes (host unknown, so assume it
            // is local) instead of failing with an unchecked NullPointerException.
            Object macs = node.attributes().isEmpty() ? null : node.attribute(ATTR_MACS);

            boolean sameHost = macs == null ||
                F.containsAny(U.allLocalMACs(), macs.toString().split(", "));

            if (sameHost) {
                Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true));

                connSrvs.addAll(resolvedEndpoints);
            }
            else {
                // A loopback address of a remote node would point at this machine, not at the node.
                for (InetSocketAddress endpoint : resolvedEndpoints) {
                    if (!endpoint.getAddress().isLoopbackAddress())
                        connSrvs.add(endpoint);
                }
            }
        }
    }
    catch (GridClientDisconnectedException ignored) {
        // Ignore if latest topology update failed.
    }

    return connSrvs;
}
/**
 * Background thread that re-runs client initialisation with the topology refresh frequency taken
 * from the configuration.
 */
private class TopologyUpdaterThread extends CycleThread {
    /**
     * Creates topology refresh thread named after the client ID.
     */
    private TopologyUpdaterThread() {
        super(id + "-topology-update", cfg.getTopologyRefreshFrequency());
    }

    /** {@inheritDoc} */
    @Override public void iteration() throws InterruptedException {
        try {
            tryInit();
        }
        catch (GridClientException refreshErr) {
            // Record the failure on the topology; the thread itself keeps running.
            top.fail(refreshErr);

            if (!log.isLoggable(Level.FINE))
                return;

            log.fine("Failed to update topology: " + refreshErr.getMessage());
        }
    }
}
/**
 * {@inheritDoc}
 * <p>
 * Includes the client ID and the current value of the closed flag.
 */
@Override public String toString() {
    StringBuilder sb = new StringBuilder("GridClientImpl [id=");

    sb.append(id).append(", closed=").append(closed).append(']');

    return sb.toString();
}
}