blob: 69af7682ce6648f73a1748c692b528ecf7d6f0ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.thin;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.cache.Cache;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.IndexQueryCriterion;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientDisconnectListener;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
import org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.EXPIRY_POLICY;
import static org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;
/**
* Implementation of {@link ClientCache} over TCP protocol.
*/
public class TcpClientCache<K, V> implements ClientCache<K, V> {
/** "Keep binary" flag mask. */
private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
/** "Transactional" flag mask. */
private static final byte TRANSACTIONAL_FLAG_MASK = 0x02;
/** "With expiry policy" flag mask. */
private static final byte WITH_EXPIRY_POLICY_FLAG_MASK = 0x04;
/** Platform type: Java platform. */
static final byte JAVA_PLATFORM = 1;
/** Cache id. */
private final int cacheId;
/** Channel. */
private final ReliableChannel ch;
/** Cache name. */
private final String name;
/** Marshaller. */
private final ClientBinaryMarshaller marsh;
/** Transactions facade. */
private final TcpClientTransactions transactions;
/** Serializer/deserializer. */
private final ClientUtils serDes;
/** Indicates if cache works with Ignite Binary format. */
private final boolean keepBinary;
/** Expiry policy. */
private final ExpiryPolicy expiryPlc;
/** Cache entry listeners registry. */
private final ClientCacheEntryListenersRegistry lsnrsRegistry;
/** JCache adapter. */
private final Cache<K, V> jCacheAdapter;
/** Constructor. */
TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller marsh, TcpClientTransactions transactions,
ClientCacheEntryListenersRegistry lsnrsRegistry) {
this(name, ch, marsh, transactions, lsnrsRegistry, false, null);
}
/** Constructor. */
TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller marsh, TcpClientTransactions transactions,
ClientCacheEntryListenersRegistry lsnrsRegistry, boolean keepBinary, ExpiryPolicy expiryPlc) {
this.name = name;
this.cacheId = ClientUtils.cacheId(name);
this.ch = ch;
this.marsh = marsh;
this.transactions = transactions;
this.lsnrsRegistry = lsnrsRegistry;
serDes = new ClientUtils(marsh);
this.keepBinary = keepBinary;
this.expiryPlc = expiryPlc;
jCacheAdapter = new ClientJCacheAdapter<>(this);
this.ch.registerCacheIfCustomAffinity(this.name);
}
/** {@inheritDoc} */
@Override public V get(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_GET,
null,
this::readObject
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<V> getAsync(K key) {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_GET,
null,
this::readObject
);
}
/** {@inheritDoc} */
@Override public void put(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
cacheSingleKeyOperation(
key,
ClientOperation.CACHE_PUT,
req -> writeObject(req, val),
null
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> putAsync(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_PUT,
req -> writeObject(req, val),
null
);
}
/** {@inheritDoc} */
@Override public boolean containsKey(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_CONTAINS_KEY,
null,
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Boolean> containsKeyAsync(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_CONTAINS_KEY,
null,
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public boolean containsKeys(Set<? extends K> keys) throws ClientException {
if (keys == null)
throw new NullPointerException("keys");
if (keys.isEmpty())
return true;
return ch.service(
ClientOperation.CACHE_CONTAINS_KEYS,
req -> writeKeys(keys, req),
res -> res.in().readBoolean());
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Boolean> containsKeysAsync(Set<? extends K> keys) throws ClientException {
if (keys == null)
throw new NullPointerException("keys");
if (keys.isEmpty())
return IgniteClientFutureImpl.completedFuture(true);
return ch.serviceAsync(
ClientOperation.CACHE_CONTAINS_KEYS,
req -> writeKeys(keys, req),
res -> res.in().readBoolean());
}
/** {@inheritDoc} */
@Override public String getName() {
return name;
}
/** {@inheritDoc} */
@Override public ClientCacheConfiguration getConfiguration() throws ClientException {
return ch.service(
ClientOperation.CACHE_GET_CONFIGURATION,
this::writeCacheInfo,
this::getClientCacheConfiguration
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<ClientCacheConfiguration> getConfigurationAsync() throws ClientException {
return ch.serviceAsync(
ClientOperation.CACHE_GET_CONFIGURATION,
this::writeCacheInfo,
this::getClientCacheConfiguration
);
}
/** {@inheritDoc} */
@Override public int size(CachePeekMode... peekModes) throws ClientException {
return ch.service(
ClientOperation.CACHE_GET_SIZE,
req -> {
writeCacheInfo(req);
ClientUtils.collection(peekModes, req.out(), (out, m) -> out.writeByte((byte)m.ordinal()));
},
res -> (int)res.in().readLong()
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws ClientException {
return ch.serviceAsync(
ClientOperation.CACHE_GET_SIZE,
req -> {
writeCacheInfo(req);
ClientUtils.collection(peekModes, req.out(), (out, m) -> out.writeByte((byte)m.ordinal()));
},
res -> (int)res.in().readLong()
);
}
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Set<? extends K> keys) throws ClientException {
if (keys == null)
throw new NullPointerException("keys");
if (keys.isEmpty())
return new HashMap<>();
return ch.service(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) throws ClientException {
if (keys == null)
throw new NullPointerException("keys");
if (keys.isEmpty())
return IgniteClientFutureImpl.completedFuture(new HashMap<>());
return ch.serviceAsync(ClientOperation.CACHE_GET_ALL, req -> writeKeys(keys, req), this::readEntries);
}
/** {@inheritDoc} */
@Override public void putAll(Map<? extends K, ? extends V> map) throws ClientException {
if (map == null)
throw new NullPointerException("map");
if (map.isEmpty())
return;
ch.request(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req));
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) throws ClientException {
return ch.requestAsync(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req));
}
/** {@inheritDoc} */
@Override public boolean replace(K key, V oldVal, V newVal) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (oldVal == null)
throw new NullPointerException("oldVal");
if (newVal == null)
throw new NullPointerException("newVal");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_REPLACE_IF_EQUALS,
req -> {
writeObject(req, oldVal);
writeObject(req, newVal);
},
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (oldVal == null)
throw new NullPointerException("oldVal");
if (newVal == null)
throw new NullPointerException("newVal");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_REPLACE_IF_EQUALS,
req -> {
writeObject(req, oldVal);
writeObject(req, newVal);
},
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public boolean replace(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_REPLACE,
req -> writeObject(req, val),
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Boolean> replaceAsync(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_REPLACE,
req -> writeObject(req, val),
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public boolean remove(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_REMOVE_KEY,
null,
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Boolean> removeAsync(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_REMOVE_KEY,
null,
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public boolean remove(K key, V oldVal) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (oldVal == null)
throw new NullPointerException("oldVal");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_REMOVE_IF_EQUALS,
req -> writeObject(req, oldVal),
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Boolean> removeAsync(K key, V oldVal) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (oldVal == null)
throw new NullPointerException("oldVal");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_REMOVE_IF_EQUALS,
req -> writeObject(req, oldVal),
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public void removeAll(Set<? extends K> keys) throws ClientException {
if (keys == null)
throw new NullPointerException("keys");
if (keys.isEmpty())
return;
ch.request(
ClientOperation.CACHE_REMOVE_KEYS,
req -> {
writeKeys(keys, req);
}
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> removeAllAsync(Set<? extends K> keys) throws ClientException {
if (keys == null)
throw new NullPointerException("keys");
if (keys.isEmpty())
return IgniteClientFutureImpl.completedFuture(null);
return ch.requestAsync(
ClientOperation.CACHE_REMOVE_KEYS,
req -> {
writeKeys(keys, req);
}
);
}
/** {@inheritDoc} */
@Override public void removeAll() throws ClientException {
ch.request(ClientOperation.CACHE_REMOVE_ALL, this::writeCacheInfo);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> removeAllAsync() throws ClientException {
return ch.requestAsync(ClientOperation.CACHE_REMOVE_ALL, this::writeCacheInfo);
}
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_GET_AND_PUT,
req -> writeObject(req, val),
this::readObject
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<V> getAndPutAsync(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_GET_AND_PUT,
req -> writeObject(req, val),
this::readObject
);
}
/** {@inheritDoc} */
@Override public V getAndRemove(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_GET_AND_REMOVE,
null,
this::readObject
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<V> getAndRemoveAsync(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_GET_AND_REMOVE,
null,
this::readObject
);
}
/** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_GET_AND_REPLACE,
req -> writeObject(req, val),
this::readObject
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<V> getAndReplaceAsync(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_GET_AND_REPLACE,
req -> writeObject(req, val),
this::readObject
);
}
/** {@inheritDoc} */
@Override public boolean putIfAbsent(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_PUT_IF_ABSENT,
req -> writeObject(req, val),
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Boolean> putIfAbsentAsync(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_PUT_IF_ABSENT,
req -> writeObject(req, val),
res -> res.in().readBoolean()
);
}
/** {@inheritDoc} */
@Override public V getAndPutIfAbsent(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperation(
key,
ClientOperation.CACHE_GET_AND_PUT_IF_ABSENT,
req -> writeObject(req, val),
this::readObject
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<V> getAndPutIfAbsentAsync(K key, V val) throws ClientException {
if (key == null)
throw new NullPointerException("key");
if (val == null)
throw new NullPointerException("val");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_GET_AND_PUT_IF_ABSENT,
req -> writeObject(req, val),
this::readObject
);
}
/** {@inheritDoc} */
@Override public void clear() throws ClientException {
ch.request(ClientOperation.CACHE_CLEAR, this::writeCacheInfo);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> clearAsync() throws ClientException {
return ch.requestAsync(ClientOperation.CACHE_CLEAR, this::writeCacheInfo);
}
/** {@inheritDoc} */
@Override public void clear(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
cacheSingleKeyOperation(
key,
ClientOperation.CACHE_CLEAR_KEY,
null,
null
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> clearAsync(K key) throws ClientException {
if (key == null)
throw new NullPointerException("key");
return cacheSingleKeyOperationAsync(
key,
ClientOperation.CACHE_CLEAR_KEY,
null,
null
);
}
/** {@inheritDoc} */
@Override public void clearAll(Set<? extends K> keys) throws ClientException {
if (keys == null)
throw new NullPointerException("keys");
if (keys.isEmpty())
return;
ch.request(
ClientOperation.CACHE_CLEAR_KEYS,
req -> writeKeys(keys, req)
);
}
/** {@inheritDoc} */
@Override public IgniteClientFuture<Void> clearAllAsync(Set<? extends K> keys) throws ClientException {
if (keys == null)
throw new NullPointerException("keys");
if (keys.isEmpty())
return IgniteClientFutureImpl.completedFuture(null);
return ch.requestAsync(
ClientOperation.CACHE_CLEAR_KEYS,
req -> writeKeys(keys, req)
);
}
/** {@inheritDoc} */
@Override public <K1, V1> ClientCache<K1, V1> withKeepBinary() {
return keepBinary ? (ClientCache<K1, V1>)this :
new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, true, expiryPlc);
}
/** {@inheritDoc} */
@Override public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy expirePlc) {
return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, keepBinary, expirePlc);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public <R> QueryCursor<R> query(Query<R> qry) {
if (qry == null)
throw new NullPointerException("qry");
QueryCursor<R> res;
if (qry instanceof ScanQuery)
res = scanQuery((ScanQuery)qry);
else if (qry instanceof SqlQuery)
res = (QueryCursor<R>)sqlQuery((SqlQuery)qry);
else if (qry instanceof SqlFieldsQuery)
res = (QueryCursor<R>)query((SqlFieldsQuery)qry);
else if (qry instanceof ContinuousQuery)
res = query((ContinuousQuery<K, V>)qry, null);
else if (qry instanceof IndexQuery)
res = indexQuery((IndexQuery)qry);
else
throw new IllegalArgumentException(
String.format("Query of type [%s] is not supported", qry.getClass().getSimpleName())
);
return res;
}
/** {@inheritDoc} */
@Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) {
if (qry == null)
throw new NullPointerException("qry");
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
writeCacheInfo(payloadCh);
serDes.write(qry, payloadCh.out());
};
return new ClientFieldsQueryCursor<>(new ClientFieldsQueryPager(
ch,
ClientOperation.QUERY_SQL_FIELDS,
ClientOperation.QUERY_SQL_FIELDS_CURSOR_GET_PAGE,
qryWriter,
keepBinary,
marsh
));
}
/** {@inheritDoc} */
@Override public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry, ClientDisconnectListener disconnectLsnr) {
A.ensure(!(qry.getInitialQuery() instanceof ContinuousQuery), "Initial query for continuous query " +
"can't be an instance of another continuous query");
A.notNull(qry.getLocalListener(), "Local listener");
A.ensure(!qry.isLocal(), "Local query is not supported by thin client");
A.ensure(qry.isAutoUnsubscribe(), "AutoUnsubscribe flag is not supported by thin client");
A.ensure(qry.getRemoteFilterFactory() == null || qry.getRemoteFilter() == null,
"RemoteFilter and RemoteFilterFactory can't be used together");
ClientCacheEntryListenerHandler<K, V> hnd = new ClientCacheEntryListenerHandler<>(
jCacheAdapter,
ch,
marsh,
keepBinary
);
hnd.startListen(
qry.getLocalListener(),
disconnectLsnr,
qry.getRemoteFilterFactory() != null ? qry.getRemoteFilterFactory() : qry.getRemoteFilter() != null ?
FactoryBuilder.factoryOf(qry.getRemoteFilter()) : null,
qry.getPageSize(),
qry.getTimeInterval(),
qry.isIncludeExpired()
);
if (qry.getInitialQuery() != null) {
try {
QueryCursor<R> cur = (QueryCursor<R>)query(qry.getInitialQuery());
return new ClientContinuousQueryCursor<>(cur, hnd);
}
catch (Exception e) {
U.closeQuiet(hnd);
throw e;
}
}
else
return new ClientContinuousQueryCursor<>(null, hnd);
}
/** {@inheritDoc} */
@Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg) {
registerCacheEntryListener(cfg, null);
}
/** {@inheritDoc} */
@Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg,
ClientDisconnectListener disconnectLsnr) {
A.ensure(!cfg.isSynchronous(),
"Unsupported cfg.isSynchronous() flag value");
A.notNull(cfg.getCacheEntryListenerFactory(), "cfg.getCacheEntryListenerFactory()");
ClientCacheEntryListenerHandler<K, V> hnd = new ClientCacheEntryListenerHandler<>(
jCacheAdapter,
ch,
marsh,
keepBinary
);
if (lsnrsRegistry.registerCacheEntryListener(name, cfg, hnd)) {
CacheEntryListener<? super K, ? super V> locLsnr = cfg.getCacheEntryListenerFactory().create();
ClientDisconnectListener disconnectLsnr0 = e -> {
if (disconnectLsnr != null)
disconnectLsnr.onDisconnected(e);
lsnrsRegistry.deregisterCacheEntryListener(name, cfg);
};
hnd.startListen(
new ClientJCacheEntryListenerAdapter<>(locLsnr),
disconnectLsnr0,
cfg.getCacheEntryEventFilterFactory(),
ContinuousQuery.DFLT_PAGE_SIZE,
ContinuousQuery.DFLT_TIME_INTERVAL,
locLsnr instanceof CacheEntryExpiredListener
);
}
else
throw new IllegalStateException("Listener is already registered for configuration: " + cfg);
}
/** {@inheritDoc} */
@Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg) {
ClientCacheEntryListenerHandler<?, ?> hnd = lsnrsRegistry.deregisterCacheEntryListener(name, cfg);
U.closeQuiet(hnd);
}
/**
* Store DR data.
*
* @param drMap DR map.
*/
public void putAllConflict(Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> drMap) throws ClientException {
A.notNull(drMap, "drMap");
ch.request(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
}
/**
* Store DR data asynchronously.
*
* @param drMap DR map.
* @return Future.
*/
public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, T2<? extends V, GridCacheVersion>> drMap)
throws ClientException {
A.notNull(drMap, "drMap");
return ch.requestAsync(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
}
/**
* Removes DR data.
*
* @param drMap DR map.
*/
public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws ClientException {
A.notNull(drMap, "drMap");
ch.request(ClientOperation.CACHE_REMOVE_ALL_CONFLICT, req -> writeRemoveAllConflict(drMap, req));
}
/**
* Removes DR data asynchronously.
*
* @param drMap DR map.
* @return Future.
*/
public IgniteClientFuture<Void> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap)
throws ClientException {
A.notNull(drMap, "drMap");
return ch.requestAsync(ClientOperation.CACHE_REMOVE_ALL_CONFLICT, req -> writeRemoveAllConflict(drMap, req));
}
/** Handle scan query. */
private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
writeCacheInfo(payloadCh);
BinaryOutputStream out = payloadCh.out();
if (qry.getFilter() == null)
out.writeByte(GridBinaryMarshaller.NULL);
else {
serDes.writeObject(out, qry.getFilter());
out.writeByte(JAVA_PLATFORM);
}
out.writeInt(qry.getPageSize());
out.writeInt(qry.getPartition() == null ? -1 : qry.getPartition());
out.writeBoolean(qry.isLocal());
};
return new ClientQueryCursor<>(new ClientQueryPager<>(
ch,
ClientOperation.QUERY_SCAN,
ClientOperation.QUERY_SCAN_CURSOR_GET_PAGE,
qryWriter,
keepBinary,
marsh,
cacheId,
qry.getPartition() == null ? -1 : qry.getPartition()
));
}
/** Handle index query. */
private QueryCursor<Cache.Entry<K, V>> indexQuery(IndexQuery<K, V> qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
writeCacheInfo(payloadCh);
BinaryOutputStream out = payloadCh.out();
try (BinaryRawWriterEx w = new BinaryWriterExImpl(marsh.context(), out, null, null)) {
w.writeInt(qry.getPageSize());
w.writeBoolean(qry.isLocal());
w.writeInt(qry.getPartition() == null ? -1 : qry.getPartition());
w.writeString(qry.getValueType());
w.writeString(qry.getIndexName());
if (qry.getCriteria() != null) {
out.writeByte(ARR_LIST);
out.writeInt(qry.getCriteria().size());
for (IndexQueryCriterion c: qry.getCriteria()) {
if (c instanceof RangeIndexQueryCriterion) {
out.writeByte((byte)0); // Criterion type.
RangeIndexQueryCriterion range = (RangeIndexQueryCriterion)c;
w.writeString(range.field());
w.writeBoolean(range.lowerIncl());
w.writeBoolean(range.upperIncl());
w.writeBoolean(range.lowerNull());
w.writeBoolean(range.upperNull());
serDes.writeObject(out, range.lower());
serDes.writeObject(out, range.upper());
}
else {
throw new IllegalArgumentException(
String.format("Unknown IndexQuery criterion type [%s]", c.getClass().getSimpleName())
);
}
}
}
else
out.writeByte(GridBinaryMarshaller.NULL);
}
if (qry.getFilter() == null)
out.writeByte(GridBinaryMarshaller.NULL);
else {
serDes.writeObject(out, qry.getFilter());
out.writeByte(JAVA_PLATFORM);
}
};
return new ClientQueryCursor<>(new ClientQueryPager<>(
ch,
ClientOperation.QUERY_INDEX,
ClientOperation.QUERY_INDEX_CURSOR_GET_PAGE,
qryWriter,
keepBinary,
marsh,
cacheId,
qry.getPartition() == null ? -1 : qry.getPartition()
));
}
/** Handle SQL query. */
private QueryCursor<Cache.Entry<K, V>> sqlQuery(SqlQuery qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
writeCacheInfo(payloadCh);
BinaryOutputStream out = payloadCh.out();
serDes.writeObject(out, qry.getType());
serDes.writeObject(out, qry.getSql());
ClientUtils.collection(qry.getArgs(), out, serDes::writeObject);
out.writeBoolean(qry.isDistributedJoins());
out.writeBoolean(qry.isLocal());
out.writeBoolean(qry.isReplicatedOnly());
out.writeInt(qry.getPageSize());
out.writeLong(qry.getTimeout());
};
return new ClientQueryCursor<>(new ClientQueryPager<>(
ch,
ClientOperation.QUERY_SQL,
ClientOperation.QUERY_SQL_CURSOR_GET_PAGE,
qryWriter,
keepBinary,
marsh
));
}
/**
* Execute cache operation with a single key.
*/
private <T> T cacheSingleKeyOperation(
K key,
ClientOperation op,
Consumer<PayloadOutputChannel> additionalPayloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException {
Consumer<PayloadOutputChannel> payloadWriter = req -> {
writeCacheInfo(req);
writeObject(req, key);
if (additionalPayloadWriter != null)
additionalPayloadWriter.accept(req);
};
// Transactional operation cannot be executed on affinity node, it should be executed on node started
// the transaction.
return transactions.tx() == null ? ch.affinityService(cacheId, key, op, payloadWriter, payloadReader) :
ch.service(op, payloadWriter, payloadReader);
}
/**
* Execute cache operation with a single key asynchronously.
*/
private <T> IgniteClientFuture<T> cacheSingleKeyOperationAsync(
K key,
ClientOperation op,
Consumer<PayloadOutputChannel> additionalPayloadWriter,
Function<PayloadInputChannel, T> payloadReader
) throws ClientException {
Consumer<PayloadOutputChannel> payloadWriter = req -> {
writeCacheInfo(req);
writeObject(req, key);
if (additionalPayloadWriter != null)
additionalPayloadWriter.accept(req);
};
// Transactional operation cannot be executed on affinity node, it should be executed on node started
// the transaction.
return transactions.tx() == null
? ch.affinityServiceAsync(cacheId, key, op, payloadWriter, payloadReader)
: ch.serviceAsync(op, payloadWriter, payloadReader);
}
/** Write cache ID and flags. */
private void writeCacheInfo(PayloadOutputChannel payloadCh) {
BinaryOutputStream out = payloadCh.out();
out.writeInt(cacheId);
byte flags = keepBinary ? KEEP_BINARY_FLAG_MASK : 0;
TcpClientTransaction tx = transactions.tx();
if (expiryPlc != null) {
ProtocolContext protocolCtx = payloadCh.clientChannel().protocolCtx();
if (!protocolCtx.isFeatureSupported(EXPIRY_POLICY)) {
throw new ClientProtocolError(String.format("Expire policies are not supported by the server " +
"version %s, required version %s", protocolCtx.version(), EXPIRY_POLICY.verIntroduced()));
}
flags |= WITH_EXPIRY_POLICY_FLAG_MASK;
}
if (tx != null) {
if (tx.clientChannel() != payloadCh.clientChannel()) {
throw new ClientException("Transaction context has been lost due to connection errors. " +
"Cache operations are prohibited until current transaction closed.");
}
flags |= TRANSACTIONAL_FLAG_MASK;
}
out.writeByte(flags);
if ((flags & WITH_EXPIRY_POLICY_FLAG_MASK) != 0) {
out.writeLong(convertDuration(expiryPlc.getExpiryForCreation()));
out.writeLong(convertDuration(expiryPlc.getExpiryForUpdate()));
out.writeLong(convertDuration(expiryPlc.getExpiryForAccess()));
}
if ((flags & TRANSACTIONAL_FLAG_MASK) != 0)
out.writeInt(tx.txId());
}
/** */
private <T> T readObject(BinaryInputStream in) {
return serDes.readObject(in, keepBinary);
}
/** */
private <T> T readObject(PayloadInputChannel payloadCh) {
return readObject(payloadCh.in());
}
/** */
private void writeObject(PayloadOutputChannel payloadCh, Object obj) {
serDes.writeObject(payloadCh.out(), obj);
}
/** */
@Nullable private ClientCacheConfiguration getClientCacheConfiguration(PayloadInputChannel res) {
try {
return serDes.cacheConfiguration(res.in(), res.clientChannel().protocolCtx());
}
catch (IOException e) {
return null;
}
}
/** */
private void writeKeys(Set<? extends K> keys, PayloadOutputChannel req) {
writeCacheInfo(req);
ClientUtils.collection(keys, req.out(), serDes::writeObject);
}
/** */
private Map<K, V> readEntries(PayloadInputChannel res) {
BinaryInputStream in = res.in();
int cnt = in.readInt();
Map<K, V> map = new HashMap<>();
for (int i = 0; i < cnt; i++)
map.put(readObject(in), readObject(in));
return map;
}
/** */
private void writeEntries(Map<? extends K, ? extends V> map, PayloadOutputChannel req) {
writeCacheInfo(req);
ClientUtils.collection(
map.entrySet(),
req.out(),
(out, e) -> {
serDes.writeObject(out, e.getKey());
serDes.writeObject(out, e.getValue());
});
}
/** */
private void writePutAllConflict(
Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> map,
PayloadOutputChannel req
) {
checkDataReplicationSupported(req.clientChannel().protocolCtx());
writeCacheInfo(req);
ClientUtils.collection(
map.entrySet(),
req.out(),
(out, e) -> {
serDes.writeObject(out, e.getKey());
serDes.writeObject(out, e.getValue().get1());
serDes.writeObject(out, e.getValue().get2());
});
}
/** */
private void writeRemoveAllConflict(Map<? extends K, GridCacheVersion> map, PayloadOutputChannel req) {
checkDataReplicationSupported(req.clientChannel().protocolCtx());
writeCacheInfo(req);
ClientUtils.collection(
map.entrySet(),
req.out(),
(out, e) -> {
serDes.writeObject(out, e.getKey());
serDes.writeObject(out, e.getValue());
});
}
/**
* Check that data replication operations is supported by server.
*
* @param protocolCtx Protocol context.
*/
private void checkDataReplicationSupported(ProtocolContext protocolCtx)
throws ClientFeatureNotSupportedByServerException {
if (!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS))
throw new ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS);
}
}