blob: 99f8661576bcdd03aaf23a12bd272537bcf61191 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.client.thin;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
import static java.util.AbstractMap.SimpleEntry;
import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_6_0;
import static org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;
/**
 * Implementation of {@link ClientCache} over TCP protocol.
 * <p>
 * Every request payload produced by this class starts with the cache header written by
 * {@link #writeCacheInfo(PayloadOutputChannel)}: the cache ID, a flags byte and, depending on the flags,
 * the expiry policy durations and the ID of the current transaction.
 *
 * @param <K> Key type.
 * @param <V> Value type.
 */
class TcpClientCache<K, V> implements ClientCache<K, V> {
    /** "Keep binary" flag mask. */
    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;

    /** "Transactional" flag mask. */
    private static final byte TRANSACTIONAL_FLAG_MASK = 0x02;

    /** "With expiry policy" flag mask. */
    private static final byte WITH_EXPIRY_POLICY_FLAG_MASK = 0x04;

    /** Cache id. */
    private final int cacheId;

    /** Channel. */
    private final ReliableChannel ch;

    /** Cache name. */
    private final String name;

    /** Marshaller. */
    private final ClientBinaryMarshaller marsh;

    /** Transactions facade. */
    private final TcpClientTransactions transactions;

    /** Serializer/deserializer. */
    private final ClientUtils serDes;

    /** Indicates if cache works with Ignite Binary format. */
    private final boolean keepBinary;

    /** Expiry policy ({@code null} if no expiry policy is attached to requests). */
    private final ExpiryPolicy expiryPlc;

