| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.platform.client; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.EnumSet; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.configuration.ThinClientConfiguration; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.binary.BinaryReaderExImpl; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerAbstractConnectionContext; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; |
| import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; |
| import org.apache.ignite.internal.processors.platform.client.tx.ClientTxContext; |
| import org.apache.ignite.internal.util.nio.GridNioSession; |
| |
| import static org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature.USER_ATTRIBUTES; |
| import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.AUTHORIZATION; |
| import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.BITMAP_FEATURES; |
| |
| /** |
| * Thin Client connection context. |
| */ |
| public class ClientConnectionContext extends ClientListenerAbstractConnectionContext { |
| /** Version 1.0.0. */ |
| public static final ClientListenerProtocolVersion VER_1_0_0 = ClientListenerProtocolVersion.create(1, 0, 0); |
| |
| /** Version 1.1.0. */ |
| public static final ClientListenerProtocolVersion VER_1_1_0 = ClientListenerProtocolVersion.create(1, 1, 0); |
| |
| /** Version 1.2.0. */ |
| public static final ClientListenerProtocolVersion VER_1_2_0 = ClientListenerProtocolVersion.create(1, 2, 0); |
| |
| /** Version 1.3.0. */ |
| public static final ClientListenerProtocolVersion VER_1_3_0 = ClientListenerProtocolVersion.create(1, 3, 0); |
| |
| /** Version 1.4.0. Added: Partition awareness, IEP-23. */ |
| public static final ClientListenerProtocolVersion VER_1_4_0 = ClientListenerProtocolVersion.create(1, 4, 0); |
| |
| /** Version 1.5.0. Added: Transactions support, IEP-34. */ |
| public static final ClientListenerProtocolVersion VER_1_5_0 = ClientListenerProtocolVersion.create(1, 5, 0); |
| |
| /** Version 1.6.0. Added: Expiration Policy configuration. */ |
| public static final ClientListenerProtocolVersion VER_1_6_0 = ClientListenerProtocolVersion.create(1, 6, 0); |
| |
| /** |
| * Version 1.7.0. Added: protocol features. |
| * ATTENTION! Do not add any new protocol versions unless totally necessary. Use {@link ClientBitmaskFeature} |
| * instead. |
| */ |
| public static final ClientListenerProtocolVersion VER_1_7_0 = ClientListenerProtocolVersion.create(1, 7, 0); |
| |
| /** Default version. */ |
| public static final ClientListenerProtocolVersion DEFAULT_VER = VER_1_7_0; |
| |
| /** Default protocol context. */ |
| public static final ClientProtocolContext DEFAULT_PROTOCOL_CONTEXT = |
| new ClientProtocolContext(DEFAULT_VER, ClientBitmaskFeature.allFeaturesAsEnumSet()); |
| |
| /** Supported versions. */ |
| private static final Collection<ClientListenerProtocolVersion> SUPPORTED_VERS = Arrays.asList( |
| VER_1_7_0, |
| VER_1_6_0, |
| VER_1_5_0, |
| VER_1_4_0, |
| VER_1_3_0, |
| VER_1_2_0, |
| VER_1_1_0, |
| VER_1_0_0 |
| ); |
| |
| /** Message parser. */ |
| private ClientMessageParser parser; |
| |
| /** Request handler. */ |
| private ClientRequestHandler handler; |
| |
| /** Handle registry. */ |
| private final ClientResourceRegistry resReg = new ClientResourceRegistry(); |
| |
| /** Max cursors. */ |
| private final int maxCursors; |
| |
| /** Current protocol context. */ |
| private ClientProtocolContext currentProtocolContext; |
| |
| /** Last reported affinity topology version. */ |
| private AtomicReference<AffinityTopologyVersion> lastAffinityTopologyVersion = new AtomicReference<>(); |
| |
| /** Client session. */ |
| private GridNioSession ses; |
| |
| /** Cursor counter. */ |
| private final AtomicLong curCnt = new AtomicLong(); |
| |
| /** Active tx count limit. */ |
| private final int maxActiveTxCnt; |
| |
| /** Tx id. */ |
| private final AtomicInteger txIdSeq = new AtomicInteger(); |
| |
| /** Transactions by transaction id. */ |
| private final Map<Integer, ClientTxContext> txs = new ConcurrentHashMap<>(); |
| |
| /** Active transactions count. */ |
| private final AtomicInteger txsCnt = new AtomicInteger(); |
| |
| /** Active compute tasks limit. */ |
| private final int maxActiveComputeTasks; |
| |
| /** Active compute tasks count. */ |
| private final AtomicInteger activeTasksCnt = new AtomicInteger(); |
| |
| /** |
| * Ctor. |
| * |
| * @param ctx Kernal context. |
| * @param connId Connection ID. |
| * @param maxCursors Max active cursors. |
| * @param thinCfg Thin-client configuration. |
| */ |
| public ClientConnectionContext(GridKernalContext ctx, GridNioSession ses, long connId, int maxCursors, ThinClientConfiguration thinCfg) { |
| super(ctx, ses, connId); |
| |
| this.maxCursors = maxCursors; |
| maxActiveTxCnt = thinCfg.getMaxActiveTxPerConnection(); |
| maxActiveComputeTasks = thinCfg.getMaxActiveComputeTasksPerConnection(); |
| } |
| |
| /** |
| * Gets the handle registry. |
| * |
| * @return Handle registry. |
| */ |
| public ClientResourceRegistry resources() { |
| return resReg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isVersionSupported(ClientListenerProtocolVersion ver) { |
| return SUPPORTED_VERS.contains(ver); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClientListenerProtocolVersion defaultVersion() { |
| return DEFAULT_VER; |
| } |
| |
| /** |
| * @return Currently used protocol context. |
| */ |
| public ClientProtocolContext currentProtocolContext() { |
| return currentProtocolContext; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void initializeFromHandshake(GridNioSession ses, |
| ClientListenerProtocolVersion ver, BinaryReaderExImpl reader) |
| throws IgniteCheckedException { |
| |
| EnumSet<ClientBitmaskFeature> features = null; |
| |
| if (ClientProtocolContext.isFeatureSupported(ver, BITMAP_FEATURES)) { |
| byte[] cliFeatures = reader.readByteArray(); |
| |
| features = ClientBitmaskFeature.enumSet(cliFeatures); |
| } |
| |
| currentProtocolContext = new ClientProtocolContext(ver, features); |
| |
| String user = null; |
| String pwd = null; |
| |
| if (currentProtocolContext.isFeatureSupported(USER_ATTRIBUTES)) |
| userAttrs = reader.readMap(); |
| |
| if (currentProtocolContext.isFeatureSupported(AUTHORIZATION)) { |
| boolean hasMore; |
| try { |
| hasMore = reader.available() > 0; |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Handshake error: " + e.getMessage(), e); |
| } |
| |
| if (hasMore) { |
| user = reader.readString(); |
| pwd = reader.readString(); |
| } |
| } |
| |
| authenticate(ses, user, pwd); |
| |
| initClientDescriptor("cli"); |
| |
| handler = new ClientRequestHandler(this, currentProtocolContext); |
| parser = new ClientMessageParser(this, currentProtocolContext); |
| |
| this.ses = ses; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClientListenerRequestHandler handler() { |
| return handler; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ClientListenerMessageParser parser() { |
| return parser; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDisconnected() { |
| resReg.clean(); |
| |
| cleanupTxs(); |
| |
| super.onDisconnected(); |
| } |
| |
| /** |
| * Increments the cursor count. |
| */ |
| public void incrementCursors() { |
| long curCnt0 = curCnt.get(); |
| |
| if (curCnt0 >= maxCursors) { |
| throw new IgniteClientException(ClientStatus.TOO_MANY_CURSORS, |
| "Too many open cursors (either close other open cursors or increase the " + |
| "limit through ClientConnectorConfiguration.maxOpenCursorsPerConnection) [maximum=" + maxCursors + |
| ", current=" + curCnt0 + ']'); |
| } |
| |
| curCnt.incrementAndGet(); |
| } |
| |
| /** |
| * Increments the cursor count. |
| */ |
| public void decrementCursors() { |
| curCnt.decrementAndGet(); |
| } |
| |
| /** |
| * Atomically check whether affinity topology version has changed since the last call and sets new version as a last. |
| * @return New version, if it has changed since the last call. |
| */ |
| public ClientAffinityTopologyVersion checkAffinityTopologyVersion() { |
| while (true) { |
| AffinityTopologyVersion oldVer = lastAffinityTopologyVersion.get(); |
| AffinityTopologyVersion newVer = ctx.cache().context().exchange().readyAffinityVersion(); |
| |
| boolean changed = oldVer == null || oldVer.compareTo(newVer) < 0; |
| |
| if (changed) { |
| boolean success = lastAffinityTopologyVersion.compareAndSet(oldVer, newVer); |
| |
| if (!success) |
| continue; |
| } |
| |
| return new ClientAffinityTopologyVersion(newVer, changed); |
| } |
| } |
| |
| /** |
| * Next transaction id for this connection. |
| */ |
| public int nextTxId() { |
| int txId = txIdSeq.incrementAndGet(); |
| |
| return txId == 0 ? txIdSeq.incrementAndGet() : txId; |
| } |
| |
| /** |
| * Transaction context by transaction id. |
| * |
| * @param txId Tx ID. |
| */ |
| public ClientTxContext txContext(int txId) { |
| return txs.get(txId); |
| } |
| |
| /** |
| * Add new transaction context to connection. |
| * |
| * @param txCtx Tx context. |
| */ |
| public void addTxContext(ClientTxContext txCtx) { |
| if (txsCnt.incrementAndGet() > maxActiveTxCnt) { |
| txsCnt.decrementAndGet(); |
| |
| throw new IgniteClientException(ClientStatus.TX_LIMIT_EXCEEDED, "Active transactions per connection limit " + |
| "(" + maxActiveTxCnt + ") exceeded. To start a new transaction you need to wait for some of currently " + |
| "active transactions complete. To change the limit set up " + |
| "ThinClientConfiguration.MaxActiveTxPerConnection property."); |
| } |
| |
| txs.put(txCtx.txId(), txCtx); |
| } |
| |
| /** |
| * Remove transaction context from connection. |
| * |
| * @param txId Tx ID. |
| */ |
| public void removeTxContext(int txId) { |
| txs.remove(txId); |
| |
| txsCnt.decrementAndGet(); |
| } |
| |
| /** |
| * |
| */ |
| private void cleanupTxs() { |
| for (ClientTxContext txCtx : txs.values()) |
| txCtx.close(); |
| |
| txs.clear(); |
| } |
| |
| /** |
| * Send notification to the client. |
| * |
| * @param notification Notification. |
| */ |
| public void notifyClient(ClientNotification notification) { |
| ses.send(parser.encode(notification)); |
| } |
| |
| /** |
| * Increments the active compute tasks count. |
| */ |
| public void incrementActiveTasksCount() { |
| if (maxActiveComputeTasks == 0) { |
| throw new IgniteClientException(ClientStatus.FUNCTIONALITY_DISABLED, |
| "Compute grid functionality is disabled for thin clients on server node. " + |
| "To enable it set up the ThinClientConfiguration.MaxActiveComputeTasksPerConnection property."); |
| } |
| |
| if (activeTasksCnt.incrementAndGet() > maxActiveComputeTasks) { |
| activeTasksCnt.decrementAndGet(); |
| |
| throw new IgniteClientException(ClientStatus.TOO_MANY_COMPUTE_TASKS, "Active compute tasks per connection " + |
| "limit (" + maxActiveComputeTasks + ") exceeded. To start a new task you need to wait for some of " + |
| "currently active tasks complete. To change the limit set up the " + |
| "ThinClientConfiguration.MaxActiveComputeTasksPerConnection property."); |
| } |
| } |
| |
| /** |
| * Decrements the active compute tasks count. |
| */ |
| public void decrementActiveTasksCount() { |
| int cnt = activeTasksCnt.decrementAndGet(); |
| |
| assert cnt >= 0 : "Unexpected active tasks count: " + cnt; |
| } |
| } |