blob: 1e97d821acb125e3490e05def69dddecb1273ad7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.client.internal;
import java.net.SocketTimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.cache.PutAllPartialResultException;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.ChunkedMessage;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Represents an operation that can be performed in a client by sending a message to a server.
*
* @since GemFire 5.7
*/
public abstract class AbstractOp implements Op {
private static final Logger logger = LogService.getLogger();
private final Message msg;
private boolean allowDuplicateMetadataRefresh;
protected AbstractOp(int msgType, int msgParts) {
msg = new Message(msgParts, Version.CURRENT);
getMessage().setMessageType(msgType);
}
/**
* Returns the message that this op will send to the server
*/
protected Message getMessage() {
return msg;
}
protected void initMessagePart() {
}
/**
* Sets the transaction id on the message
*/
private void setMsgTransactionId() {
if (participateInTransaction() && getMessage().getTransactionId() == TXManagerImpl.NOTX) {
getMessage().setTransactionId(TXManagerImpl.getCurrentTXUniqueId());
}
}
/**
* Attempts to send this operation's message out on the given connection
*
* @param cnx the connection to use when sending
* @throws Exception if the send fails
*/
protected void attemptSend(Connection cnx) throws Exception {
setMsgTransactionId();
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER_VERBOSE, "Sending op={} using {}",
getShortClassName(), cnx);
}
getMessage().setComms(cnx.getSocket(), cnx.getInputStream(), cnx.getOutputStream(),
cnx.getCommBuffer(), cnx.getStats());
try {
sendMessage(cnx);
} finally {
getMessage().unsetComms();
}
}
/** returns the class name w/o package information. useful in logging */
public String getShortClassName() {
String cname = getClass().getName();
return cname.substring(getClass().getPackage().getName().length() + 1);
}
/**
* New implementations of AbstractOp should override this method if the implementation should be
* excluded from client authentication. e.g. PingOp#sendMessage(Connection cnx)
*
* @see AbstractOp#needsUserId()
* @see AbstractOp#processSecureBytes(Connection, Message)
* @see ServerConnection#updateAndGetSecurityPart()
*/
protected void sendMessage(Connection cnx) throws Exception {
if (cnx.getServer().getRequiresCredentials()) {
// Security is enabled on client as well as on server
getMessage().setMessageHasSecurePartFlag();
long userId;
if (UserAttributes.userAttributes.get() == null) { // single user mode
userId = cnx.getServer().getUserId();
} else { // multi user mode
Long id = UserAttributes.userAttributes.get().getServerToId().get(cnx.getServer());
if (id == null) {
// This will ensure that this op is retried on another server, unless
// the retryCount is exhausted. Fix for Bug 41501
throw new ServerConnectivityException("Connection error while authenticating user");
}
userId = id;
}
try (HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT)) {
hdos.writeLong(cnx.getConnectionID());
hdos.writeLong(userId);
getMessage().setSecurePart(((ConnectionImpl) cnx).encryptBytes(hdos.toByteArray()));
}
}
getMessage().send(false);
}
/**
* Process the security information in a response from the server. If the server sends a security
* "part" we must process it so all subclasses should allow this method to be invoked.
*
* @see ServerConnection#updateAndGetSecurityPart()
*/
protected void processSecureBytes(Connection cnx, Message message) throws Exception {
if (cnx.getServer().getRequiresCredentials()) {
if (!message.isSecureMode()) {
// This can be seen during shutdown
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE,
"Response message from {} for {} has no secure part.", cnx, this);
}
return;
}
byte[] partBytes = message.getSecureBytes();
if (partBytes == null) {
if (logger.isDebugEnabled()) {
logger.debug("Response message for {} has no bytes in secure part.", this);
}
return;
}
byte[] bytes = ((ConnectionImpl) cnx).decryptBytes(partBytes);
ByteArrayDataInput dis = new ByteArrayDataInput(bytes);
cnx.setConnectionID(dis.readLong());
}
}
/**
* New implementations of AbstractOp should override this method to return false if the
* implementation should be excluded from client authentication. e.g. PingOp#needsUserId()
* <P/>
* Also, such an operation's <code>MessageType</code> must be added in the 'if' condition in
* {@link ServerConnection#updateAndGetSecurityPart()}
*
* @see AbstractOp#sendMessage(Connection)
* @see ServerConnection#updateAndGetSecurityPart()
*/
protected boolean needsUserId() {
return true;
}
/**
* Attempts to read a response to this operation by reading it from the given connection, and
* returning it.
*
* @param cnx the connection to read the response from
* @return the result of the operation or <code>null</code> if the operation has no result.
* @throws Exception if the execute failed
*/
protected Object attemptReadResponse(Connection cnx) throws Exception {
Message msg = createResponseMessage();
if (msg != null) {
msg.setComms(cnx.getSocket(), cnx.getInputStream(), cnx.getOutputStream(),
cnx.getCommBuffer(), cnx.getStats());
if (msg instanceof ChunkedMessage) {
try {
return processResponse(msg, cnx);
} finally {
msg.unsetComms();
processSecureBytes(cnx, msg);
}
} else {
try {
msg.receive();
} finally {
msg.unsetComms();
processSecureBytes(cnx, msg);
}
return processResponse(msg, cnx);
}
} else {
return null;
}
}
/**
* By default just create a normal one part msg. Subclasses can override this.
*/
protected Message createResponseMessage() {
return new Message(1, Version.CURRENT);
}
protected Object processResponse(Message m, Connection con) throws Exception {
return processResponse(m);
}
/**
* Processes the given response message returning the result, if any, of the processing.
*
* @return the result of processing the response; null if no result
* @throws Exception if response could not be processed or we received a response with a server
* exception.
*/
protected abstract Object processResponse(Message msg) throws Exception;
/**
* Return true of <code>messageType</code> indicates the operation had an error on the server.
*/
protected abstract boolean isErrorResponse(int msgType);
/**
* Process a response that contains an ack.
*
* @param msg the message containing the response
* @param opName text describing this op
* @throws Exception if response could not be processed or we received a response with a server
* exception.
*/
protected void processAck(Message msg, String opName) throws Exception {
final int msgType = msg.getMessageType();
if (msgType != MessageType.REPLY) {
Part part = msg.getPart(0);
if (msgType == MessageType.EXCEPTION) {
String s = ": While performing a remote " + opName;
Throwable t = (Throwable) part.getObject();
if (t instanceof PutAllPartialResultException) {
throw (PutAllPartialResultException) t;
} else {
throw new ServerOperationException(s, t);
}
// Get the exception toString part.
// This was added for c++ thin client and not used in java
} else if (isErrorResponse(msgType)) {
throw new ServerOperationException(part.getString());
} else {
throw new InternalGemFireError("Unexpected message type " + MessageType.getString(msgType));
}
}
}
/**
* Process a response that contains a single Object result.
*
* @param msg the message containing the response
* @param opName text describing this op
* @return the result of the response
* @throws Exception if response could not be processed or we received a response with a server
* exception.
*/
protected Object processObjResponse(Message msg, String opName) throws Exception {
Part part = msg.getPart(0);
final int msgType = msg.getMessageType();
if (msgType == MessageType.RESPONSE) {
return part.getObject();
} else {
if (msgType == MessageType.EXCEPTION) {
String s = "While performing a remote " + opName;
throw new ServerOperationException(s, (Throwable) part.getObject());
// Get the exception toString part.
// This was added for c++ thin client and not used in java
} else if (isErrorResponse(msgType)) {
throw new ServerOperationException(part.getString());
} else {
throw new InternalGemFireError("Unexpected message type " + MessageType.getString(msgType));
}
}
}
boolean isAllowDuplicateMetadataRefresh() {
return allowDuplicateMetadataRefresh;
}
void setAllowDuplicateMetadataRefresh(final boolean allowDuplicateMetadataRefresh) {
this.allowDuplicateMetadataRefresh = allowDuplicateMetadataRefresh;
}
/**
* Used by subclasses who get chunked responses.
*/
public interface ChunkHandler {
/**
* This method will be called once for every incoming chunk
*
* @param msg the current chunk to handle
*/
void handle(ChunkedMessage msg) throws Exception;
}
/**
* Process a chunked response that contains a single Object result.
*
* @param msg the message containing the response
* @param opName text describing this op
* @param callback used to handle each chunks data
* @throws Exception if response could not be processed or we received a response with a server
* exception.
*/
void processChunkedResponse(ChunkedMessage msg, String opName, ChunkHandler callback)
throws Exception {
msg.readHeader();
final int msgType = msg.getMessageType();
if (msgType == MessageType.RESPONSE) {
do {
msg.receiveChunk();
callback.handle(msg);
} while (!msg.isLastChunk());
} else {
if (msgType == MessageType.EXCEPTION) {
msg.receiveChunk();
Part part = msg.getPart(0);
String s = "While performing a remote " + opName;
throw new ServerOperationException(s, (Throwable) part.getObject());
// Get the exception toString part.
// This was added for c++ thin client and not used in java
} else if (isErrorResponse(msgType)) {
msg.receiveChunk();
Part part = msg.getPart(0);
throw new ServerOperationException(part.getString());
} else {
throw new InternalGemFireError("Unexpected message type " + MessageType.getString(msgType));
}
}
}
/**
* Set to true if this attempt failed
*/
protected boolean failed;
/**
* Set to true if this attempt timed out
*/
protected boolean timedOut;
/*
* (non-Javadoc)
*
* @see org.apache.geode.cache.client.internal.Op#attempt(org.apache.geode.cache.client.internal.
* Connection)
*/
@Override
public Object attempt(Connection connection) throws Exception {
failed = true;
timedOut = false;
long start = startAttempt(connection.getStats());
try {
try {
attemptSend(connection);
failed = false;
} finally {
endSendAttempt(connection.getStats(), start);
}
failed = true;
try {
Object result = attemptReadResponse(connection);
failed = false;
return result;
} catch (SocketTimeoutException ste) {
failed = false;
timedOut = true;
throw ste;
}
} finally {
endAttempt(connection.getStats(), start);
}
}
protected boolean hasFailed() {
return failed;
}
protected boolean hasTimedOut() {
return timedOut;
}
protected abstract long startAttempt(ConnectionStats stats);
protected abstract void endSendAttempt(ConnectionStats stats, long start);
protected abstract void endAttempt(ConnectionStats stats, long start);
/**
* Subclasses for AbstractOp should override this method to return false in this message should
* not participate in any existing transaction
*
* @return true if the message should participate in transaction
*/
protected boolean participateInTransaction() {
return true;
}
public boolean isGatewaySenderOp() {
return false;
}
}