    /**
     * Creates a cache facade without "keep binary" mode and without an expiry policy.
     *
     * @param name Cache name.
     * @param ch Channel used to send requests.
     * @param marsh Marshaller.
     * @param transactions Transactions facade.
     */
    TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller marsh, TcpClientTransactions transactions) {
        this(name, ch, marsh, transactions, false, null);
    }

    /**
     * Creates a cache facade.
     *
     * @param name Cache name; the cache ID is derived from it via {@link ClientUtils#cacheId(String)}.
     * @param ch Channel used to send requests.
     * @param marsh Marshaller.
     * @param transactions Transactions facade.
     * @param keepBinary Whether the cache works with Ignite Binary format.
     * @param expiryPlc Expiry policy to send with each request, or {@code null}.
     */
    TcpClientCache(String name, ReliableChannel ch, ClientBinaryMarshaller marsh, TcpClientTransactions transactions,
        boolean keepBinary, ExpiryPolicy expiryPlc) {
        this.name = name;
        this.cacheId = ClientUtils.cacheId(name);
        this.ch = ch;
        this.marsh = marsh;
        this.transactions = transactions;

        serDes = new ClientUtils(marsh);

        this.keepBinary = keepBinary;
        this.expiryPlc = expiryPlc;
    }

    /** {@inheritDoc} */
    @Override public V get(K key) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_GET,
            null,
            this::readObject
        );
    }

    /** {@inheritDoc} */
    @Override public void put(K key, V val) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        if (val == null)
            throw new NullPointerException("val");

        cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_PUT,
            req -> writeObject(req, val),
            null
        );
    }

    /** {@inheritDoc} */
    @Override public boolean containsKey(K key) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_CONTAINS_KEY,
            null,
            res -> res.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public String getName() {
        return name;
    }

    /** {@inheritDoc} */
    @Override public ClientCacheConfiguration getConfiguration() throws ClientException {
        return ch.service(
            ClientOperation.CACHE_GET_CONFIGURATION,
            this::writeCacheInfo,
            res -> {
                try {
                    return serDes.cacheConfiguration(res.in(), res.clientChannel().serverVersion());
                }
                catch (IOException e) {
                    // Report the failure with its cause instead of handing the caller a null configuration.
                    throw new ClientException("Failed to read configuration of cache [name=" + name + ']', e);
                }
            }
        );
    }

    /** {@inheritDoc} */
    @Override public int size(CachePeekMode... peekModes) throws ClientException {
        return ch.service(
            ClientOperation.CACHE_GET_SIZE,
            req -> {
                writeCacheInfo(req);

                // Each peek mode is sent as a single byte holding its enum ordinal.
                ClientUtils.collection(peekModes, req.out(), (out, m) -> out.writeByte((byte)m.ordinal()));
            },
            // The response carries a long; it is narrowed to int to match the interface return type.
            res -> (int)res.in().readLong()
        );
    }

    /** {@inheritDoc} */
    @Override public Map<K, V> getAll(Set<? extends K> keys) throws ClientException {
        if (keys == null)
            throw new NullPointerException("keys");

        // Nothing to ask the server for.
        if (keys.isEmpty())
            return new HashMap<>();

        return ch.service(
            ClientOperation.CACHE_GET_ALL,
            req -> {
                writeCacheInfo(req);

                ClientUtils.collection(keys, req.out(), serDes::writeObject);
            },
            // The response is a collection of (key, value) pairs read in that order.
            res -> ClientUtils.collection(
                res.in(),
                in -> new SimpleEntry<K, V>(readObject(in), readObject(in))
            )
        ).stream().collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
    }

    /** {@inheritDoc} */
    @Override public void putAll(Map<? extends K, ? extends V> map) throws ClientException {
        if (map == null)
            throw new NullPointerException("map");

        // Nothing to send.
        if (map.isEmpty())
            return;

        ch.request(
            ClientOperation.CACHE_PUT_ALL,
            req -> {
                writeCacheInfo(req);

                // Each entry is written as key followed by value.
                ClientUtils.collection(
                    map.entrySet(),
                    req.out(),
                    (out, e) -> {
                        serDes.writeObject(out, e.getKey());
                        serDes.writeObject(out, e.getValue());
                    });
            }
        );
    }

    /** {@inheritDoc} */
    @Override public boolean replace(K key, V oldVal, V newVal) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        if (oldVal == null)
            throw new NullPointerException("oldVal");

        if (newVal == null)
            throw new NullPointerException("newVal");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_REPLACE_IF_EQUALS,
            req -> {
                writeObject(req, oldVal);
                writeObject(req, newVal);
            },
            res -> res.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public boolean replace(K key, V val) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        if (val == null)
            throw new NullPointerException("val");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_REPLACE,
            req -> writeObject(req, val),
            res -> res.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public boolean remove(K key) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_REMOVE_KEY,
            null,
            res -> res.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public boolean remove(K key, V oldVal) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        if (oldVal == null)
            throw new NullPointerException("oldVal");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_REMOVE_IF_EQUALS,
            req -> writeObject(req, oldVal),
            res -> res.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public void removeAll(Set<? extends K> keys) throws ClientException {
        if (keys == null)
            throw new NullPointerException("keys");

        // Nothing to send.
        if (keys.isEmpty())
            return;

        ch.request(
            ClientOperation.CACHE_REMOVE_KEYS,
            req -> {
                writeCacheInfo(req);

                ClientUtils.collection(keys, req.out(), serDes::writeObject);
            }
        );
    }

    /** {@inheritDoc} */
    @Override public void removeAll() throws ClientException {
        ch.request(ClientOperation.CACHE_REMOVE_ALL, this::writeCacheInfo);
    }

    /** {@inheritDoc} */
    @Override public V getAndPut(K key, V val) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        if (val == null)
            throw new NullPointerException("val");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_GET_AND_PUT,
            req -> writeObject(req, val),
            this::readObject
        );
    }

    /** {@inheritDoc} */
    @Override public V getAndRemove(K key) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_GET_AND_REMOVE,
            null,
            this::readObject
        );
    }

    /** {@inheritDoc} */
    @Override public V getAndReplace(K key, V val) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        if (val == null)
            throw new NullPointerException("val");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_GET_AND_REPLACE,
            req -> writeObject(req, val),
            this::readObject
        );
    }

    /** {@inheritDoc} */
    @Override public boolean putIfAbsent(K key, V val) throws ClientException {
        if (key == null)
            throw new NullPointerException("key");

        if (val == null)
            throw new NullPointerException("val");

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_PUT_IF_ABSENT,
            req -> writeObject(req, val),
            res -> res.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public void clear() throws ClientException {
        ch.request(ClientOperation.CACHE_CLEAR, this::writeCacheInfo);
    }

    /** {@inheritDoc} */
    // The cast only re-labels the generic parameters of this facade; no data is converted here.
    @SuppressWarnings("unchecked")
    @Override public <K1, V1> ClientCache<K1, V1> withKeepBinary() {
        // Reuse this instance if it already works in "keep binary" mode.
        return keepBinary ? (ClientCache<K1, V1>)this :
            new TcpClientCache<>(name, ch, marsh, transactions, true, expiryPlc);
    }

    /** {@inheritDoc} */
    @Override public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy expirePlc) {
        return new TcpClientCache<>(name, ch, marsh, transactions, keepBinary, expirePlc);
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override public <R> QueryCursor<R> query(Query<R> qry) {
        if (qry == null)
            throw new NullPointerException("qry");

        // Dispatch on the concrete query type; anything else is rejected.
        if (qry instanceof ScanQuery)
            return scanQuery((ScanQuery)qry);

        if (qry instanceof SqlQuery)
            return (QueryCursor<R>)sqlQuery((SqlQuery)qry);

        if (qry instanceof SqlFieldsQuery)
            return (QueryCursor<R>)query((SqlFieldsQuery)qry);

        throw new IllegalArgumentException(
            String.format("Query of type [%s] is not supported", qry.getClass().getSimpleName())
        );
    }

    /** {@inheritDoc} */
    @Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) {
        if (qry == null)
            throw new NullPointerException("qry");

        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
            writeCacheInfo(payloadCh);

            serDes.write(qry, payloadCh.out());
        };

        return new ClientFieldsQueryCursor<>(new ClientFieldsQueryPager(
            ch,
            ClientOperation.QUERY_SQL_FIELDS,
            ClientOperation.QUERY_SQL_FIELDS_CURSOR_GET_PAGE,
            qryWriter,
            keepBinary,
            marsh
        ));
    }

    /**
     * Handle scan query.
     * <p>
     * The request consists of the cache header, the filter (or a {@code NULL} marker when there is none),
     * the page size, the partition ({@code -1} when not set) and the "local" flag.
     *
     * @param qry Scan query.
     * @return Cursor over the entries produced by the scan.
     */
    private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
            writeCacheInfo(payloadCh);

            BinaryOutputStream out = payloadCh.out();

            if (qry.getFilter() == null)
                out.writeByte(GridBinaryMarshaller.NULL);
            else {
                serDes.writeObject(out, qry.getFilter());

                out.writeByte((byte)1); // Java platform
            }

            out.writeInt(qry.getPageSize());
            out.writeInt(qry.getPartition() == null ? -1 : qry.getPartition());
            out.writeBoolean(qry.isLocal());
        };

        return new ClientQueryCursor<>(new ClientQueryPager<>(
            ch,
            ClientOperation.QUERY_SCAN,
            ClientOperation.QUERY_SCAN_CURSOR_GET_PAGE,
            qryWriter,
            keepBinary,
            marsh
        ));
    }

    /**
     * Handle SQL query.
     * <p>
     * The request consists of the cache header followed by the query type, SQL text, arguments,
     * the "distributed joins", "local" and "replicated only" flags, the page size and the timeout.
     *
     * @param qry SQL query.
     * @return Cursor over the entries produced by the query.
     */
    private QueryCursor<Cache.Entry<K, V>> sqlQuery(SqlQuery qry) {
        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
            writeCacheInfo(payloadCh);

            BinaryOutputStream out = payloadCh.out();

            serDes.writeObject(out, qry.getType());
            serDes.writeObject(out, qry.getSql());
            ClientUtils.collection(qry.getArgs(), out, serDes::writeObject);
            out.writeBoolean(qry.isDistributedJoins());
            out.writeBoolean(qry.isLocal());
            out.writeBoolean(qry.isReplicatedOnly());
            out.writeInt(qry.getPageSize());
            out.writeLong(qry.getTimeout());
        };

        return new ClientQueryCursor<>(new ClientQueryPager<>(
            ch,
            ClientOperation.QUERY_SQL,
            ClientOperation.QUERY_SQL_CURSOR_GET_PAGE,
            qryWriter,
            keepBinary,
            marsh
        ));
    }

    /**
     * Execute cache operation with a single key.
     * <p>
     * The request payload is the cache header, the key and then whatever {@code additionalPayloadWriter} writes.
     *
     * @param key Key the operation is applied to.
     * @param op Operation to execute.
     * @param additionalPayloadWriter Writer of the payload that follows the key, or {@code null} if there is none.
     * @param payloadReader Reader of the response payload; passed to the channel as is.
     * @param <T> Operation result type.
     * @return Operation result as returned by the channel.
     * @throws ClientException If the operation failed.
     */
    private <T> T cacheSingleKeyOperation(
        K key,
        ClientOperation op,
        Consumer<PayloadOutputChannel> additionalPayloadWriter,
        Function<PayloadInputChannel, T> payloadReader
    ) throws ClientException {
        Consumer<PayloadOutputChannel> payloadWriter = req -> {
            writeCacheInfo(req);
            writeObject(req, key);

            if (additionalPayloadWriter != null)
                additionalPayloadWriter.accept(req);
        };

        // A transactional operation cannot be executed on the affinity node; it has to be executed on the node
        // that started the transaction.
        return transactions.tx() == null ? ch.affinityService(cacheId, key, op, payloadWriter, payloadReader) :
            ch.service(op, payloadWriter, payloadReader);
    }

    /**
     * Write cache ID and flags.
     * <p>
     * Layout: cache ID (int), flags (byte), then, if the expiry policy flag is set, the create/update/access
     * durations (three longs), then, if the transactional flag is set, the transaction ID (int).
     *
     * @param payloadCh Payload channel to write to.
     * @throws ClientProtocolError If an expiry policy is set but the server version is below {@code V1_6_0}.
     * @throws ClientException If there is a current transaction that belongs to another client channel.
     */
    private void writeCacheInfo(PayloadOutputChannel payloadCh) {
        BinaryOutputStream out = payloadCh.out();

        out.writeInt(cacheId);

        byte flags = keepBinary ? KEEP_BINARY_FLAG_MASK : 0;

        TcpClientTransaction tx = transactions.tx();

        if (expiryPlc != null) {
            if (payloadCh.clientChannel().serverVersion().compareTo(V1_6_0) < 0) {
                throw new ClientProtocolError(String.format("Expire policies are not supported by the server " +
                    "version %s, required version %s", payloadCh.clientChannel().serverVersion(), V1_6_0));
            }

            flags |= WITH_EXPIRY_POLICY_FLAG_MASK;
        }

        if (tx != null) {
            // The transaction is bound to the channel it was started on; another channel cannot continue it.
            if (tx.clientChannel() != payloadCh.clientChannel()) {
                throw new ClientException("Transaction context has been lost due to connection errors. " +
                    "Cache operations are prohibited until current transaction closed.");
            }

            flags |= TRANSACTIONAL_FLAG_MASK;
        }

        out.writeByte(flags);

        if ((flags & WITH_EXPIRY_POLICY_FLAG_MASK) != 0) {
            out.writeLong(convertDuration(expiryPlc.getExpiryForCreation()));
            out.writeLong(convertDuration(expiryPlc.getExpiryForUpdate()));
            out.writeLong(convertDuration(expiryPlc.getExpiryForAccess()));
        }

        if ((flags & TRANSACTIONAL_FLAG_MASK) != 0)
            out.writeInt(tx.txId());
    }

    /**
     * Reads an object from the stream, honoring this cache's "keep binary" mode.
     *
     * @param in Input stream.
     * @param <T> Expected object type.
     * @return Object read by the serializer/deserializer.
     */
    private <T> T readObject(BinaryInputStream in) {
        return serDes.readObject(in, keepBinary);
    }

    /**
     * Reads an object from the input stream of the given payload channel.
     *
     * @param payloadCh Payload channel.
     * @param <T> Expected object type.
     * @return Object read by the serializer/deserializer.
     */
    private <T> T readObject(PayloadInputChannel payloadCh) {
        return readObject(payloadCh.in());
    }

    /**
     * Writes an object to the output stream of the given payload channel.
     *
     * @param payloadCh Payload channel.
     * @param obj Object to write.
     */
    private void writeObject(PayloadOutputChannel payloadCh, Object obj) {
        serDes.writeObject(payloadCh.out(), obj);
    }
}