blob: 3f6ef3c78bbb7e61f7c3fbd32ec75aa22e6433a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.rest.protocols.tcp;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.processors.rest.GridRestProtocolHandler;
import org.apache.ignite.internal.processors.rest.GridRestResponse;
import org.apache.ignite.internal.processors.rest.handlers.cache.GridCacheRestMetrics;
import org.apache.ignite.internal.processors.rest.request.DataStructuresRequest;
import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest;
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.lang.IgniteClosure2X;
import org.apache.ignite.internal.util.nio.GridNioFuture;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.ATOMIC_DECREMENT;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.ATOMIC_INCREMENT;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_ADD;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_APPEND;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_METRICS;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_PREPEND;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_PUT;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_REMOVE;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_REMOVE_ALL;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_REPLACE;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.NOOP;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.QUIT;
import static org.apache.ignite.internal.processors.rest.GridRestCommand.VERSION;
import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.FAILURE;
import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.KEY_NOT_FOUND;
import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.MEMCACHE_REQ_FLAG;
import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.SUCCESS;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.LAST_FUT;
/**
* Handles memcache requests.
*/
public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<GridMemcachedMessage> {
/** Used cache name in case the name was not defined in a request. */
private static final String CACHE_NAME = "default";
/** Logger */
private final IgniteLogger log;
/** Handler. */
private final GridRestProtocolHandler hnd;
/**
* Creates listener which will convert incoming tcp packets to rest requests and forward them to
* a given rest handler.
*
* @param log Logger to use.
* @param hnd Rest handler.
*/
public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd) {
this.log = log;
this.hnd = hnd;
}
/** {@inheritDoc} */
@Override public void onConnected(GridNioSession ses) {
// No-op, never called.
assert false;
}
/** {@inheritDoc} */
@Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
// No-op, never called.
assert false;
}
/** {@inheritDoc} */
@SuppressWarnings({"IfMayBeConditional"})
@Override public void onMessage(final GridNioSession ses, final GridMemcachedMessage req) {
assert req != null;
final GridTuple3<GridRestCommand, Boolean, Boolean> cmd = command(req.operationCode());
if (cmd == null) {
U.warn(log, "Cannot find corresponding REST command for op code (session will be closed) [ses=" + ses +
", opCode=" + Integer.toHexString(req.operationCode()) + ']');
ses.close();
return;
}
assert req.requestFlag() == MEMCACHE_REQ_FLAG;
assert cmd.get2() != null && cmd.get3() != null;
// Close connection on 'Quit' command.
if (cmd.get1() == QUIT) {
try {
if (cmd.get2()) {
GridMemcachedMessage res = new GridMemcachedMessage(req);
sendResponse(ses, res).get();
}
}
// Catch all when quitting.
catch (Exception e) {
U.warn(log, "Failed to send quit response packet (session will be closed anyway) [ses=" + ses +
", msg=" + e.getMessage() + "]");
}
finally {
ses.close();
}
return;
}
IgniteInternalFuture<GridRestResponse> lastFut = ses.removeMeta(LAST_FUT.ordinal());
if (lastFut != null && lastFut.isDone())
lastFut = null;
IgniteInternalFuture<GridRestResponse> f;
if (lastFut == null)
f = handleRequest0(ses, req, cmd);
else {
f = new GridEmbeddedFuture<>(
lastFut,
new C2<GridRestResponse, Exception, IgniteInternalFuture<GridRestResponse>>() {
@Override public IgniteInternalFuture<GridRestResponse> apply(GridRestResponse res, Exception e) {
return handleRequest0(ses, req, cmd);
}
}
);
}
if (f != null)
ses.addMeta(LAST_FUT.ordinal(), f);
}
/**
* @param ses Session.
* @param req Request.
* @param cmd Command.
* @return Future or {@code null} if processed immediately.
*/
@Nullable private IgniteInternalFuture<GridRestResponse> handleRequest0(
final GridNioSession ses,
final GridMemcachedMessage req,
final GridTuple3<GridRestCommand, Boolean, Boolean> cmd
) {
if (cmd.get1() == NOOP) {
GridMemcachedMessage res0 = new GridMemcachedMessage(req);
res0.status(SUCCESS);
sendResponse(ses, res0);
return null;
}
return new GridEmbeddedFuture<>(new IgniteClosure2X<GridRestResponse, Exception, GridRestResponse>() {
@Override public GridRestResponse applyx(GridRestResponse restRes,
Exception ex) throws IgniteCheckedException {
if(ex != null)
throw U.cast(ex);
// Handle 'Stat' command (special case because several packets are included in response).
if (cmd.get1() == CACHE_METRICS) {
assert restRes.getResponse() instanceof GridCacheRestMetrics;
Map<String, Long> metrics = ((GridCacheRestMetrics)restRes.getResponse()).map();
for (Map.Entry<String, Long> e : metrics.entrySet()) {
GridMemcachedMessage res = new GridMemcachedMessage(req);
res.key(e.getKey());
res.value(String.valueOf(e.getValue()));
sendResponse(ses, res);
}
sendResponse(ses, new GridMemcachedMessage(req));
}
else {
GridMemcachedMessage res = new GridMemcachedMessage(req);
if (restRes.getSuccessStatus() == GridRestResponse.STATUS_SUCCESS) {
switch (cmd.get1()) {
case CACHE_GET: {
res.status(restRes.getResponse() == null ? KEY_NOT_FOUND : SUCCESS);
break;
}
case CACHE_PUT:
case CACHE_ADD:
case CACHE_REMOVE:
case CACHE_REPLACE:
case CACHE_CAS:
case CACHE_APPEND:
case CACHE_PREPEND: {
boolean res0 = restRes.getResponse().equals(Boolean.TRUE);
res.status(res0 ? SUCCESS : FAILURE);
break;
}
default: {
res.status(SUCCESS);
break;
}
}
}
else
res.status(FAILURE);
if (cmd.get3() == Boolean.TRUE)
res.key(req.key());
if (restRes.getSuccessStatus() == GridRestResponse.STATUS_SUCCESS && res.addData() &&
restRes.getResponse() != null)
res.value(restRes.getResponse());
sendResponse(ses, res);
}
return restRes;
}
}, hnd.handleAsync(createRestRequest(req, cmd.get1())));
}
/**
* @param ses NIO session.
* @param res Response.
* @return NIO send future.
*/
private GridNioFuture<?> sendResponse(GridNioSession ses, GridMemcachedMessage res) {
return ses.send(res);
}
/**
* Creates REST request from the protocol request.
*
* @param req Request.
* @param cmd Command.
* @return REST request.
*/
private GridRestRequest createRestRequest(GridMemcachedMessage req, GridRestCommand cmd) {
assert req != null;
if (cmd == ATOMIC_INCREMENT || cmd == ATOMIC_DECREMENT) {
DataStructuresRequest restReq = new DataStructuresRequest();
restReq.command(cmd);
restReq.key(req.key());
restReq.delta(req.delta());
restReq.initial(req.initial());
return restReq;
}
else {
GridRestCacheRequest restReq = new GridRestCacheRequest();
restReq.command(cmd);
restReq.clientId(req.clientId());
restReq.ttl(req.expiration());
restReq.cacheName(req.cacheName() == null ? CACHE_NAME : req.cacheName());
restReq.key(req.key());
if (cmd == CACHE_REMOVE_ALL) {
Object[] keys = (Object[]) req.value();
if (keys != null) {
Map<Object, Object> map = new HashMap<>();
for (Object key : keys) {
map.put(key, null);
}
restReq.values(map);
}
}
else {
if (req.value() != null)
restReq.value(req.value());
}
return restReq;
}
}
/**
* Gets command and command attributes from operation code.
*
* @param opCode Operation code.
* @return Command.
*/
@Nullable private GridTuple3<GridRestCommand, Boolean, Boolean> command(int opCode) {
GridRestCommand cmd;
boolean quiet = false;
boolean retKey = false;
switch (opCode) {
case 0x00:
cmd = CACHE_GET;
break;
case 0x01:
cmd = CACHE_PUT;
break;
case 0x02:
cmd = CACHE_ADD;
break;
case 0x03:
cmd = CACHE_REPLACE;
break;
case 0x04:
cmd = CACHE_REMOVE;
break;
case 0x05:
cmd = ATOMIC_INCREMENT;
break;
case 0x06:
cmd = ATOMIC_DECREMENT;
break;
case 0x07:
cmd = QUIT;
break;
case 0x08:
cmd = CACHE_REMOVE_ALL;
break;
case 0x09:
cmd = CACHE_GET;
break;
case 0x0A:
cmd = NOOP;
break;
case 0x0B:
cmd = VERSION;
break;
case 0x0C:
cmd = CACHE_GET;
retKey = true;
break;
case 0x0D:
cmd = CACHE_GET;
retKey = true;
break;
case 0x0E:
cmd = CACHE_APPEND;
break;
case 0x0F:
cmd = CACHE_PREPEND;
break;
case 0x10:
cmd = CACHE_METRICS;
break;
case 0x11:
cmd = CACHE_PUT;
quiet = true;
break;
case 0x12:
cmd = CACHE_ADD;
quiet = true;
break;
case 0x13:
cmd = CACHE_REPLACE;
quiet = true;
break;
case 0x14:
cmd = CACHE_REMOVE;
quiet = true;
break;
case 0x15:
cmd = ATOMIC_INCREMENT;
quiet = true;
break;
case 0x16:
cmd = ATOMIC_DECREMENT;
quiet = true;
break;
case 0x17:
cmd = QUIT;
quiet = true;
break;
case 0x18:
cmd = CACHE_REMOVE_ALL;
quiet = true;
break;
case 0x19:
cmd = CACHE_APPEND;
quiet = true;
break;
case 0x1A:
cmd = CACHE_PREPEND;
quiet = true;
break;
default:
return null;
}
return new GridTuple3<>(cmd, quiet, retKey);
}
}