blob: 848d4bb8319af08996af77714f71ba072d859bb1 [file] [log] [blame]
package com.gemstone.gemfire.internal.redis;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderException;
import io.netty.util.concurrent.EventExecutor;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import com.gemstone.gemfire.LogWriter;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheTransactionManager;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.TransactionException;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
import com.gemstone.gemfire.internal.redis.executor.transactions.TransactionExecutor;
import com.gemstone.gemfire.redis.GemFireRedisServer;
public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
private static final int MAXIMUM_NUM_RETRIES = 5;
private final Cache cache;
private final GemFireRedisServer server;
private final LogWriter logger;
private final Channel channel;
private final AtomicBoolean needChannelFlush;
private final Runnable flusher;
private final EventExecutor lastExecutor;
private final ByteBufAllocator byteBufAllocator;
/**
* TransactionId for any transactions started by this client
*/
private TransactionId transactionID;
/**
* Queue of commands for a given transaction
*/
private Queue<Command> transactionQueue;
private final RegionCache regionCache;
public ExecutionHandlerContext(Channel ch, Cache cache, RegionCache regions, GemFireRedisServer server) {
this.cache = cache;
this.server = server;
this.logger = cache.getLogger();
this.channel = ch;
this.needChannelFlush = new AtomicBoolean(false);
this.flusher = new Runnable() {
@Override
public void run() {
flushChannel();
}
};
this.lastExecutor = channel.pipeline().lastContext().executor();
this.byteBufAllocator = channel.alloc();
this.transactionID = null;
this.transactionQueue = null; // Lazy
this.regionCache = regions;
}
private void flushChannel() {
while (needChannelFlush.getAndSet(false)) {
channel.flush();
}
}
private void writeToChannel(Object message) {
channel.write(message, channel.voidPromise());
if (!needChannelFlush.getAndSet(true)) {
this.lastExecutor.execute(flusher);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Command command = (Command) msg;
executeCommand(ctx, command);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof IOException) {
channelInactive(ctx);
return;
}
ByteBuf response = getExceptionResponse(ctx, cause);
writeToChannel(response);
}
private ByteBuf getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) {
ByteBuf response;
if (cause instanceof RedisDataTypeMismatchException)
response = Coder.getWrongTypeResponse(this.byteBufAllocator, cause.getMessage());
else if (cause instanceof DecoderException && cause.getCause() instanceof RedisCommandParserException)
response = Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.PARSING_EXCEPTION_MESSAGE);
else if (cause instanceof RegionCreationException)
response = Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.ERROR_REGION_CREATION);
else if (cause instanceof InterruptedException)
response = Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.SERVER_ERROR_SHUTDOWN);
else if (cause instanceof IllegalStateException) {
response = Coder.getErrorResponse(this.byteBufAllocator, cause.getMessage());
} else {
if (this.logger.errorEnabled())
this.logger.error("GemFireRedisServer-Unexpected error handler", cause);
response = Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.SERVER_ERROR_MESSAGE);
}
return response;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (logger.fineEnabled())
logger.fine("GemFireRedisServer-Connection closing with " + ctx.channel().remoteAddress());
ctx.channel().close();
ctx.close();
}
/**
* This method is used to execute the command. The executor is
* determined by the {@link RedisCommandType} and then the execution
* is started.
*
* @param command Command to be executed
* @param cache The Cache instance of this server
* @param client The client data associated with the client
* @throws Exception
*/
public void executeCommand(ChannelHandlerContext ctx, Command command) throws Exception {
RedisCommandType type = command.getCommandType();
if (type == RedisCommandType.SHUTDOWN) {
this.server.shutdown();
return;
}
Executor exec = type.getExecutor();
if (hasTransaction() && !(exec instanceof TransactionExecutor))
executeWithTransaction(ctx, exec, command);
else
executeWithoutTransaction(exec, command, MAXIMUM_NUM_RETRIES);
if (hasTransaction() && command.getCommandType() != RedisCommandType.MULTI)
writeToChannel(Coder.getSimpleStringResponse(this.byteBufAllocator, RedisConstants.COMMAND_QUEUED));
else
writeToChannel(command.getResponse());
if (type == RedisCommandType.QUIT) {
channelInactive(ctx);
}
}
/**
* Private helper method to execute a command without a transaction, done for
* special exception handling neatness
*
* @param exec Executor to use
* @param command Command to execute
* @param cache Cache instance
* @param client Client data associated with client
* @param n Recursive max depth of retries
* @throws Exception Throws exception if exception is from within execution and not to be handled
*/
private void executeWithoutTransaction(final Executor exec, Command command, int n) throws Exception {
try {
exec.executeCommand(command, this);
} catch (RegionDestroyedException e) {
if (n > 0)
executeWithoutTransaction(exec, command, n - 1);
else
throw e;
}
}
/**
* Private method to execute a command when a transaction has been started
*
* @param exec Executor to use
* @param command Command to execute
* @param cache Cache instance
* @param client Client data associated with client
* @throws Exception Throws exception if exception is from within execution and unrelated to transactions
*/
private void executeWithTransaction(ChannelHandlerContext ctx, final Executor exec, Command command) throws Exception {
CacheTransactionManager txm = cache.getCacheTransactionManager();
TransactionId transactionId = getTransactionID();
txm.resume(transactionId);
try {
exec.executeCommand(command, this);
} catch(UnsupportedOperationInTransactionException e) {
command.setResponse(Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.ERROR_UNSUPPORTED_OPERATION_IN_TRANSACTION));
} catch (TransactionException e) {
command.setResponse(Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.ERROR_TRANSACTION_EXCEPTION));
} catch (Exception e) {
ByteBuf response = getExceptionResponse(ctx, e);
command.setResponse(response);
}
getTransactionQueue().add(command);
transactionId = txm.suspend();
setTransactionID(transactionId);
}
/**
* Get the current transacationId
*
* @return The current transactionId, null if one doesn't exist
*/
public TransactionId getTransactionID() {
return this.transactionID;
}
/**
* Check if client has transaction
*
* @return True if client has transaction, false otherwise
*/
public boolean hasTransaction() {
return transactionID != null;
}
/**
* Setter method for transaction
*
* @param id TransactionId of current transaction for client
*/
public void setTransactionID(TransactionId id) {
this.transactionID = id;
}
/**
* Reset the transaction of client
*/
public void clearTransaction() {
this.transactionID = null;
if (this.transactionQueue != null) {
for (Command c : this.transactionQueue) {
ByteBuf r = c.getResponse();
if (r != null)
r.release();
}
this.transactionQueue.clear();
}
}
/**
* Getter for transaction command queue
*
* @return Command queue
*/
public Queue<Command> getTransactionQueue() {
if (this.transactionQueue == null)
this.transactionQueue = new ConcurrentLinkedQueue<Command>();
return this.transactionQueue;
}
public ByteBufAllocator getByteBufAllocator() {
return this.byteBufAllocator;
}
public RegionCache getRegionCache() {
return this.regionCache;
}
public CacheTransactionManager getCacheTransactionManager() {
return this.cache.getCacheTransactionManager();
}
}