blob: 66b9e89e02768b76b16c20e254d4e2dd09263e5b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.platform.client;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
import org.apache.ignite.internal.processors.odbc.ClientMessage;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeGetRequest;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNameGetRequest;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNamePutRequest;
import org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypePutRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheClearKeyRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheClearKeysRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheClearRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheContainsKeyRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheContainsKeysRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheCreateWithConfigurationRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheCreateWithNameRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheDestroyRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetAllRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetAndPutIfAbsentRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetAndPutRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetAndRemoveRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetAndReplaceRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetConfigurationRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetNamesRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetOrCreateWithConfigurationRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetOrCreateWithNameRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGetSizeRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheLocalPeekRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheNodePartitionsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePartitionsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutAllRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutIfAbsentRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryContinuousRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryNextPageRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveAllRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveIfEqualsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveKeyRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveKeysRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheReplaceIfEqualsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheReplaceRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheScanQueryRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlFieldsQueryRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheSqlQueryRequest;
import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterChangeStateRequest;
import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGetStateRequest;
import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGroupGetNodeIdsRequest;
import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGroupGetNodesDetailsRequest;
import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterGroupGetNodesEndpointsRequest;
import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterWalChangeStateRequest;
import org.apache.ignite.internal.processors.platform.client.cluster.ClientClusterWalGetStateRequest;
import org.apache.ignite.internal.processors.platform.client.compute.ClientExecuteTaskRequest;
import org.apache.ignite.internal.processors.platform.client.service.ClientServiceInvokeRequest;
import org.apache.ignite.internal.processors.platform.client.tx.ClientTxEndRequest;
import org.apache.ignite.internal.processors.platform.client.tx.ClientTxStartRequest;
/**
* Thin client message parser.
*/
public class ClientMessageParser implements ClientListenerMessageParser {
/* General-purpose operations. */
/** */
private static final short OP_RESOURCE_CLOSE = 0;
/* Cache operations */
/** */
private static final short OP_CACHE_GET = 1000;
/** */
private static final short OP_CACHE_PUT = 1001;
/** */
private static final short OP_CACHE_PUT_IF_ABSENT = 1002;
/** */
private static final short OP_CACHE_GET_ALL = 1003;
/** */
private static final short OP_CACHE_PUT_ALL = 1004;
/** */
private static final short OP_CACHE_GET_AND_PUT = 1005;
/** */
private static final short OP_CACHE_GET_AND_REPLACE = 1006;
/** */
private static final short OP_CACHE_GET_AND_REMOVE = 1007;
/** */
private static final short OP_CACHE_GET_AND_PUT_IF_ABSENT = 1008;
/** */
private static final short OP_CACHE_REPLACE = 1009;
/** */
private static final short OP_CACHE_REPLACE_IF_EQUALS = 1010;
/** */
private static final short OP_CACHE_CONTAINS_KEY = 1011;
/** */
private static final short OP_CACHE_CONTAINS_KEYS = 1012;
/** */
private static final short OP_CACHE_CLEAR = 1013;
/** */
private static final short OP_CACHE_CLEAR_KEY = 1014;
/** */
private static final short OP_CACHE_CLEAR_KEYS = 1015;
/** */
private static final short OP_CACHE_REMOVE_KEY = 1016;
/** */
private static final short OP_CACHE_REMOVE_IF_EQUALS = 1017;
/** */
private static final short OP_CACHE_REMOVE_KEYS = 1018;
/** */
private static final short OP_CACHE_REMOVE_ALL = 1019;
/** */
private static final short OP_CACHE_GET_SIZE = 1020;
/** */
private static final short OP_CACHE_LOCAL_PEEK = 1021;
/* Cache create / destroy, configuration. */
/** */
private static final short OP_CACHE_GET_NAMES = 1050;
/** */
private static final short OP_CACHE_CREATE_WITH_NAME = 1051;
/** */
private static final short OP_CACHE_GET_OR_CREATE_WITH_NAME = 1052;
/** */
private static final short OP_CACHE_CREATE_WITH_CONFIGURATION = 1053;
/** */
private static final short OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION = 1054;
/** */
private static final short OP_CACHE_GET_CONFIGURATION = 1055;
/** */
private static final short OP_CACHE_DESTROY = 1056;
/* Cache service info. */
/** Deprecated since 1.3.0. Replaced by OP_CACHE_PARTITIONS. */
private static final short OP_CACHE_NODE_PARTITIONS = 1100;
/** */
private static final short OP_CACHE_PARTITIONS = 1101;
/* Query operations. */
/** */
private static final short OP_QUERY_SCAN = 2000;
/** */
private static final short OP_QUERY_SCAN_CURSOR_GET_PAGE = 2001;
/** */
private static final short OP_QUERY_SQL = 2002;
/** */
private static final short OP_QUERY_SQL_CURSOR_GET_PAGE = 2003;
/** */
private static final short OP_QUERY_SQL_FIELDS = 2004;
/** */
private static final short OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE = 2005;
/** */
private static final short OP_QUERY_CONTINUOUS = 2006;
/** */
public static final short OP_QUERY_CONTINUOUS_EVENT_NOTIFICATION = 2007;
/* Binary metadata operations. */
/** */
private static final short OP_BINARY_TYPE_NAME_GET = 3000;
/** */
private static final short OP_BINARY_TYPE_NAME_PUT = 3001;
/** */
private static final short OP_BINARY_TYPE_GET = 3002;
/** */
private static final short OP_BINARY_TYPE_PUT = 3003;
/** Start new transaction. */
private static final short OP_TX_START = 4000;
/** Commit transaction. */
private static final short OP_TX_END = 4001;
/* Cluster operations. */
/** */
private static final short OP_CLUSTER_GET_STATE = 5000;
/** */
private static final short OP_CLUSTER_CHANGE_STATE = 5001;
/** */
private static final short OP_CLUSTER_CHANGE_WAL_STATE = 5002;
/** */
private static final short OP_CLUSTER_GET_WAL_STATE = 5003;
/** */
private static final short OP_CLUSTER_GROUP_GET_NODE_IDS = 5100;
/** */
private static final short OP_CLUSTER_GROUP_GET_NODE_INFO = 5101;
/** */
private static final short OP_CLUSTER_GROUP_GET_NODE_ENDPOINTS = 5102;
/* Compute operations. */
/** */
private static final short OP_COMPUTE_TASK_EXECUTE = 6000;
/** */
public static final short OP_COMPUTE_TASK_FINISHED = 6001;
/** Service invocation. */
private static final short OP_SERVICE_INVOKE = 7000;
/** Marshaller. */
private final GridBinaryMarshaller marsh;
/** Client connection context */
private final ClientConnectionContext ctx;
/** Client protocol context */
private final ClientProtocolContext protocolCtx;
/**
* @param ctx Client connection context.
*/
ClientMessageParser(ClientConnectionContext ctx, ClientProtocolContext protocolCtx) {
assert ctx != null;
assert protocolCtx != null;
this.ctx = ctx;
this.protocolCtx = protocolCtx;
CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
marsh = cacheObjProc.marshaller();
}
/** {@inheritDoc} */
@Override public ClientListenerRequest decode(ClientMessage msg) {
assert msg != null;
BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload());
// skipHdrCheck must be true (we have 103 op code).
BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), inStream,
null, null, true, true);
return decode(reader);
}
/**
* Decodes the request.
*
* @param reader Reader.
* @return Request.
*/
public ClientListenerRequest decode(BinaryReaderExImpl reader) {
short opCode = reader.readShort();
switch (opCode) {
case OP_CACHE_GET:
return new ClientCacheGetRequest(reader);
case OP_BINARY_TYPE_NAME_GET:
return new ClientBinaryTypeNameGetRequest(reader);
case OP_BINARY_TYPE_GET:
return new ClientBinaryTypeGetRequest(reader);
case OP_CACHE_PUT:
return new ClientCachePutRequest(reader);
case OP_BINARY_TYPE_NAME_PUT:
return new ClientBinaryTypeNamePutRequest(reader);
case OP_BINARY_TYPE_PUT:
return new ClientBinaryTypePutRequest(reader);
case OP_QUERY_SCAN:
return new ClientCacheScanQueryRequest(reader);
case OP_QUERY_SCAN_CURSOR_GET_PAGE:
case OP_QUERY_SQL_CURSOR_GET_PAGE:
return new ClientCacheQueryNextPageRequest(reader);
case OP_RESOURCE_CLOSE:
return new ClientResourceCloseRequest(reader);
case OP_CACHE_CONTAINS_KEY:
return new ClientCacheContainsKeyRequest(reader);
case OP_CACHE_CONTAINS_KEYS:
return new ClientCacheContainsKeysRequest(reader);
case OP_CACHE_GET_ALL:
return new ClientCacheGetAllRequest(reader);
case OP_CACHE_GET_AND_PUT:
return new ClientCacheGetAndPutRequest(reader);
case OP_CACHE_GET_AND_REPLACE:
return new ClientCacheGetAndReplaceRequest(reader);
case OP_CACHE_GET_AND_REMOVE:
return new ClientCacheGetAndRemoveRequest(reader);
case OP_CACHE_PUT_IF_ABSENT:
return new ClientCachePutIfAbsentRequest(reader);
case OP_CACHE_GET_AND_PUT_IF_ABSENT:
return new ClientCacheGetAndPutIfAbsentRequest(reader);
case OP_CACHE_REPLACE:
return new ClientCacheReplaceRequest(reader);
case OP_CACHE_REPLACE_IF_EQUALS:
return new ClientCacheReplaceIfEqualsRequest(reader);
case OP_CACHE_PUT_ALL:
return new ClientCachePutAllRequest(reader);
case OP_CACHE_CLEAR:
return new ClientCacheClearRequest(reader);
case OP_CACHE_CLEAR_KEY:
return new ClientCacheClearKeyRequest(reader);
case OP_CACHE_CLEAR_KEYS:
return new ClientCacheClearKeysRequest(reader);
case OP_CACHE_REMOVE_KEY:
return new ClientCacheRemoveKeyRequest(reader);
case OP_CACHE_REMOVE_IF_EQUALS:
return new ClientCacheRemoveIfEqualsRequest(reader);
case OP_CACHE_GET_SIZE:
return new ClientCacheGetSizeRequest(reader);
case OP_CACHE_REMOVE_KEYS:
return new ClientCacheRemoveKeysRequest(reader);
case OP_CACHE_LOCAL_PEEK:
return new ClientCacheLocalPeekRequest(reader);
case OP_CACHE_REMOVE_ALL:
return new ClientCacheRemoveAllRequest(reader);
case OP_CACHE_CREATE_WITH_NAME:
return new ClientCacheCreateWithNameRequest(reader);
case OP_CACHE_GET_OR_CREATE_WITH_NAME:
return new ClientCacheGetOrCreateWithNameRequest(reader);
case OP_CACHE_DESTROY:
return new ClientCacheDestroyRequest(reader);
case OP_CACHE_NODE_PARTITIONS:
return new ClientCacheNodePartitionsRequest(reader);
case OP_CACHE_PARTITIONS:
return new ClientCachePartitionsRequest(reader);
case OP_CACHE_GET_NAMES:
return new ClientCacheGetNamesRequest(reader);
case OP_CACHE_GET_CONFIGURATION:
return new ClientCacheGetConfigurationRequest(reader, protocolCtx);
case OP_CACHE_CREATE_WITH_CONFIGURATION:
return new ClientCacheCreateWithConfigurationRequest(reader, protocolCtx);
case OP_CACHE_GET_OR_CREATE_WITH_CONFIGURATION:
return new ClientCacheGetOrCreateWithConfigurationRequest(reader, protocolCtx);
case OP_QUERY_SQL:
return new ClientCacheSqlQueryRequest(reader);
case OP_QUERY_SQL_FIELDS:
return new ClientCacheSqlFieldsQueryRequest(reader, protocolCtx);
case OP_QUERY_SQL_FIELDS_CURSOR_GET_PAGE:
return new ClientCacheQueryNextPageRequest(reader);
case OP_QUERY_CONTINUOUS:
return new ClientCacheQueryContinuousRequest(reader);
case OP_TX_START:
return new ClientTxStartRequest(reader);
case OP_TX_END:
return new ClientTxEndRequest(reader);
case OP_CLUSTER_GET_STATE:
return new ClientClusterGetStateRequest(reader);
case OP_CLUSTER_CHANGE_STATE:
return new ClientClusterChangeStateRequest(reader);
case OP_CLUSTER_CHANGE_WAL_STATE:
return new ClientClusterWalChangeStateRequest(reader);
case OP_CLUSTER_GET_WAL_STATE:
return new ClientClusterWalGetStateRequest(reader);
case OP_CLUSTER_GROUP_GET_NODE_IDS:
return new ClientClusterGroupGetNodeIdsRequest(reader);
case OP_CLUSTER_GROUP_GET_NODE_INFO:
return new ClientClusterGroupGetNodesDetailsRequest(reader);
case OP_CLUSTER_GROUP_GET_NODE_ENDPOINTS:
return new ClientClusterGroupGetNodesEndpointsRequest(reader);
case OP_COMPUTE_TASK_EXECUTE:
return new ClientExecuteTaskRequest(reader);
case OP_SERVICE_INVOKE:
return new ClientServiceInvokeRequest(reader);
}
return new ClientRawRequest(reader.readLong(), ClientStatus.INVALID_OP_CODE,
"Invalid request op code: " + opCode);
}
/** {@inheritDoc} */
@Override public ClientMessage encode(ClientListenerResponse resp) {
assert resp != null;
BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32, BinaryMemoryAllocator.POOLED.chunk());
BinaryRawWriterEx writer = marsh.writer(outStream);
assert resp instanceof ClientOutgoingMessage : "Unexpected response type: " + resp.getClass();
((ClientOutgoingMessage)resp).encode(ctx, writer);
return new ClientMessage(outStream);
}
/** {@inheritDoc} */
@Override public int decodeCommandType(ClientMessage msg) {
assert msg != null;
BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload());
return inStream.readShort();
}
/** {@inheritDoc} */
@Override public long decodeRequestId(ClientMessage msg) {
return 0;
}
}