blob: a2a975499b31eda572e1cc82bb26237a78c76472 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.client.GridClientCacheFlag;
import org.apache.ignite.internal.client.GridClientClosedException;
import org.apache.ignite.internal.client.GridClientData;
import org.apache.ignite.internal.client.GridClientDataAffinity;
import org.apache.ignite.internal.client.GridClientDataMetrics;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.client.GridClientFuture;
import org.apache.ignite.internal.client.GridClientFutureListener;
import org.apache.ignite.internal.client.GridClientNode;
import org.apache.ignite.internal.client.GridClientPredicate;
import org.apache.ignite.internal.client.balancer.GridClientLoadBalancer;
import org.apache.ignite.internal.client.impl.connection.GridClientConnection;
import org.apache.ignite.internal.client.impl.connection.GridClientConnectionResetException;
import org.apache.ignite.internal.client.util.GridClientUtils;
import org.apache.ignite.internal.util.typedef.internal.A;
/**
* Data projection that serves one cache instance and handles communication errors.
*/
public class GridClientDataImpl extends GridClientAbstractProjection<GridClientDataImpl> implements GridClientData {
/** Cache metrics. */
private final boolean cacheMetrics;
/** Cache name. */
private String cacheName;
/** Client data metrics. */
private volatile GridClientDataMetrics metrics;
/** Cache flags to be enabled. */
private final Set<GridClientCacheFlag> flags;
/**
* Creates a data projection.
*
* @param cacheName Cache name for projection. If {@code null}, then default cache will be used.
* @param client Client instance to resolve connection failures.
* @param nodes Pinned nodes. If {@code null}, then no nodes will be pinned.
* @param filter Node filter. If {@code null}, then no filter would be applied to the node list.
* @param balancer Pinned node balancer. If {@code null}, then no balancer will be used.
* @param flags Cache flags to be enabled. If {@code null}, then no flags will be used.
* @param cacheMetrics Whether to cache received metrics.
*/
GridClientDataImpl(String cacheName, GridClientImpl client, Collection<GridClientNode> nodes,
GridClientPredicate<? super GridClientNode> filter, GridClientLoadBalancer balancer,
Set<GridClientCacheFlag> flags, boolean cacheMetrics) {
super(client, nodes, filter, balancer);
this.cacheName = cacheName;
this.cacheMetrics = cacheMetrics;
this.flags = flags == null ? Collections.<GridClientCacheFlag>emptySet() : Collections.unmodifiableSet(flags);
}
/** {@inheritDoc} */
@Override public String cacheName() {
return cacheName;
}
/** {@inheritDoc} */
@Override public GridClientData pinNodes(GridClientNode node, GridClientNode... nodes) throws GridClientException {
Collection<GridClientNode> pinnedNodes = new ArrayList<>(nodes != null ? nodes.length + 1 : 1);
if (node != null)
pinnedNodes.add(node);
if (nodes != null && nodes.length != 0)
pinnedNodes.addAll(Arrays.asList(nodes));
return createProjection(pinnedNodes.isEmpty() ? null : pinnedNodes,
null, null, new GridClientDataFactory(flags));
}
/** {@inheritDoc} */
@Override public Collection<GridClientNode> pinnedNodes() {
return nodes;
}
/** {@inheritDoc} */
@Override public <K, V> boolean put(K key, V val) throws GridClientException {
return putAsync(key, val).get();
}
/** {@inheritDoc} */
@Override public <K, V> GridClientFuture<Boolean> putAsync(final K key, final V val) {
A.notNull(key, "key");
A.notNull(val, "val");
return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
@Override public GridClientFuture<Boolean> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cachePut(cacheName, key, val, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public <K, V> void putAll(Map<K, V> entries) throws GridClientException {
putAllAsync(entries).get();
}
/** {@inheritDoc} */
@Override public <K, V> GridClientFuture<?> putAllAsync(final Map<K, V> entries) {
A.notNull(entries, "entries");
if (entries.isEmpty())
return new GridClientFutureAdapter<>(false);
K key = GridClientUtils.first(entries.keySet());
return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
@Override public GridClientFuture<Boolean> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cachePutAll(cacheName, entries, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public <K, V> V get(K key) throws GridClientException {
return this.<K, V>getAsync(key).get();
}
/** {@inheritDoc} */
@Override public <K, V> GridClientFuture<V> getAsync(final K key) {
A.notNull(key, "key");
return withReconnectHandling(new ClientProjectionClosure<V>() {
@Override public GridClientFuture<V> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cacheGet(cacheName, key, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public <K, V> Map<K, V> getAll(Collection<K> keys) throws GridClientException {
return this.<K, V>getAllAsync(keys).get();
}
/** {@inheritDoc} */
@Override public <K, V> GridClientFuture<Map<K, V>> getAllAsync(final Collection<K> keys) {
A.notNull(keys, "keys");
if (keys.isEmpty())
return new GridClientFutureAdapter<>(Collections.<K, V>emptyMap());
K key = GridClientUtils.first(keys);
return withReconnectHandling(new ClientProjectionClosure<Map<K, V>>() {
@Override public GridClientFuture<Map<K, V>> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cacheGetAll(cacheName, keys, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public <K> boolean remove(K key) throws GridClientException {
return removeAsync(key).get();
}
/** {@inheritDoc} */
@Override public <K> GridClientFuture<Boolean> removeAsync(final K key) {
A.notNull(key, "key");
return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
@Override public GridClientFuture<Boolean> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cacheRemove(cacheName, key, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public <K> void removeAll(Collection<K> keys) throws GridClientException {
removeAllAsync(keys).get();
}
/** {@inheritDoc} */
@Override public <K> GridClientFuture<?> removeAllAsync(final Collection<K> keys) {
A.notNull(keys, "keys");
if (keys.isEmpty())
return new GridClientFutureAdapter<>(false);
K key = GridClientUtils.first(keys);
return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
@Override public GridClientFuture<Boolean> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cacheRemoveAll(cacheName, keys, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public <K, V> boolean replace(K key, V val) throws GridClientException {
return replaceAsync(key, val).get();
}
/** {@inheritDoc} */
@Override public <K, V> GridClientFuture<Boolean> replaceAsync(final K key, final V val) {
A.notNull(key, "key");
A.notNull(val, "val");
return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
@Override public GridClientFuture<Boolean> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cacheReplace(cacheName, key, val, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public <K, V> boolean cas(K key, V val1, V val2) throws GridClientException {
return casAsync(key, val1, val2).get();
}
/** {@inheritDoc} */
@Override public <K, V> GridClientFuture<Boolean> casAsync(final K key, final V val1, final V val2) {
A.notNull(key, "key");
return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
@Override public GridClientFuture<Boolean> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cacheCompareAndSet(cacheName, key, val1, val2, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <K> UUID affinity(K key) throws GridClientException {
A.notNull(key, "key");
GridClientDataAffinity affinity = client.affinity(cacheName);
if (affinity == null)
return null;
Collection<? extends GridClientNode> prj = projectionNodes();
if (prj.isEmpty())
throw new GridClientException("Failed to get affinity node (projection node set for cache is empty): " +
cacheName());
GridClientNode node = affinity.node(key, prj);
assert node != null;
return node.nodeId();
}
/** {@inheritDoc} */
@Override public GridClientDataMetrics metrics() throws GridClientException {
return metricsAsync().get();
}
/** {@inheritDoc} */
@Override public GridClientFuture<GridClientDataMetrics> metricsAsync() {
GridClientFuture<GridClientDataMetrics> fut = withReconnectHandling(
new ClientProjectionClosure<GridClientDataMetrics>() {
@Override public GridClientFuture<GridClientDataMetrics> apply(
GridClientConnection conn, UUID affinityNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cacheMetrics(cacheName, affinityNodeId);
}
});
if (cacheMetrics)
fut.listen(new GridClientFutureListener<GridClientDataMetrics>() {
@Override public void onDone(GridClientFuture<GridClientDataMetrics> fut) {
try {
metrics = fut.get();
}
catch (GridClientException ignored) {
// It's just a cache, so ignore failures.
}
}
});
return fut;
}
/** {@inheritDoc} */
@Override public GridClientDataMetrics cachedMetrics() throws GridClientException {
return metrics;
}
/** {@inheritDoc} */
@Override public <K, V> boolean append(K key, V val) throws GridClientException {
return appendAsync(key, val).get();
}
/** {@inheritDoc} */
@Override public <K, V> GridClientFuture<Boolean> appendAsync(final K key, final V val) throws GridClientException {
A.notNull(key, "key");
A.notNull(val, "val");
return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
@Override public GridClientFuture<Boolean> apply(GridClientConnection conn, UUID destNodeId)
throws GridClientConnectionResetException, GridClientClosedException {
return conn.cacheAppend(cacheName, key, val, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public <K, V> boolean prepend(K key, V val) throws GridClientException {
return prependAsync(key, val).get();
}
/** {@inheritDoc} */
@Override public <K, V> GridClientFuture<Boolean> prependAsync(final K key, final V val)
throws GridClientException {
A.notNull(key, "key");
A.notNull(val, "val");
return withReconnectHandling(new ClientProjectionClosure<Boolean>() {
@Override public GridClientFuture<Boolean> apply(GridClientConnection conn,
UUID destNodeId) throws GridClientConnectionResetException, GridClientClosedException {
return conn.cachePrepend(cacheName, key, val, flags, destNodeId);
}
}, cacheName, key);
}
/** {@inheritDoc} */
@Override public Set<GridClientCacheFlag> flags() {
return flags;
}
/** {@inheritDoc} */
@Override public GridClientData flagsOn(GridClientCacheFlag... flags) throws GridClientException {
if (flags == null || flags.length == 0)
return this;
EnumSet<GridClientCacheFlag> flagSet = this.flags == null || this.flags.isEmpty() ?
EnumSet.noneOf(GridClientCacheFlag.class) : EnumSet.copyOf(this.flags);
flagSet.addAll(Arrays.asList(flags));
return createProjection(nodes, filter, balancer, new GridClientDataFactory(flagSet));
}
/** {@inheritDoc} */
@Override public GridClientData flagsOff(GridClientCacheFlag... flags) throws GridClientException {
if (flags == null || flags.length == 0 || this.flags == null || this.flags.isEmpty())
return this;
EnumSet<GridClientCacheFlag> flagSet = EnumSet.copyOf(this.flags);
flagSet.removeAll(Arrays.asList(flags));
return createProjection(nodes, filter, balancer, new GridClientDataFactory(flagSet));
}
/** {@inheritDoc} */
private class GridClientDataFactory implements ProjectionFactory<GridClientDataImpl> {
/** */
private Set<GridClientCacheFlag> flags;
/**
* Factory which creates projections with given flags.
*
* @param flags Flags to create projection with.
*/
GridClientDataFactory(Set<GridClientCacheFlag> flags) {
this.flags = flags;
}
/** {@inheritDoc} */
@Override public GridClientDataImpl create(Collection<GridClientNode> nodes,
GridClientPredicate<? super GridClientNode> filter, GridClientLoadBalancer balancer) {
return new GridClientDataImpl(cacheName, client, nodes, filter, balancer, flags, cacheMetrics);
}
}
}