blob: ad0586583c469f85f5fd126b815b6480cb7a6cc1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.*;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.security.Principal;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.Logger;
import org.apache.shiro.subject.Subject;
import org.apache.shiro.util.ThreadState;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.client.internal.AbstractOp;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.ClientHandShake;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.InternalClientMembership;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.command.Default;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.security.AuthorizeRequest;
import org.apache.geode.internal.security.AuthorizeRequestPP;
import org.apache.geode.internal.security.IntegratedSecurityService;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.util.Breadcrumbs;
import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;
/**
* Provides an implementation for the server socket end of the hierarchical cache connection. Each
* server connection runs in its own thread to maximize concurrency and improve response times to
* edge requests
*
* @since GemFire 2.0.2
*/
public class ServerConnection implements Runnable {
private static final Logger logger = LogService.getLogger();
/**
* This is a buffer that we add to client readTimeout value before we cleanup the connection. This
* buffer time helps prevent EOF in the client instead of SocketTimeout
*/
private static final int TIMEOUT_BUFFER_FOR_CONNECTION_CLEANUP_MS = 5000;
public static final String DISALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME =
"geode.disallow-internal-messages-without-credentials";
/**
* When true requires some formerly credential-less messages to carry credentials. See GEODE-3249
* and ServerConnection.isInternalMessage()
*/
public static boolean allowInternalMessagesWithoutCredentials =
!(Boolean.getBoolean(DISALLOW_INTERNAL_MESSAGES_WITHOUT_CREDENTIALS_NAME));
private Map commands;
private SecurityService securityService = IntegratedSecurityService.getSecurityService();
final protected CacheServerStats stats;
// private static boolean useDataStream =
// System.getProperty("hct.useDataStream", "false").equals("true");
// The key is the size of each ByteBuffer. The value is a queue of byte buffers all of that size.
private static final ConcurrentHashMap<Integer, LinkedBlockingQueue<ByteBuffer>> commBufferMap =
new ConcurrentHashMap<>(4, 0.75f, 1);
public static ByteBuffer allocateCommBuffer(int size, Socket sock) {
// I expect that size will almost always be the same value
if (sock.getChannel() == null) {
// The socket this commBuffer will be used for is old IO (it has no channel).
// So the commBuffer should be heap based.
return ByteBuffer.allocate(size);
}
LinkedBlockingQueue<ByteBuffer> q = commBufferMap.get(size);
ByteBuffer result = null;
if (q != null) {
result = q.poll();
}
if (result == null) {
result = ByteBuffer.allocateDirect(size);
} else {
result.position(0);
result.limit(result.capacity());
}
return result;
}
public static void releaseCommBuffer(ByteBuffer bb) {
if (bb != null && bb.isDirect()) {
LinkedBlockingQueue<ByteBuffer> q = commBufferMap.get(bb.capacity());
if (q == null) {
q = new LinkedBlockingQueue<>();
LinkedBlockingQueue<ByteBuffer> oldQ = commBufferMap.putIfAbsent(bb.capacity(), q);
if (oldQ != null) {
q = oldQ;
}
}
q.offer(bb);
}
}
public static void emptyCommBufferPool() {
for (LinkedBlockingQueue<ByteBuffer> q : commBufferMap.values()) {
q.clear();
}
}
private Socket theSocket;
// private InputStream in = null;
// private OutputStream out = null;
private ByteBuffer commBuffer;
private final CachedRegionHelper crHelper;
private String name = null;
// IMPORTANT: if new messages are added change setHandshake to initialize them
// to the correct Version for serializing to the client
private Message requestMsg = new Message(2, Version.CURRENT);
private Message replyMsg = new Message(1, Version.CURRENT);
private Message responseMsg = new Message(1, Version.CURRENT);
private Message errorMsg = new Message(1, Version.CURRENT);
// IMPORTANT: if new messages are added change setHandshake to initialize them
// to the correct Version for serializing to the client
private ChunkedMessage queryResponseMsg = new ChunkedMessage(2, Version.CURRENT);
private ChunkedMessage chunkedResponseMsg = new ChunkedMessage(1, Version.CURRENT);
private ChunkedMessage executeFunctionResponseMsg = new ChunkedMessage(1, Version.CURRENT);
private ChunkedMessage registerInterestResponseMsg = new ChunkedMessage(1, Version.CURRENT);
private ChunkedMessage keySetResponseMsg = new ChunkedMessage(1, Version.CURRENT);
private final InternalLogWriter logWriter;
private final InternalLogWriter securityLogWriter;
final private AcceptorImpl acceptor;
private Thread owner;
/**
* Handshake reference uniquely identifying a client
*/
private ClientHandShake handshake;
private int handShakeTimeout;
private final Object handShakeMonitor = new Object();
/*
* This timeout is request specific which come with message itself Otherwise, timeout which comes
* during handshake is used.
*/
private volatile int requestSpecificTimeout = -1;
/** Tracks the id of the most recent batch to which a reply has been sent */
private int latestBatchIdReplied = -1;
/*
* Uniquely identifying the client's Distributed System
*
*
* private String membershipId;
*
*
* Uniquely identifying the client's ConnectionProxy object
*
*
* private String proxyID ;
*/
ClientProxyMembershipID proxyId;
byte[] memberIdByteArray;
/**
* Authorize client requests using this object. This is set when each operation on this connection
* is authorized in pre-operation phase.
*/
private AuthorizeRequest authzRequest;
/**
* Authorize client requests using this object. This is set when each operation on this connection
* is authorized in post-operation phase.
*/
private AuthorizeRequestPP postAuthzRequest;
/**
* The communication mode for this <code>ServerConnection</code>. Valid types include
* 'client-server', 'gateway-gateway' and 'monitor-server'.
*/
private final byte communicationMode;
private final String communicationModeStr;
private long processingMessageStartTime = -1;
private Object processingMessageLock = new Object();
private static ConcurrentHashMap<ClientProxyMembershipID, ClientUserAuths> proxyIdVsClientUserAuths =
new ConcurrentHashMap<ClientProxyMembershipID, ClientUserAuths>();
private ClientUserAuths clientUserAuths;
// this is constant(server and client) for first user request, after that it is random
// this also need to send in handshake
private long connectionId = Connection.DEFAULT_CONNECTION_ID;
private Random randomConnectionIdGen = null;
private Part securePart = null;
private Principal principal;
private MessageIdExtractor messageIdExtractor = new MessageIdExtractor();
/**
* A debug flag used for testing Backward compatibility
*/
public static boolean TEST_VERSION_AFTER_HANDSHAKE_FLAG = false;
public static short testVersionAfterHandshake = 4;
/**
* Creates a new <code>ServerConnection</code> that processes messages received from an edge
* client over a given <code>Socket</code>.
*/
public ServerConnection(Socket s, Cache c, CachedRegionHelper helper, CacheServerStats stats,
int hsTimeout, int socketBufferSize, String communicationModeStr, byte communicationMode,
Acceptor acceptor) {
StringBuffer buffer = new StringBuffer(100);
if (((AcceptorImpl) acceptor).isGatewayReceiver()) {
buffer.append("GatewayReceiver connection from [");
} else {
buffer.append("Server connection from [");
}
buffer.append(communicationModeStr).append(" host address=")
.append(s.getInetAddress().getHostAddress()).append("; ").append(communicationModeStr)
.append(" port=").append(s.getPort()).append("]");
this.name = buffer.toString();
this.stats = stats;
this.acceptor = (AcceptorImpl) acceptor;
this.crHelper = helper;
this.logWriter = (InternalLogWriter) c.getLoggerI18n();
this.securityLogWriter = (InternalLogWriter) c.getSecurityLoggerI18n();
this.communicationModeStr = communicationModeStr;
this.communicationMode = communicationMode;
this.principal = null;
this.authzRequest = null;
this.postAuthzRequest = null;
this.randomConnectionIdGen = new Random(this.hashCode());
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
// requestMsg.setUseDataStream(useDataStream);
// replyMsg.setUseDataStream(useDataStream);
// responseMsg.setUseDataStream(useDataStream);
// errorMsg.setUseDataStream(useDataStream);
initStreams(s, socketBufferSize, stats);
if (isDebugEnabled) {
logger.debug(
"{}: Accepted client connection from {}[client host name={}; client host address={}; client port={}]",
getName(), s.getInetAddress().getCanonicalHostName(),
s.getInetAddress().getHostAddress(), s.getPort());
}
this.handShakeTimeout = hsTimeout;
} catch (Exception e) {
if (isDebugEnabled) {
logger.debug("While creating server connection", e);
}
}
}
public AcceptorImpl getAcceptor() {
return this.acceptor;
}
static private final ThreadLocal<Byte> executeFunctionOnLocalNodeOnly = new ThreadLocal<Byte>() {
@Override
protected Byte initialValue() {
return 0x00;
}
};
static public void executeFunctionOnLocalNodeOnly(Byte value) {
byte b = value.byteValue();
executeFunctionOnLocalNodeOnly.set(b);
}
static public Byte isExecuteFunctionOnLocalNodeOnly() {
return executeFunctionOnLocalNodeOnly.get();
}
private boolean verifyClientConnection() {
synchronized (this.handShakeMonitor) {
if (this.handshake == null) {
// synchronized (getCleanupTable()) {
boolean readHandShake = ServerHandShakeProcessor.readHandShake(this);
if (readHandShake) {
if (this.handshake.isOK()) {
try {
return processHandShake();
} catch (CancelException e) {
if (!crHelper.isShutdown()) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ServerConnection_0_UNEXPECTED_CANCELLATION, getName()), e);
}
cleanup();
return false;
}
} else {
this.crHelper.checkCancelInProgress(null); // bug 37113?
logger.warn(LocalizedMessage.create(
LocalizedStrings.ServerConnection_0_RECEIVED_UNKNOWN_HANDSHAKE_REPLY_CODE_1,
new Object[] {this.name, new Byte(this.handshake.getCode())}));
refuseHandshake(LocalizedStrings.ServerConnection_RECEIVED_UNKNOWN_HANDSHAKE_REPLY_CODE
.toLocalizedString(), ServerHandShakeProcessor.REPLY_INVALID);
return false;
}
} else {
this.stats.incFailedConnectionAttempts();
cleanup();
return false;
}
// }
}
}
return true;
}
protected Map getCommands() {
return this.commands;
}
protected Socket getSocket() {
return this.theSocket;
}
protected int getHandShakeTimeout() {
return this.handShakeTimeout;
}
protected DistributedSystem getDistributedSystem() {
return getCache().getDistributedSystem();
}
public InternalCache getCache() {
return this.crHelper.getCache();
}
public ClientHandShake getHandshake() {
return this.handshake;
}
public void setHandshake(ClientHandShake handshake) {
this.handshake = handshake;
Version v = handshake.getVersion();
this.replyMsg.setVersion(v);
this.requestMsg.setVersion(v);
this.responseMsg.setVersion(v);
this.errorMsg.setVersion(v);
this.queryResponseMsg.setVersion(v);
this.chunkedResponseMsg.setVersion(v);
this.executeFunctionResponseMsg.setVersion(v);
this.registerInterestResponseMsg.setVersion(v);
this.keySetResponseMsg.setVersion(v);
}
public void setRequestMsg(Message requestMsg) {
this.requestMsg = requestMsg;
}
public Version getClientVersion() {
return this.handshake.getVersion();
}
protected void setProxyId(ClientProxyMembershipID proxyId) {
this.proxyId = proxyId;
this.memberIdByteArray = EventID.getMembershipId(proxyId);
// LogWriterI18n log = InternalDistributedSystem.getLoggerI18n();
// byte[] oldIdArray = proxyId.getMembershipByteArray();
// log.warning(LocalizedStrings.DEBUG, "Size comparison for " + proxyId.getDistributedMember()
// + " old=" + oldIdArray.length + " new=" + memberIdByteArray.length
// + " diff=" + (oldIdArray.length - memberIdByteArray.length));
this.name = "Server connection from [" + proxyId + "; port=" + this.theSocket.getPort() + "]";
}
protected void setPrincipal(Principal principal) {
this.principal = principal;
}
// hitesh:this is for backward compability
public long setUserAuthorizeAndPostAuthorizeRequest(AuthorizeRequest authzRequest,
AuthorizeRequestPP postAuthzRequest) throws IOException {
UserAuthAttributes userAuthAttr = new UserAuthAttributes(authzRequest, postAuthzRequest);
if (this.clientUserAuths == null) {
this.initializeClientUserAuths();
}
try {
return this.clientUserAuths.putUserAuth(userAuthAttr);
} catch (NullPointerException npe) {
if (this.isTerminated()) {
// Bug #52023.
throw new IOException("Server connection is terminated.");
}
throw npe;
}
}
public InternalLogWriter getSecurityLogWriter() {
return this.securityLogWriter;
}
private boolean incedCleanupTableRef = false;
private boolean incedCleanupProxyIdTableRef = false;
private final Object chmLock = new Object();
private boolean chmRegistered = false;
private Map getCleanupTable() {
return acceptor.getClientHealthMonitor().getCleanupTable();
}
private Map getCleanupProxyIdTable() {
return acceptor.getClientHealthMonitor().getCleanupProxyIdTable();
}
private ClientHealthMonitor getClientHealthMonitor() {
return acceptor.getClientHealthMonitor();
}
private boolean processHandShake() {
boolean result = false;
boolean clientJoined = false;
boolean registerClient = false;
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
synchronized (getCleanupTable()) {
Counter numRefs = (Counter) getCleanupTable().get(this.handshake);
byte epType = (byte) 0;
int qSize = 0;
if (this.proxyId.isDurable()) {
if (isDebugEnabled) {
logger.debug("looking if the Proxy existed for this durable client or not :{}",
this.proxyId);
}
CacheClientProxy proxy =
getAcceptor().getCacheClientNotifier().getClientProxy(this.proxyId);
if (proxy != null && proxy.waitRemoval()) {
proxy = getAcceptor().getCacheClientNotifier().getClientProxy(this.proxyId);
}
if (proxy != null) {
if (isDebugEnabled) {
logger.debug("Proxy existed for this durable client :{} and proxy : {}", this.proxyId,
proxy);
}
if (proxy.isPrimary()) {
epType = (byte) 2;
qSize = proxy.getQueueSize();
} else {
epType = (byte) 1;
qSize = proxy.getQueueSize();
}
}
// Bug Fix for 37986
if (numRefs == null) {
// Check whether this is a durable client first. A durable client with
// the same id is not allowed. In this case, reject the client.
if (proxy != null && !proxy.isPaused()) {
// The handshake refusal message must be smaller than 127 bytes.
String handshakeRefusalMessage =
LocalizedStrings.ServerConnection_DUPLICATE_DURABLE_CLIENTID_0
.toLocalizedString(proxyId.getDurableId());
logger.warn(LocalizedMessage.create(LocalizedStrings.TWO_ARG_COLON,
new Object[] {this.name, handshakeRefusalMessage}));
refuseHandshake(handshakeRefusalMessage,
HandShake.REPLY_EXCEPTION_DUPLICATE_DURABLE_CLIENT);
return result;
}
}
}
if (numRefs != null) {
if (acceptHandShake(epType, qSize)) {
numRefs.incr();
this.incedCleanupTableRef = true;
result = true;
}
return result;
} else {
if (acceptHandShake(epType, qSize)) {
clientJoined = true;
numRefs = new Counter();
getCleanupTable().put(this.handshake, numRefs);
numRefs.incr();
this.incedCleanupTableRef = true;
this.stats.incCurrentClients();
result = true;
}
return result;
}
} // sync
} // try
finally {
if (isTerminated() || result == false) {
return false;
}
synchronized (getCleanupProxyIdTable()) {
Counter numRefs = (Counter) getCleanupProxyIdTable().get(this.proxyId);
if (numRefs != null) {
numRefs.incr();
} else {
registerClient = true;
numRefs = new Counter();
numRefs.incr();
getCleanupProxyIdTable().put(this.proxyId, numRefs);
InternalDistributedMember idm =
(InternalDistributedMember) this.proxyId.getDistributedMember();
}
this.incedCleanupProxyIdTableRef = true;
}
if (isDebugEnabled) {
logger.debug("{}registering client {}", (registerClient ? "" : "not "), proxyId);
}
this.crHelper.checkCancelInProgress(null);
if (clientJoined && isFiringMembershipEvents()) {
// This is a new client. Notify bridge membership and heartbeat monitor.
InternalClientMembership.notifyClientJoined(this.proxyId.getDistributedMember());
}
ClientHealthMonitor chm = this.acceptor.getClientHealthMonitor();
synchronized (this.chmLock) {
this.chmRegistered = true;
}
if (registerClient) {
// hitesh: it will add client
chm.registerClient(this.proxyId);
}
// hitesh:it will add client connection in set
chm.addConnection(this.proxyId, this);
this.acceptor.getConnectionListener().connectionOpened(registerClient, communicationMode);
// Hitesh: add user creds in map for single user case.
} // finally
}
private boolean isFiringMembershipEvents() {
return this.acceptor.isRunning()
&& !(this.acceptor.getCachedRegionHelper().getCache()).isClosed()
&& !acceptor.getCachedRegionHelper().getCache().getCancelCriterion().isCancelInProgress();
}
protected void refuseHandshake(String msg, byte exception) {
try {
ServerHandShakeProcessor.refuse(this.theSocket.getOutputStream(), msg, exception);
} catch (IOException ignore) {
} finally {
this.stats.incFailedConnectionAttempts();
cleanup();
}
}
private boolean acceptHandShake(byte epType, int qSize) {
try {
this.handshake.accept(theSocket.getOutputStream(), theSocket.getInputStream(), epType, qSize,
this.communicationMode, this.principal);
} catch (IOException ioe) {
if (!crHelper.isShutdown() && !isTerminated()) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ServerConnection_0_HANDSHAKE_ACCEPT_FAILED_ON_SOCKET_1_2,
new Object[] {this.name, this.theSocket, ioe}));
}
cleanup();
return false;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Accepted handshake", this.name);
}
if (this.communicationMode == Acceptor.CLIENT_TO_SERVER_FOR_QUEUE) {
this.stats.incCurrentQueueConnections();
} else {
this.stats.incCurrentClientConnections();
}
return true;
}
public void setCq(String cqName, boolean isDurable) throws Exception {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (this.requestMsg.isSecureMode()) {
if (isDebugEnabled) {
logger.debug("setCq() security header found registering CQname = {}", cqName);
}
try {
byte[] secureBytes = this.requestMsg.getSecureBytes();
secureBytes = ((HandShake) this.handshake).decryptBytes(secureBytes);
AuthIds aIds = new AuthIds(secureBytes);
long uniqueId = aIds.getUniqueId();
CacheClientProxy proxy =
getAcceptor().getCacheClientNotifier().getClientProxy(this.proxyId);
if (proxy != null) {
proxy.setCQVsUserAuth(cqName, uniqueId, isDurable);
}
} catch (Exception ex) {
if (isDebugEnabled) {
logger.debug("While setting cq got exception ", ex);
}
throw ex;
}
} else {
if (isDebugEnabled) {
logger.debug("setCq() security header is not found ");
}
}
}
public void removeCq(String cqName, boolean isDurable) {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (this.requestMsg.isSecureMode()) {
if (isDebugEnabled) {
logger.debug("removeCq() security header found registering CQname = {}", cqName);
}
try {
this.clientUserAuths.removeUserAuthAttributesForCq(cqName, isDurable);
} catch (Exception ex) {
if (isDebugEnabled) {
logger.debug("While setting cq got exception ", ex);
}
}
} else {
if (isDebugEnabled) {
logger.debug("removeCq() security header is not found");
}
}
}
static class Counter {
int cnt;
void incr() {
++cnt;
}
int decr() {
return --cnt;
}
int getCnt() {
return cnt;
}
}
// public void setUserAuthAttributes(ClientProxyMembershipID proxyId, AuthorizeRequest
// authzRequest, AuthorizeRequestPP postAuthzRequest) {
// UserAuthAttributes uaa = new UserAuthAttributes(authzRequest, postAuthzRequest);
// }
/**
* Set to false once handshake has been done
*/
private boolean doHandshake = true;
private boolean clientDisconnectedCleanly = false;
private Throwable clientDisconnectedException;
private int failureCount = 0;
private boolean processMessages = true;
private void doHandshake() {
// hitesh:to create new connection handshake
if (verifyClientConnection()) {
// Initialize the commands after the handshake so that the version
// can be used.
initializeCommands();
// its initialized in verifyClientConnection call
if (getCommunicationMode() != Acceptor.GATEWAY_TO_GATEWAY)
initializeClientUserAuths();
}
if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake),
"Found different version after handshake");
TEST_VERSION_AFTER_HANDSHAKE_FLAG = false;
}
}
private void doNormalMsg() {
Message msg = null;
msg = BaseCommand.readRequest(this);
ThreadState threadState = null;
try {
if (msg != null) {
// Since this thread is not interrupted when the cache server is shutdown, test again after
// a message has been read. This is a bit of a hack. I think this thread should be
// interrupted, but currently AcceptorImpl doesn't keep track of the threads that it
// launches.
if (!this.processMessages || (crHelper.isShutdown())) {
if (logger.isDebugEnabled()) {
logger.debug("{} ignoring message of type {} from client {} due to shutdown.",
getName(), MessageType.getString(msg.getMessageType()), this.proxyId);
}
return;
}
if (msg.getMessageType() != MessageType.PING) {
// check for invalid number of message parts
if (msg.getNumberOfParts() <= 0) {
failureCount++;
if (failureCount > 3) {
this.processMessages = false;
return;
} else {
return;
}
}
}
if (logger.isTraceEnabled()) {
logger.trace("{} received {} with txid {}", getName(),
MessageType.getString(msg.getMessageType()), msg.getTransactionId());
if (msg.getTransactionId() < -1) { // TODO: why is this happening?
msg.setTransactionId(-1);
}
}
if (msg.getMessageType() != MessageType.PING) {
// we have a real message (non-ping),
// so let's call receivedPing to let the CHM know client is busy
acceptor.getClientHealthMonitor().receivedPing(this.proxyId);
}
Command command = getCommand(Integer.valueOf(msg.getMessageType()));
if (command == null) {
command = Default.getCommand();
}
// if a subject exists for this uniqueId, binds the subject to this thread so that we can do
// authorization later
if (AcceptorImpl.isIntegratedSecurity()
&& !isInternalMessage(this.requestMsg, allowInternalMessagesWithoutCredentials)
&& this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY) {
long uniqueId = getUniqueId();
Subject subject = this.clientUserAuths.getSubject(uniqueId);
if (subject != null) {
threadState = securityService.bindSubject(subject);
}
}
command.execute(msg, this);
}
} finally {
// Keep track of the fact that a message is no longer being
// processed.
setNotProcessingMessage();
clearRequestMsg();
if (threadState != null) {
threadState.clear();
}
}
}
private final Object terminationLock = new Object();
private boolean terminated = false;
public boolean isTerminated() {
synchronized (this.terminationLock) {
return this.terminated;
}
}
private void cleanClientAuths() {
if (this.clientUserAuths != null) {
this.clientUserAuths.cleanup(false);
}
}
// package access allowed so AcceptorImpl can call
void handleTermination() {
if (this.crHelper.isShutdown()) {
setClientDisconnectCleanly();
}
handleTermination(false);
}
void handleTermination(boolean timedOut) {
boolean cleanupStats = false;
synchronized (this.terminationLock) {
if (this.terminated) {
return;
}
this.terminated = true;
}
boolean clientDeparted = false;
boolean unregisterClient = false;
setNotProcessingMessage();
synchronized (getCleanupTable()) {
if (this.incedCleanupTableRef) {
this.incedCleanupTableRef = false;
cleanupStats = true;
Counter numRefs = (Counter) getCleanupTable().get(this.handshake);
if (numRefs != null) {
numRefs.decr();
if (numRefs.getCnt() <= 0) {
clientDeparted = true;
getCleanupTable().remove(this.handshake);
this.stats.decCurrentClients();
}
}
if (this.communicationMode == Acceptor.CLIENT_TO_SERVER_FOR_QUEUE) {
this.stats.decCurrentQueueConnections();
} else {
this.stats.decCurrentClientConnections();
}
}
}
synchronized (getCleanupProxyIdTable()) {
if (this.incedCleanupProxyIdTableRef) {
this.incedCleanupProxyIdTableRef = false;
Counter numRefs = (Counter) getCleanupProxyIdTable().get(this.proxyId);
if (numRefs != null) {
numRefs.decr();
if (numRefs.getCnt() <= 0) {
unregisterClient = true;
getCleanupProxyIdTable().remove(this.proxyId);
// here we can remove entry multiuser map for client
proxyIdVsClientUserAuths.remove(this.proxyId);
InternalDistributedMember idm =
(InternalDistributedMember) this.proxyId.getDistributedMember();
}
}
}
}
cleanup(timedOut);
if (getAcceptor().isRunning()) {
// If the client has departed notify bridge membership and unregister it from
// the heartbeat monitor; other wise just remove the connection.
if (clientDeparted && isFiringMembershipEvents()) {
if (this.clientDisconnectedCleanly && !forceClientCrashEvent) {
InternalClientMembership.notifyClientLeft(proxyId.getDistributedMember());
} else {
InternalClientMembership.notifyClientCrashed(this.proxyId.getDistributedMember());
}
// The client has departed. Remove this last connection and unregister it.
}
}
{ // moved out of above if to fix bug 36751
boolean needsUnregister = false;
synchronized (this.chmLock) {
if (this.chmRegistered) {
needsUnregister = true;
this.chmRegistered = false;
}
}
if (unregisterClient)// last serverconnection call all close on auth objects
cleanClientAuths();
this.clientUserAuths = null;
if (needsUnregister) {
this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this);
if (unregisterClient) {
this.acceptor.getClientHealthMonitor().unregisterClient(this.proxyId, getAcceptor(),
this.clientDisconnectedCleanly, this.clientDisconnectedException);
}
}
}
if (cleanupStats) {
this.acceptor.getConnectionListener().connectionClosed(clientDeparted, communicationMode);
}
}
private void doOneMessage() {
if (this.doHandshake) {
doHandshake();
this.doHandshake = false;
} else {
this.resetTransientData();
doNormalMsg();
}
}
private void initializeClientUserAuths() {
this.clientUserAuths = getClientUserAuths(this.proxyId);
}
static ClientUserAuths getClientUserAuths(ClientProxyMembershipID proxyId) {
ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode());
ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua);
if (retCua == null)
return cua;
return retCua;
}
private void initializeCommands() {
// The commands are cached here, but are just referencing the ones
// stored in the CommandInitializer
this.commands = CommandInitializer.getCommands(this);
}
private Command getCommand(Integer messageType) {
Command cc = (Command) this.commands.get(messageType);
return cc;
}
public boolean removeUserAuth(Message msg, boolean keepalive) {
try {
byte[] secureBytes = msg.getSecureBytes();
secureBytes = ((HandShake) this.handshake).decryptBytes(secureBytes);
// need to decrypt it first then get connectionid
AuthIds aIds = new AuthIds(secureBytes);
long connId = aIds.getConnectionId();
if (connId != this.connectionId) {
throw new AuthenticationFailedException("Authentication failed");
}
try {
// first try integrated security
boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId());
// if not successfull, try the old way
if (!removed)
removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive);
return removed;
} catch (NullPointerException npe) {
// Bug #52023.
logger.debug("Exception {}", npe);
return false;
}
} catch (Exception ex) {
throw new AuthenticationFailedException("Authentication failed", ex);
}
}
public byte[] setCredentials(Message msg) throws Exception {
try {
// need to get connection id from secure part of message, before that need to insure
// encryption of id
// need to check here, whether it matches with serverConnection id or not
// need to decrpt bytes if its in DH mode
// need to get properties of credentials(need to remove extra stuff if something is there from
// client)
// need to generate unique-id for client
// need to send back in response with encrption
if (!AcceptorImpl.isAuthenticationRequired() && msg.isSecureMode()) {
// TODO (ashetkar)
/*
* This means that client and server VMs have different security settings. The server does
* not have any security settings specified while client has.
*
* Here, should we just ignore this and send the dummy security part (connectionId, userId)
* in the response (in this case, client needs to know that it is not expected to read any
* security part in any of the server response messages) or just throw an exception
* indicating bad configuration?
*/
// This is a CREDENTIALS_NORMAL case.;
return new byte[0];
}
if (!msg.isSecureMode()) {
throw new AuthenticationFailedException("Authentication failed");
}
byte[] secureBytes = msg.getSecureBytes();
secureBytes = ((HandShake) this.handshake).decryptBytes(secureBytes);
// need to decrypt it first then get connectionid
AuthIds aIds = new AuthIds(secureBytes);
long connId = aIds.getConnectionId();
if (connId != this.connectionId) {
throw new AuthenticationFailedException("Authentication failed");
}
byte[] credBytes = msg.getPart(0).getSerializedForm();
credBytes = ((HandShake) this.handshake).decryptBytes(credBytes);
ByteArrayInputStream bis = new ByteArrayInputStream(credBytes);
DataInputStream dinp = new DataInputStream(bis);
Properties credentials = DataSerializer.readProperties(dinp);
// When here, security is enfored on server, if login returns a subject, then it's the newly
// integrated security, otherwise, do it the old way.
long uniqueId;
DistributedSystem system = this.getDistributedSystem();
String methodName = system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
Object principal = HandShake.verifyCredentials(methodName, credentials,
system.getSecurityProperties(), (InternalLogWriter) system.getLogWriter(),
(InternalLogWriter) system.getSecurityLogWriter(), this.proxyId.getDistributedMember());
if (principal instanceof Subject) {
Subject subject = (Subject) principal;
uniqueId = this.clientUserAuths.putSubject(subject);
logger.info(this.clientUserAuths);
} else {
// this sets principal in map as well....
uniqueId = ServerHandShakeProcessor.getUniqueId(this, (Principal) principal);
}
// create secure part which will be send in respones
return encryptId(uniqueId, this);
} catch (AuthenticationFailedException afe) {
throw afe;
} catch (AuthenticationRequiredException are) {
throw are;
} catch (Exception e) {
throw new AuthenticationFailedException("REPLY_REFUSED", e);
}
}
private void setSecurityPart() {
try {
this.connectionId = randomConnectionIdGen.nextLong();
this.securePart = new Part();
byte[] id = encryptId(this.connectionId, this);
this.securePart.setPartState(id, false);
} catch (Exception ex) {
logger.warn(LocalizedMessage
.create(LocalizedStrings.ServerConnection_SERVER_FAILED_TO_ENCRYPT_DATA_0, ex));
throw new GemFireSecurityException("Server failed to encrypt response message.");
}
}
/**
* MessageType of the messages (typically internal commands) which do not need to participate in
* security should be added in the following if block.
*
* @return Part
* @see AbstractOp#processSecureBytes(Connection, Message)
* @see AbstractOp#needsUserId()
* @see AbstractOp#sendMessage(Connection)
*/
public Part updateAndGetSecurityPart() {
// need to take care all message types here
if (AcceptorImpl.isAuthenticationRequired()
&& this.handshake.getVersion().compareTo(Version.GFE_65) >= 0
&& (this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY)
&& (!this.requestMsg.getAndResetIsMetaRegion())
&& !isInternalMessage(this.requestMsg, allowInternalMessagesWithoutCredentials)) {
setSecurityPart();
return this.securePart;
} else {
if (AcceptorImpl.isAuthenticationRequired() && logger.isDebugEnabled()) {
logger.debug(
"ServerConnection.updateAndGetSecurityPart() not adding security part for msg type {}",
MessageType.getString(this.requestMsg.messageType));
}
}
return null;
}
public boolean isInternalMessage(Message message, boolean allowOldInternalMessages) {
int messageType = message.getMessageType();
boolean isInternalMessage = messageType == MessageType.PING
|| messageType == MessageType.REQUEST_EVENT_VALUE || messageType == MessageType.MAKE_PRIMARY
|| messageType == MessageType.REMOVE_USER_AUTH || messageType == MessageType.CLIENT_READY
|| messageType == MessageType.SIZE || messageType == MessageType.TX_FAILOVER
|| messageType == MessageType.TX_SYNCHRONIZATION || messageType == MessageType.COMMIT
|| messageType == MessageType.ROLLBACK || messageType == MessageType.CLOSE_CONNECTION
|| messageType == MessageType.INVALID || messageType == MessageType.PERIODIC_ACK
|| messageType == MessageType.GET_CLIENT_PR_METADATA
|| messageType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES;
// we allow older clients to not send credentials for a handful of messages
// if and only if a system property is set. This allows a rolling upgrade
// to be performed.
if (!isInternalMessage && allowOldInternalMessages) {
isInternalMessage = messageType == MessageType.GETCQSTATS_MSG_TYPE
|| messageType == MessageType.MONITORCQ_MSG_TYPE
|| messageType == MessageType.REGISTER_DATASERIALIZERS
|| messageType == MessageType.REGISTER_INSTANTIATORS
|| messageType == MessageType.ADD_PDX_TYPE
|| messageType == MessageType.GET_PDX_ID_FOR_TYPE
|| messageType == MessageType.GET_PDX_TYPE_BY_ID
|| messageType == MessageType.GET_FUNCTION_ATTRIBUTES
|| messageType == MessageType.ADD_PDX_ENUM
|| messageType == MessageType.GET_PDX_ID_FOR_ENUM
|| messageType == MessageType.GET_PDX_ENUM_BY_ID
|| messageType == MessageType.GET_PDX_TYPES || messageType == MessageType.GET_PDX_ENUMS;
}
return isInternalMessage;
}
public void run() {
setOwner();
if (getAcceptor().isSelector()) {
boolean finishedMsg = false;
try {
this.stats.decThreadQueueSize();
if (!isTerminated()) {
Message.setTLCommBuffer(getAcceptor().takeCommBuffer());
doOneMessage();
if (this.processMessages && !(this.crHelper.isShutdown())) {
registerWithSelector(); // finished msg so reregister
finishedMsg = true;
}
}
} catch (java.nio.channels.ClosedChannelException ignore) {
// ok shutting down
} catch (CancelException e) {
// ok shutting down
} catch (IOException ex) {
logger.warn(
LocalizedMessage.create(LocalizedStrings.ServerConnection_0__UNEXPECTED_EXCEPTION, ex));
setClientDisconnectedException(ex);
} finally {
getAcceptor().releaseCommBuffer(Message.setTLCommBuffer(null));
// DistributedSystem.releaseThreadsSockets();
unsetOwner();
setNotProcessingMessage();
// unset request specific timeout
this.unsetRequestSpecificTimeout();
if (!finishedMsg) {
try {
handleTermination();
} catch (CancelException e) {
// ignore
}
}
}
} else {
try {
while (this.processMessages && !(this.crHelper.isShutdown())) {
try {
doOneMessage();
} catch (CancelException e) {
// allow finally block to handle termination
} finally {
this.unsetRequestSpecificTimeout();
Breadcrumbs.clearBreadcrumb();
}
}
} finally {
try {
this.unsetRequestSpecificTimeout();
handleTermination();
DistributedSystem.releaseThreadsSockets();
} catch (CancelException e) {
// ignore
}
}
}
}
/**
* If registered with a selector then this will be the key we are registered with.
*/
// private SelectionKey sKey = null;
/**
* Register this connection with the given selector for read events. Note that switch the channel
* to non-blocking so it can be in a selector.
*/
public void registerWithSelector() throws IOException {
// logger.info("DEBUG: registerWithSelector " + this);
getSelectableChannel().configureBlocking(false);
getAcceptor().registerSC(this);
}
public SelectableChannel getSelectableChannel() {
return this.theSocket.getChannel();
}
public void registerWithSelector2(Selector s) throws IOException {
/* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this);
}
/**
* Switch this guy to blocking mode so we can use oldIO to read and write msgs.
*/
public void makeBlocking() throws IOException {
// logger.info("DEBUG: makeBlocking " + this);
// if (this.sKey != null) {
// this.sKey = null;
// }
SelectableChannel c = this.theSocket.getChannel();
c.configureBlocking(true);
}
private static boolean forceClientCrashEvent = false;
public static void setForceClientCrashEvent(boolean value) {
forceClientCrashEvent = value;
}
/**
*
* @return String representing the DistributedSystemMembership of the Client VM
*/
public String getMembershipID() {
return this.proxyId.getDSMembership();
}
public int getSocketPort() {
return theSocket.getPort();
}
public String getSocketHost() {
return theSocket.getInetAddress().getHostAddress();
}
// private DistributedMember getClientDistributedMember() {
// return this.proxyId.getDistributedMember();
// }
protected byte getCommunicationMode() {
return this.communicationMode;
}
protected String getCommunicationModeString() {
return this.communicationModeStr;
}
protected InetAddress getSocketAddress() {
return theSocket.getInetAddress();
}
public void setRequestSpecificTimeout(int requestSpecificTimeout) {
this.requestSpecificTimeout = requestSpecificTimeout;
}
private void unsetRequestSpecificTimeout() {
this.requestSpecificTimeout = -1;
}
int getClientReadTimeout() {
if (this.requestSpecificTimeout == -1)
return this.handshake.getClientReadTimeout();
else
return this.requestSpecificTimeout;
}
protected boolean isProcessingMessage() {
if (isTerminated()) {
return false;
}
synchronized (this.processingMessageLock) {
return basicIsProcessingMessage();
}
}
private boolean basicIsProcessingMessage() {
return this.processingMessageStartTime != -1;
}
void setProcessingMessage() {
synchronized (this.processingMessageLock) {
// go ahead and reset it if it is already set
this.processingMessageStartTime = System.currentTimeMillis();
}
}
void updateProcessingMessage() {
synchronized (this.processingMessageLock) {
// only update it if it was already set by setProcessingMessage
if (this.processingMessageStartTime != -1) {
this.processingMessageStartTime = System.currentTimeMillis();
}
}
}
protected void setNotProcessingMessage() {
synchronized (this.processingMessageLock) {
this.processingMessageStartTime = -1;
}
}
long getCurrentMessageProcessingTime() {
long result;
synchronized (this.processingMessageLock) {
result = this.processingMessageStartTime;
}
if (result != -1) {
result = System.currentTimeMillis() - result;
}
return result;
}
protected boolean hasBeenTimedOutOnClient() {
int timeout = getClientReadTimeout();
if (timeout > 0) { // 0 means no timeout
timeout = timeout + TIMEOUT_BUFFER_FOR_CONNECTION_CLEANUP_MS;
/*
* This is a buffer that we add to client readTimeout value before we cleanup the connection.
* This buffer time helps prevent EOF in the client instead of SocketTimeout
*/
synchronized (this.processingMessageLock) {
// If a message is currently being processed and it has been
// being processed for more than the client read timeout,
// then return true
if (getCurrentMessageProcessingTime() > timeout) {
return true;
}
}
}
return false;
}
public String getSocketString() {
try {
StringBuffer buffer = new StringBuffer(50).append(theSocket.getInetAddress()).append(':')
.append(theSocket.getPort()).append(" timeout: ").append(theSocket.getSoTimeout());
return buffer.toString();
} catch (Exception e) {
return LocalizedStrings.ServerConnection_ERROR_IN_GETSOCKETSTRING_0
.toLocalizedString(e.getLocalizedMessage());
}
}
void clearRequestMsg() {
requestMsg.clear();
}
public void incrementLatestBatchIdReplied(int justProcessed) {
// not synchronized because it only has a single caller
if (justProcessed - this.latestBatchIdReplied != 1) {
this.stats.incOutOfOrderBatchIds();
logger.warn(LocalizedMessage.create(
LocalizedStrings.ServerConnection_BATCH_IDS_ARE_OUT_OF_ORDER_SETTING_LATESTBATCHID_TO_0_IT_WAS_1,
new Object[] {Integer.valueOf(justProcessed),
Integer.valueOf(this.latestBatchIdReplied)}));
}
this.latestBatchIdReplied = justProcessed;
}
public int getLatestBatchIdReplied() {
return this.latestBatchIdReplied;
}
private final Object ownerLock = new Object();
protected void interruptOwner() {
synchronized (this.ownerLock) {
if (this.owner != null) {
this.owner.interrupt();
}
}
}
private void setOwner() {
synchronized (this.ownerLock) {
this.owner = Thread.currentThread();
}
}
private void unsetOwner() {
synchronized (this.ownerLock) {
this.owner = null;
// clear the interrupted bit since our thread is in a thread pool
Thread.interrupted();
}
}
private void initStreams(Socket s, int socketBufferSize, MessageStats msgStats) {
try {
theSocket = s;
theSocket.setSendBufferSize(socketBufferSize);
theSocket.setReceiveBufferSize(socketBufferSize);
if (getAcceptor().isSelector()) {
// set it on the message to null. This causes Message
// to fetch it from a thread local. That way we only need
// one per thread in our selector thread pool instead of
// one per connection.
commBuffer = null;
} else {
commBuffer = allocateCommBuffer(socketBufferSize, s);
}
requestMsg.setComms(this, theSocket, commBuffer, msgStats);
replyMsg.setComms(this, theSocket, commBuffer, msgStats);
responseMsg.setComms(this, theSocket, commBuffer, msgStats);
errorMsg.setComms(this, theSocket, commBuffer, msgStats);
chunkedResponseMsg.setComms(this, theSocket, commBuffer, msgStats);
queryResponseMsg.setComms(this, theSocket, commBuffer, msgStats);
executeFunctionResponseMsg.setComms(this, theSocket, commBuffer, msgStats);
registerInterestResponseMsg.setComms(this, theSocket, commBuffer, msgStats);
keySetResponseMsg.setComms(this, theSocket, commBuffer, msgStats);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
logger.fatal(e.getMessage(), e);
}
}
public boolean isOpen() {
return !isClosed();
}
public boolean isClosed() {
return this.theSocket == null || !this.theSocket.isConnected() || this.theSocket.isClosed();
}
public void cleanup(boolean timedOut) {
if (cleanup() && timedOut) {
this.stats.incConnectionsTimedOut();
}
}
public boolean cleanup() {
if (isClosed()) {
return false;
}
if (this.communicationMode == Acceptor.CLIENT_TO_SERVER || isGatewayConnection()
|| this.communicationMode == Acceptor.MONITOR_TO_SERVER
/* || this.communicationMode == Acceptor.CLIENT_TO_SERVER_FOR_QUEUE */) {
getAcceptor().decClientServerCnxCount();
}
try {
theSocket.close();
} catch (Exception e) {
}
try {
if (this.authzRequest != null) {
this.authzRequest.close();
this.authzRequest = null;
}
} catch (Exception ex) {
if (securityLogWriter.warningEnabled()) {
securityLogWriter.warning(
LocalizedStrings.ServerConnection_0_AN_EXCEPTION_WAS_THROWN_WHILE_CLOSING_CLIENT_AUTHORIZATION_CALLBACK_1,
new Object[] {this.name, ex});
}
}
try {
if (this.postAuthzRequest != null) {
this.postAuthzRequest.close();
this.postAuthzRequest = null;
}
} catch (Exception ex) {
if (securityLogWriter.warningEnabled()) {
securityLogWriter.warning(
LocalizedStrings.ServerConnection_0_AN_EXCEPTION_WAS_THROWN_WHILE_CLOSING_CLIENT_POSTPROCESS_AUTHORIZATION_CALLBACK_1,
new Object[] {this.name, ex});
}
}
getAcceptor().unregisterSC(this);
if (logger.isDebugEnabled()) {
logger.debug("{}: Closed connection", this.name);
}
releaseCommBuffer();
return true;
}
private void releaseCommBuffer() {
ByteBuffer bb = this.commBuffer;
if (bb != null) {
this.commBuffer = null;
ServerConnection.releaseCommBuffer(bb);
}
}
/**
* Just ensure that this class gets loaded.
*
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
// nothing needed, just make sure this class gets loaded.
}
/**
* @see SystemFailure#emergencyClose()
*/
public void emergencyClose() {
this.terminated = true;
Socket s = this.theSocket;
if (s != null) {
try {
s.close();
} catch (IOException e) {
// ignore
}
}
}
@Override
public String toString() {
return this.name;
}
/** returns the name of this connection */
public String getName() {
return this.name;
}
/**
* @return The ClientProxyMembershipID associated with the ServerConnection
*/
public ClientProxyMembershipID getProxyID() {
return this.proxyId;
}
/**
* @return The ClientProxyMembershipID associated with the ServerConnection
*/
public CachedRegionHelper getCachedRegionHelper() {
return this.crHelper;
}
/**
* @return The CacheServerStats associated with the ServerConnection
*/
public CacheServerStats getCacheServerStats() {
return this.stats;
}
/**
* @return The ReplyMessage associated with the ServerConnection
*/
public Message getReplyMessage() {
return this.replyMsg;
}
/**
* @return The ChunkedResponseMessage associated with the ServerConnection
*/
public ChunkedMessage getChunkedResponseMessage() {
return this.chunkedResponseMsg;
}
/**
* @return The ErrorResponseMessage associated with the ServerConnection
*/
public Message getErrorResponseMessage() {
return this.errorMsg;
}
/**
* @return The ResponseMessage associated with the ServerConnection
*/
public Message getResponseMessage() {
return this.responseMsg;
}
/**
* @return The Request Message associated with the ServerConnection
*/
public Message getRequestMessage() {
return this.requestMsg;
}
/**
* @return The QueryResponseMessage associated with the ServerConnection
*/
public ChunkedMessage getQueryResponseMessage() {
return this.queryResponseMsg;
}
public ChunkedMessage getFunctionResponseMessage() {
return this.executeFunctionResponseMsg;
}
public ChunkedMessage getKeySetResponseMessage() {
return this.keySetResponseMsg;
}
public ChunkedMessage getRegisterInterestResponseMessage() {
return this.registerInterestResponseMsg;
}
/*
* The four boolean fields and the String & Object field below are the transient data We have made
* it fields just because we know that they will be operated by a single thread only & hence in
* effect behave as local variables.
*/
private boolean requiresResponse;
private boolean requiresChunkedResponse;
private boolean potentialModification;
private boolean responded;
private Object modKey = null;
private String modRegion = null;
void resetTransientData() {
this.potentialModification = false;
this.requiresResponse = false;
this.responded = false;
this.requiresChunkedResponse = false;
this.modKey = null;
this.modRegion = null;
queryResponseMsg.setNumberOfParts(2);
chunkedResponseMsg.setNumberOfParts(1);
executeFunctionResponseMsg.setNumberOfParts(1);
registerInterestResponseMsg.setNumberOfParts(1);
keySetResponseMsg.setNumberOfParts(1);
}
String getModRegion() {
return this.modRegion;
}
Object getModKey() {
return this.modKey;
}
boolean getPotentialModification() {
return this.potentialModification;
}
public void setModificationInfo(boolean potentialModification, String modRegion, Object modKey) {
this.potentialModification = potentialModification;
this.modRegion = modRegion;
this.modKey = modKey;
}
public void setAsTrue(int boolID) {
switch (boolID) {
case Command.RESPONDED:
this.responded = true;
break;
case Command.REQUIRES_RESPONSE:
this.requiresResponse = true;
break;
case Command.REQUIRES_CHUNKED_RESPONSE:
this.requiresChunkedResponse = true;
break;
default:
throw new IllegalArgumentException(
LocalizedStrings.ServerConnection_THE_ID_PASSED_IS_0_WHICH_DOES_NOT_CORRESPOND_WITH_ANY_TRANSIENT_DATA
.toLocalizedString(Integer.valueOf(boolID)));
}
}
public boolean getTransientFlag(int boolID) {
boolean retVal;
switch (boolID) {
case Command.RESPONDED:
retVal = this.responded;
break;
case Command.REQUIRES_RESPONSE:
retVal = this.requiresResponse;
break;
case Command.REQUIRES_CHUNKED_RESPONSE:
retVal = this.requiresChunkedResponse;
break;
default:
throw new IllegalArgumentException(
LocalizedStrings.ServerConnection_THE_ID_PASSED_IS_0_WHICH_DOES_NOT_CORRESPOND_WITH_ANY_TRANSIENT_DATA
.toLocalizedString(Integer.valueOf(boolID)));
}
return retVal;
}
public void setFlagProcessMessagesAsFalse() {
this.processMessages = false;
}
boolean getFlagProcessMessages() {
return this.processMessages;
}
public InternalLogWriter getLogWriter() {
return this.logWriter; // TODO:LOG:CONVERT: remove getLogWriter after callers are converted
}
// this is for old client before(<6.5), from 6.5 userAuthId comes in user request
private long userAuthId;
// this is for old client before(<6.5), from 6.5 userAuthId comes in user request
public void setUserAuthId(long uniqueId) {
this.userAuthId = uniqueId;
}
private byte[] encryptId(long id, ServerConnection servConn) throws Exception {
// deserialize this using handshake keys
HeapDataOutputStream hdos = null;
try {
hdos = new HeapDataOutputStream(Version.CURRENT);
hdos.writeLong(id);
return ((HandShake) this.handshake).encryptBytes(hdos.toByteArray());
} finally {
hdos.close();
}
}
public long getUniqueId() {
long uniqueId = 0;
if (this.handshake.getVersion().isPre65() || isGatewayConnection()) {
uniqueId = this.userAuthId;
} else if (this.requestMsg.isSecureMode()) {
uniqueId = messageIdExtractor.getUniqueIdFromMessage(this.requestMsg,
(HandShake) this.handshake, this.connectionId);
} else {
throw new AuthenticationRequiredException(
LocalizedStrings.HandShake_NO_SECURITY_CREDENTIALS_ARE_PROVIDED.toLocalizedString());
}
return uniqueId;
}
private boolean isGatewayConnection() {
return getCommunicationMode() == Acceptor.GATEWAY_TO_GATEWAY;
}
public AuthorizeRequest getAuthzRequest() throws AuthenticationRequiredException, IOException {
// look client version and return authzrequest
// for backward client it will be store in member variable userAuthId
// for other look "requestMsg" here and get unique-id from this to get the authzrequest
if (!AcceptorImpl.isAuthenticationRequired())
return null;
if (AcceptorImpl.isIntegratedSecurity())
return null;
long uniqueId = getUniqueId();
UserAuthAttributes uaa = null;
try {
uaa = this.clientUserAuths.getUserAuthAttributes(uniqueId);
} catch (NullPointerException npe) {
if (this.isTerminated()) {
// Bug #52023.
throw new IOException("Server connection is terminated.");
} else {
logger.debug("Unexpected exception {}", npe);
}
}
if (uaa == null) {
throw new AuthenticationRequiredException("User authorization attributes not found.");
}
AuthorizeRequest authReq = uaa.getAuthzRequest();
if (logger.isDebugEnabled()) {
logger.debug("getAuthzRequest() authrequest: {}",
((authReq == null) ? "NULL (only authentication is required)" : "not null"));
}
return authReq;
}
public AuthorizeRequestPP getPostAuthzRequest()
throws AuthenticationRequiredException, IOException {
if (!AcceptorImpl.isAuthenticationRequired())
return null;
if (AcceptorImpl.isIntegratedSecurity())
return null;
// look client version and return authzrequest
// for backward client it will be store in member variable userAuthId
// for other look "requestMsg" here and get unique-id from this to get the authzrequest
long uniqueId = getUniqueId();
UserAuthAttributes uaa = null;
try {
uaa = this.clientUserAuths.getUserAuthAttributes(uniqueId);
} catch (NullPointerException npe) {
if (this.isTerminated()) {
// Bug #52023.
throw new IOException("Server connection is terminated.");
} else {
logger.debug("Unexpected exception {}", npe);
}
}
if (uaa == null) {
throw new AuthenticationRequiredException("User authorization attributes not found.");
}
AuthorizeRequestPP postAuthReq = uaa.getPostAuthzRequest();
return postAuthReq;
}
/** returns the member ID byte array to be used for creating EventID objects */
public byte[] getEventMemberIDByteArray() {
return this.memberIdByteArray;
}
public void setClientDisconnectCleanly() {
this.clientDisconnectedCleanly = true;
}
public void setClientDisconnectedException(Throwable e) {
this.clientDisconnectedException = e;
}
public void setMessageIdExtractor(MessageIdExtractor messageIdExtractor) {
this.messageIdExtractor = messageIdExtractor;
}
public MessageIdExtractor getMessageIdExtractor() {
return this.messageIdExtractor;
}
}