blob: 7b8b1c711554d710d351409d83d27098c975733c [file] [log] [blame]
package com.gemstone.gemfire.internal.redis.executor.transactions;
import io.netty.buffer.ByteBuf;
import java.util.Queue;
import com.gemstone.gemfire.cache.CacheTransactionManager;
import com.gemstone.gemfire.cache.CommitConflictException;
import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.internal.redis.Coder;
import com.gemstone.gemfire.internal.redis.Command;
import com.gemstone.gemfire.internal.redis.ExecutionHandlerContext;
import com.gemstone.gemfire.internal.redis.RedisConstants;
public class ExecExecutor extends TransactionExecutor {
@Override
public void executeCommand(Command command, ExecutionHandlerContext context) {
CacheTransactionManager txm = context.getCacheTransactionManager();
if (!context.hasTransaction()) {
command.setResponse(Coder.getNilResponse(context.getByteBufAllocator()));
return;
}
TransactionId transactionId = context.getTransactionID();
txm.resume(transactionId);
boolean hasError = hasError(context.getTransactionQueue());
if (hasError)
txm.rollback();
else {
try {
txm.commit();
} catch (CommitConflictException e) {
command.setResponse(Coder.getErrorResponse(context.getByteBufAllocator(), RedisConstants.ERROR_COMMIT_CONFLICT));
context.clearTransaction();
return;
}
}
ByteBuf response = constructResponseExec(context);
command.setResponse(response);
context.clearTransaction();
}
private ByteBuf constructResponseExec(ExecutionHandlerContext context) {
Queue<Command> cQ = context.getTransactionQueue();
ByteBuf response = context.getByteBufAllocator().buffer();
response.writeByte(Coder.ARRAY_ID);
response.writeBytes(Coder.intToBytes(cQ.size()));
response.writeBytes(Coder.CRLFar);
for (Command c: cQ) {
ByteBuf r = c.getResponse();
response.writeBytes(r);
}
return response;
}
private boolean hasError(Queue<Command> queue) {
for (Command c: queue) {
if (c.hasError())
return true;
}
return false;
}
}