blob: 628b29498984524aeb3991958f9b6d7649533a4d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.redis.internal;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderException;
import io.netty.util.concurrent.EventExecutor;
import org.apache.geode.LogWriter;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.cache.query.QueryInvocationTargetException;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.redis.GeodeRedisServer;
import org.apache.geode.redis.internal.executor.transactions.TransactionExecutor;
/**
* This class extends {@link ChannelInboundHandlerAdapter} from Netty and it is the last part of the
* channel pipeline. The {@link ByteToCommandDecoder} forwards a {@link Command} to this class which
* executes it and sends the result back to the client. Additionally, all exception handling is done
* by this class.
* <p>
* Besides being part of Netty's pipeline, this class also serves as a context to the execution of a
* command. It abstracts transactions, provides access to the {@link RegionProvider} and anything
* else an executing {@link Command} may need.
*
*
*/
public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
private static final int WAIT_REGION_DSTRYD_MILLIS = 100;
private static final int MAXIMUM_NUM_RETRIES = (1000 * 60) / WAIT_REGION_DSTRYD_MILLIS; // 60
// seconds
// total
private final Cache cache;
private final GeodeRedisServer server;
private final LogWriter logger;
private final Channel channel;
private final AtomicBoolean needChannelFlush;
private final Runnable flusher;
private final EventExecutor lastExecutor;
private final ByteBufAllocator byteBufAllocator;
/**
* TransactionId for any transactions started by this client
*/
private TransactionId transactionID;
/**
* Queue of commands for a given transaction
*/
private Queue<Command> transactionQueue;
private final RegionProvider regionProvider;
private final byte[] authPwd;
private boolean isAuthenticated;
/**
* Default constructor for execution contexts.
*
* @param ch Channel used by this context, should be one to one
* @param cache The Geode cache instance of this vm
* @param regionProvider The region provider of this context
* @param server Instance of the server it is attached to, only used so that any execution can
* initiate a shutdwon
* @param pwd Authentication password for each context, can be null
*/
public ExecutionHandlerContext(Channel ch, Cache cache, RegionProvider regionProvider,
GeodeRedisServer server, byte[] pwd) {
if (ch == null || cache == null || regionProvider == null || server == null)
throw new IllegalArgumentException("Only the authentication password may be null");
this.cache = cache;
this.server = server;
this.logger = cache.getLogger();
this.channel = ch;
this.needChannelFlush = new AtomicBoolean(false);
this.flusher = new Runnable() {
@Override
public void run() {
flushChannel();
}
};
this.lastExecutor = channel.pipeline().lastContext().executor();
this.byteBufAllocator = channel.alloc();
this.transactionID = null;
this.transactionQueue = null; // Lazy
this.regionProvider = regionProvider;
this.authPwd = pwd;
this.isAuthenticated = pwd != null ? false : true;
}
private void flushChannel() {
while (needChannelFlush.getAndSet(false)) {
channel.flush();
}
}
private void writeToChannel(ByteBuf message) {
channel.write(message, channel.voidPromise());
if (!needChannelFlush.getAndSet(true)) {
this.lastExecutor.execute(flusher);
}
}
/**
* This will handle the execution of received commands
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Command command = (Command) msg;
executeCommand(ctx, command);
}
/**
* Exception handler for the entire pipeline
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof IOException) {
channelInactive(ctx);
return;
}
ByteBuf response = getExceptionResponse(ctx, cause);
writeToChannel(response);
}
private ByteBuf getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) {
ByteBuf response;
if (cause instanceof RedisDataTypeMismatchException)
response = Coder.getWrongTypeResponse(this.byteBufAllocator, cause.getMessage());
else if (cause instanceof DecoderException
&& cause.getCause() instanceof RedisCommandParserException)
response =
Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.PARSING_EXCEPTION_MESSAGE);
else if (cause instanceof RegionCreationException) {
this.logger.error(cause);
response =
Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.ERROR_REGION_CREATION);
} else if (cause instanceof InterruptedException || cause instanceof CacheClosedException)
response =
Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.SERVER_ERROR_SHUTDOWN);
else if (cause instanceof IllegalStateException) {
response = Coder.getErrorResponse(this.byteBufAllocator, cause.getMessage());
} else {
if (this.logger.errorEnabled())
this.logger.error("GeodeRedisServer-Unexpected error handler for " + ctx.channel(), cause);
response = Coder.getErrorResponse(this.byteBufAllocator, RedisConstants.SERVER_ERROR_MESSAGE);
}
return response;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (logger.fineEnabled())
logger.fine("GeodeRedisServer-Connection closing with " + ctx.channel().remoteAddress());
ctx.channel().close();
ctx.close();
}
private void executeCommand(ChannelHandlerContext ctx, Command command) throws Exception {
RedisCommandType type = command.getCommandType();
Executor exec = type.getExecutor();
if (isAuthenticated) {
if (type == RedisCommandType.SHUTDOWN) {
this.server.shutdown();
return;
}
if (hasTransaction() && !(exec instanceof TransactionExecutor))
executeWithTransaction(ctx, exec, command);
else
executeWithoutTransaction(exec, command);
if (hasTransaction() && command.getCommandType() != RedisCommandType.MULTI) {
writeToChannel(
Coder.getSimpleStringResponse(this.byteBufAllocator, RedisConstants.COMMAND_QUEUED));
} else {
ByteBuf response = command.getResponse();
writeToChannel(response);
}
} else if (type == RedisCommandType.QUIT) {
exec.executeCommand(command, this);
ByteBuf response = command.getResponse();
writeToChannel(response);
channelInactive(ctx);
} else if (type == RedisCommandType.AUTH) {
exec.executeCommand(command, this);
ByteBuf response = command.getResponse();
writeToChannel(response);
} else {
ByteBuf r = Coder.getNoAuthResponse(this.byteBufAllocator, RedisConstants.ERROR_NOT_AUTH);
writeToChannel(r);
}
}
/**
* Private helper method to execute a command without a transaction, done for special exception
* handling neatness
*
* @param exec Executor to use
* @param command Command to execute
* @throws Exception Throws exception if exception is from within execution and not to be handled
*/
private void executeWithoutTransaction(final Executor exec, Command command) throws Exception {
Exception cause = null;
for (int i = 0; i < MAXIMUM_NUM_RETRIES; i++) {
try {
exec.executeCommand(command, this);
return;
} catch (Exception e) {
cause = e;
if (e instanceof RegionDestroyedException || e instanceof RegionNotFoundException
|| e.getCause() instanceof QueryInvocationTargetException)
Thread.sleep(WAIT_REGION_DSTRYD_MILLIS);
}
}
throw cause;
}
private void executeWithTransaction(ChannelHandlerContext ctx, final Executor exec,
Command command) throws Exception {
CacheTransactionManager txm = cache.getCacheTransactionManager();
TransactionId transactionId = getTransactionID();
txm.resume(transactionId);
try {
exec.executeCommand(command, this);
} catch (UnsupportedOperationInTransactionException e) {
command.setResponse(Coder.getErrorResponse(this.byteBufAllocator,
RedisConstants.ERROR_UNSUPPORTED_OPERATION_IN_TRANSACTION));
} catch (TransactionException e) {
command.setResponse(Coder.getErrorResponse(this.byteBufAllocator,
RedisConstants.ERROR_TRANSACTION_EXCEPTION));
} catch (Exception e) {
ByteBuf response = getExceptionResponse(ctx, e);
command.setResponse(response);
}
getTransactionQueue().add(command);
transactionId = txm.suspend();
setTransactionID(transactionId);
}
/**
* Get the current transacationId
*
* @return The current transactionId, null if one doesn't exist
*/
public TransactionId getTransactionID() {
return this.transactionID;
}
/**
* Check if client has transaction
*
* @return True if client has transaction, false otherwise
*/
public boolean hasTransaction() {
return transactionID != null;
}
/**
* Setter method for transaction
*
* @param id TransactionId of current transaction for client
*/
public void setTransactionID(TransactionId id) {
this.transactionID = id;
}
/**
* Reset the transaction of client
*/
public void clearTransaction() {
this.transactionID = null;
if (this.transactionQueue != null) {
for (Command c : this.transactionQueue) {
ByteBuf r = c.getResponse();
if (r != null)
r.release();
}
this.transactionQueue.clear();
}
}
/**
* Getter for transaction command queue
*
* @return Command queue
*/
public Queue<Command> getTransactionQueue() {
if (this.transactionQueue == null)
this.transactionQueue = new ConcurrentLinkedQueue<Command>();
return this.transactionQueue;
}
/**
* {@link ByteBuf} allocator for this context. All executors must use this pooled allocator as
* opposed to having unpooled buffers for maximum performance
*
* @return allocator instance
*/
public ByteBufAllocator getByteBufAllocator() {
return this.byteBufAllocator;
}
/**
* Gets the provider of Regions
*
*/
public RegionProvider getRegionProvider() {
return this.regionProvider;
}
/**
* Getter for manager to allow pausing and resuming transactions
*
*/
public CacheTransactionManager getCacheTransactionManager() {
return this.cache.getCacheTransactionManager();
}
/**
* Getter for logger
*
*/
public LogWriter getLogger() {
return this.cache.getLogger();
}
/**
* Get the channel for this context
*
*
* public Channel getChannel() { return this.channel; }
*/
/**
* Get the authentication password, this will be same server wide. It is exposed here as opposed
* to {@link GeodeRedisServer}.
*
*/
public byte[] getAuthPwd() {
return this.authPwd;
}
/**
* Checker if user has authenticated themselves
*
* @return True if no authentication required or authentication complete, false otherwise
*/
public boolean isAuthenticated() {
return this.isAuthenticated;
}
/**
* Lets this context know the authentication is complete
*/
public void setAuthenticationVerified() {
this.isAuthenticated = true;
}
}