blob: aff98ec211381d9840f77d035367da1308660541 [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.stream;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Set;
import java.util.logging.Logger;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.JEVersion;
import com.sleepycat.je.LockTimeoutException;
import com.sleepycat.je.log.LogEntryType;
import com.sleepycat.je.rep.InsufficientAcksException;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepGroupImpl.NodeConflictException;
import com.sleepycat.je.rep.impl.RepNodeImpl;
import com.sleepycat.je.rep.impl.node.Feeder;
import com.sleepycat.je.rep.impl.node.Feeder.ExitException;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.RepNode;
import com.sleepycat.je.rep.stream.Protocol.JEVersions;
import com.sleepycat.je.rep.stream.Protocol.JEVersionsReject;
import com.sleepycat.je.rep.stream.Protocol.NodeGroupInfo;
import com.sleepycat.je.rep.stream.Protocol.ReplicaJEVersions;
import com.sleepycat.je.rep.stream.Protocol.ReplicaProtocolVersion;
import com.sleepycat.je.rep.stream.Protocol.SNTPRequest;
import com.sleepycat.je.rep.utilint.BinaryProtocol.Message;
import com.sleepycat.je.rep.utilint.BinaryProtocol.ProtocolException;
import com.sleepycat.je.rep.utilint.NamedChannel;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.TestHook;
import com.sleepycat.je.utilint.TestHookExecute;
/**
* Implements the Feeder side of the handshake between the Feeder and the
* Replica. The ReplicaFeederHandshake class takes care of the other side.
*
* @see <a href="https://sleepycat.oracle.com/pmwiki/pmwiki.php?n=JEHomePage.ReplicaFeederHandshake">FeederReplicaHandshake</a>
*/
public class FeederReplicaHandshake {
private final int GROUP_RETRY = 60;
private final long GROUP_RETRY_SLEEP_MS = 1000;
/* The rep node (server or replica) */
private final RepNode repNode;
private final NamedChannel namedChannel;
private final NameIdPair feederNameIdPair;
/* Established during the first message. */
private NameIdPair replicaNameIdPair = null;
private ReplicaJEVersions replicaJEVersions;
/**
* The negotiated highest version associated with log records to be sent
* by this feeder in the HA stream.
*/
private int streamLogVersion;
/** The node associated with the replica, or null if not known. */
private volatile RepNodeImpl replicaNode;
private final Logger logger;
/**
* A test hook that is called before a message is written. Note that the
* hook is inherited by the ReplicaFeederHandshake, and will be kept in
* place for the entire handshake.
*/
private final TestHook<Message> writeMessageHook;
/*
* Used during testing: A non-zero value overrides the actual log
* version.
*/
private static int testCurrentLogVersion = 0;
/**
* An instance of this class is created with each new handshake preceding
* the setting up of a connection.
*
* @param repNode the replication node
* @param feeder the feeder instance
* @param namedChannel the channel to be used for the handshake
*/
public FeederReplicaHandshake(RepNode repNode,
Feeder feeder,
NamedChannel namedChannel) {
this.repNode = repNode;
this.namedChannel = namedChannel;
feederNameIdPair = repNode.getNameIdPair();
logger = LoggerUtils.getLogger(getClass());
writeMessageHook = feeder.getWriteMessageHook();
}
/**
* Returns the replica node ID. The returned value is only valid after
* the handshake has been executed.
*
* @return the replica node name id pair
*/
public NameIdPair getReplicaNameIdPair() {
return replicaNameIdPair;
}
/**
* Returns the current log version for the feeder, which is the highest log
* version of any replicable log entry supplied by this feeder. Uses
* LogEntryType.LOG_VERSION_HIGHEST_REPLICABLE, not LOG_VERSION, since some
* log versions may have only applied to non-replicable log entries, as was
* the case for log version 10.
*/
private int getCurrentLogVersion() {
return (testCurrentLogVersion != 0) ?
testCurrentLogVersion :
LogEntryType.LOG_VERSION_HIGHEST_REPLICABLE;
}
/**
* Return the negotiated log version that will be used for the HA stream
* between the feeder and the replica.
*/
public int getStreamLogVersion() {
if (streamLogVersion <= 0) {
throw new IllegalStateException("execute() method was not invoked");
}
return streamLogVersion;
}
public static void setTestLogVersion(int testLogVersion) {
testCurrentLogVersion = testLogVersion;
}
/** Get the current JE version, supporting a test override. */
private JEVersion getCurrentJEVersion() {
return repNode.getRepImpl().getCurrentJEVersion();
}
/** Get the current protocol version, supporting a test override. */
private int getCurrentProtocolVersion() {
return Protocol.getJEVersionProtocolVersion(getCurrentJEVersion());
}
/**
* Determines log compatibility. Returns null if they are compatible or the
* server would like to defer the rejection to the replica. Return a
* JEVersionsReject message if the server does not wish to communicate with
* the replica.
*
* This check requires the log version of the replicas to be greater than
* or equal to {@value LogEntryType#LOG_VERSION_REPLICATE_OLDER} . Allowing
* the replica version to be behind the feeder version supports replication
* during upgrades where the feeder is upgraded before the replica.
* [#22336]
*
* This check also requires that the JE version of the replica is at least
* the minJEVersion specified by the RepGroupImpl, if any. This check
* makes sure that nodes running an older software version cannot join the
* group after a new and incompatible feature has been used.
*/
private JEVersionsReject checkJECompatibility(final Protocol protocol,
final JEVersions jeVersions) {
final int replicaLogVersion = jeVersions.getLogVersion();
if (replicaLogVersion < LogEntryType.LOG_VERSION_REPLICATE_OLDER - 1) {
return protocol.new JEVersionsReject(
"Incompatible log versions. " +
"Feeder log version: " + getCurrentLogVersion() +
", Feeder JE version: " + getCurrentJEVersion() +
", Replica log version: " + replicaLogVersion +
", Replica JE version: " + jeVersions.getVersion());
}
final JEVersion minJEVersion = repNode.getGroup().getMinJEVersion();
if (minJEVersion.compareTo(jeVersions.getVersion()) > 0) {
return protocol.new JEVersionsReject(
"Unsupported JE version. " +
"Feeder JE version: " + getCurrentJEVersion() +
", Feeder min JE version: " + minJEVersion +
", Replica JE version: " + jeVersions.getVersion());
}
return null;
}
/**
* Returns the JE version supported by the replica, or {@code null} if the
* value is not yet known. This method should only be called after the
* {@link #execute} method has returned successfully.
*
* @return the replica's JE version or {@code null}
*/
public JEVersion getReplicaJEVersion() {
return (replicaJEVersions != null) ?
replicaJEVersions.getVersion() :
null;
}
/**
* Returns a RepNodeImpl that represents the replica for a successful
* handshake. This method should only be called after the {@link #execute}
* method has returned successfully, and will throw IllegalStateException
* otherwise.
*
* @return the replica node
* @throws IllegalStateException if the handshake did not complete
*/
public RepNodeImpl getReplicaNode() {
if (replicaNode == null) {
throw new IllegalStateException("Handshake did not complete");
}
return replicaNode;
}
/**
* Executes the feeder side of the handshake.
* @throws ProtocolException
* @throws ExitException
*/
public Protocol execute()
throws DatabaseException,
IOException,
ProtocolException,
ExitException {
LoggerUtils.info(logger, repNode.getRepImpl(),
"Feeder-replica handshake start");
/* First negotiate a compatible protocol */
Protocol protocol = negotiateProtocol();
/* Now exchange JE version information using the negotiated protocol */
replicaJEVersions = (ReplicaJEVersions) protocol.read(namedChannel);
final String versionMsg =
" Replica " + replicaNameIdPair.getName() +
" Versions" +
" JE: " + replicaJEVersions.getVersion().getVersionString() +
" Log: " + replicaJEVersions.getLogVersion() +
" Protocol: " + protocol.getVersion();
LoggerUtils.fine(logger, repNode.getRepImpl(), versionMsg);
JEVersionsReject reject =
checkJECompatibility(protocol, replicaJEVersions);
if (reject != null) {
final String msg = "Version incompatibility: " +
reject.getErrorMessage() +
" with replica " + replicaNameIdPair.getName();
LoggerUtils.warning(logger, repNode.getRepImpl(), msg);
writeMessage(protocol, reject);
throw new ExitException(msg);
}
/*
* Use the minimum common log version. This ensures that the feeder
* can generate it and that the replica can understand it.
*/
streamLogVersion =
Math.min(getCurrentLogVersion(), replicaJEVersions.getLogVersion());
writeMessage(protocol,
protocol.new FeederJEVersions(
getCurrentJEVersion(),
streamLogVersion,
repNode.getMinJEVersion()));
/* Ensure that the feeder sends the agreed upon version. */
protocol.setStreamLogVersion(streamLogVersion);
/* Verify replica membership info */
verifyMembershipInfo(protocol);
checkClockSkew(protocol);
LoggerUtils.info
(logger, repNode.getRepImpl(),
"Feeder-replica " + replicaNameIdPair.getName() +
" handshake completed." +
versionMsg +
" Stream Log: " + protocol.getStreamLogVersion());
return protocol;
}
/** Write a protocol message to the channel. */
private void writeMessage(final Protocol protocol,
final Message message)
throws IOException {
assert TestHookExecute.doHookIfSet(writeMessageHook, message);
protocol.write(message, namedChannel);
}
/**
* Responds to message exchanges used to establish clock skew.
* @throws ProtocolException
*/
private void checkClockSkew(Protocol protocol)
throws IOException,
ProtocolException {
SNTPRequest request;
do {
request = protocol.read(namedChannel.getChannel(),
SNTPRequest.class);
writeMessage(protocol, protocol.new SNTPResponse(request));
} while (!request.isLast());
}
/**
* Verifies that the group as configured here at the Feeder matches the
* configuration of the replica.
*
* @param protocol the protocol to use for this verification
*
* @throws IOException
* @throws DatabaseException
*/
private void verifyMembershipInfo(Protocol protocol)
throws IOException,
DatabaseException,
ExitException {
NodeGroupInfo nodeGroup =
(Protocol.NodeGroupInfo)(protocol.read(namedChannel));
final RepGroupImpl group = repNode.getGroup();
RepNodeImpl node = group.getNode(nodeGroup.getNodeName());
try {
if (nodeGroup.getNodeId() != replicaNameIdPair.getId()) {
throw new ExitException
("The replica node ID sent during protocol negotiation: " +
replicaNameIdPair +
" differs from the one sent in the MembershipInfo " +
"request: " + nodeGroup.getNodeId());
}
if (nodeGroup.getNodeType().hasTransientId()) {
/*
* Note the secondary or external node if this is a new node.
* Otherwise, fall through, and the subsequent code will
* notice the incompatible node type.
*/
if (node == null) {
/* A new node with transient id */
node = new RepNodeImpl(nodeGroup);
try {
repNode.addTransientIdNode(node);
} catch (IllegalStateException | NodeConflictException e) {
throw new ExitException(e, true);
}
}
} else if (node == null || !node.isQuorumAck()) {
/* Not currently a confirmed member. */
if (nodeGroup.getNodeType().isArbiter()) {
Set<RepNodeImpl> arbMembers = group.getArbiterMembers();
if (arbMembers.size() > 0) {
throw new ExitException(
"An Arbiter node already exists in the "+
"replication group.");
}
}
try {
/*
* It is possible that the thread that added the
* node to the group database has not added the
* member to the in memory representation. We retry
* several times to give the other thread a chance to
* update the in memory structure.
*/
repNode.getRepGroupDB().ensureMember(nodeGroup);
for (int i=0; i < GROUP_RETRY; i++) {
node =
repNode.getGroup().getMember(
nodeGroup.getNodeName());
if (node != null) {
break;
}
try {
Thread.sleep(GROUP_RETRY_SLEEP_MS);
} catch (Exception e) {
}
}
if (node == null) {
throw EnvironmentFailureException.unexpectedState
("Node: " + nodeGroup.getNameIdPair() +
" not found");
}
} catch (InsufficientReplicasException |
InsufficientAcksException |
LockTimeoutException e) {
throw new ExitException(e, false);
} catch (NodeConflictException e) {
throw new ExitException(e, true);
}
} else if (node.isRemoved()) {
throw new ExitException
("Node: " + nodeGroup.getNameIdPair() +
" is no longer a member of the group." +
" It was explicitly removed.");
}
doGroupChecks(nodeGroup, group);
doNodeChecks(nodeGroup, node);
maybeUpdateJEVersion(nodeGroup, group, node);
} catch (ExitException exception) {
LoggerUtils.info
(logger, repNode.getRepImpl(), exception.getMessage());
if (exception.failReplica()) {
/*
* Explicit message to force replica to invalidate the
* environment.
*/
writeMessage(protocol,
protocol.new NodeGroupInfoReject(
exception.getMessage()));
}
throw exception;
}
/* Id is now established for sure, update the pair. */
replicaNameIdPair.update(node.getNameIdPair());
namedChannel.setNameIdPair(replicaNameIdPair);
LoggerUtils.fine(logger, repNode.getRepImpl(),
"Channel Mapping: " + replicaNameIdPair + " is at " +
namedChannel.getChannel());
writeMessage(protocol,
protocol.new NodeGroupInfoOK(
group.getUUID(), replicaNameIdPair));
}
/**
* Verifies that the group related information is consistent.
*
* @throws ExitException if the configuration in the group db differs
* from the supplied config
*/
private void doGroupChecks(NodeGroupInfo nodeGroup,
final RepGroupImpl group)
throws ExitException {
if (nodeGroup.isDesignatedPrimary() &&
repNode.getRepImpl().isDesignatedPrimary()) {
throw new ExitException
("Conflicting Primary designations. Feeder node: " +
repNode.getNodeName() +
" and replica node: " + nodeGroup.getNodeName() +
" cannot simultaneously be designated primaries");
}
if (!nodeGroup.getGroupName().equals(group.getName())) {
throw new ExitException
("The feeder belongs to the group: " +
group.getName() + " but replica id" + replicaNameIdPair +
" belongs to the group: " + nodeGroup.getGroupName());
}
if (!RepGroupImpl.isUnknownUUID(nodeGroup.getUUID()) &&
!nodeGroup.getUUID().equals(group.getUUID())) {
throw new ExitException
("The environments have the same name: " +
group.getName() +
" but represent different environment instances." +
" The environment at the master has UUID " +
group.getUUID() +
", while the replica " + replicaNameIdPair.getName() +
" has UUID: " + nodeGroup.getUUID());
}
}
/**
* Verifies that the old and new node configurations are the same.
*
* @throws ExitException if the configuration in the group db differs
* from the supplied config
*/
private void doNodeChecks(NodeGroupInfo nodeGroup,
RepNodeImpl node)
throws ExitException {
if (!nodeGroup.getHostName().equals(node.getHostName())) {
throw new ExitException
("Conflicting hostnames for replica id: " +
replicaNameIdPair +
" Feeder thinks it is: " + node.getHostName() +
" Replica is configured to use: " +
nodeGroup.getHostName());
}
if (nodeGroup.port() != node.getPort()) {
throw new ExitException
("Conflicting ports for replica id: " + replicaNameIdPair +
" Feeder thinks it uses: " + node.getPort() +
" Replica is configured to use: " + nodeGroup.port());
}
if (!((NodeType.ELECTABLE == node.getType()) ||
(NodeType.SECONDARY == node.getType()) ||
(NodeType.EXTERNAL == node.getType()) ||
(NodeType.ARBITER == node.getType()) ||
(NodeType.MONITOR == node.getType()))) {
throw new ExitException
("The replica node: " + replicaNameIdPair +
" is of type: " + node.getType());
}
if (!nodeGroup.getNodeType().equals(node.getType())) {
throw new ExitException
("Conflicting node types for: " + replicaNameIdPair +
" Feeder thinks it uses: " + node.getType() +
" Replica is configured as type: " + nodeGroup.getNodeType());
}
replicaNode = node;
}
/**
* Update the node's JE version from the provided group info if storing the
* JE version is supported and the current version differs from the stored
* one. It's OK if the attempt does not fully succeed due to a lack of
* replicas or acknowledgments: we can try again the next time.
*
* @throws ExitException if the attempt fails because the updated node's
* socket address conflicts with another node
*/
private void maybeUpdateJEVersion(final NodeGroupInfo nodeGroup,
final RepGroupImpl group,
final RepNodeImpl node)
throws ExitException {
if ((group.getFormatVersion() >= RepGroupImpl.FORMAT_VERSION_3) &&
(nodeGroup.getJEVersion() != null) &&
!nodeGroup.getJEVersion().equals(node.getJEVersion())) {
/*
* Try updating the JE version information, given that the group
* format supports this. Don't require a quorum of acknowledgments
* since the fact that the handshake for this replica is underway
* may mean that a quorum is not available. Saving the JE version
* is only an optimization, so it is OK if this attempt fails to be
* persistent.
*/
try {
repNode.getRepGroupDB().updateMember(
new RepNodeImpl(nodeGroup), false);
} catch (InsufficientReplicasException |
InsufficientAcksException |
LockTimeoutException e) {
/* Ignored */
} catch (NodeConflictException e) {
throw new ExitException(e, true);
}
}
}
/**
* Negotiates and returns the protocol that will be used in all subsequent
* interactions with the Replica, if the replica accepts to it.
*
* @return the protocol instance to be used for subsequent interactions
*
* @throws IOException
* @throws ExitException
*/
private Protocol negotiateProtocol()
throws IOException, ExitException {
Protocol defaultProtocol =
Protocol.getProtocol(repNode, getCurrentProtocolVersion());
/*
* Wait to receive the replica's version, decide which protocol version
* to use ourselves, and then reply with our version.
*/
ReplicaProtocolVersion message =
(ReplicaProtocolVersion) defaultProtocol.read(namedChannel);
replicaNameIdPair = message.getNameIdPair();
Feeder dup =
repNode.feederManager().getFeeder(replicaNameIdPair.getName());
/*
* SR26298, SR26366 HANDSHAKE_ERROR after network partition. The
* problem also partially discussed in SR21650. In a rare network
* partition scenario, when the socket channel is inactive and issued
* close after timeout, the feeder's threads may still in a waiting
* stage for network IO. Either the closure is failed or the close did
* not make effect to channel IO, this may be caused by system
* resource limitation. In this case, the non-exited feeder threads
* with a closed socket channel will prevent new feeder to be created
* for the same replica. To ease the problem, if we detect a
* duplicate feeder, we will check that whether the existing feeder's
* channel is still open, if it is already closed, then we will
* issue a shutdown to the feeder explicitly.
*/
if (dup != null && dup.getChannel() != null &&
!dup.getChannel().isOpen() && !dup.isShutdown()) {
dup.shutdown(new IOException("Feeder's channel for node " +
replicaNameIdPair +
" is already closed"));
}
dup = repNode.feederManager().getFeeder(replicaNameIdPair.getName());
if ((dup != null) ||
(message.getNameIdPair().getName().
equals(feederNameIdPair.getName()))) {
/* Reject the connection. */
writeMessage(defaultProtocol,
defaultProtocol.new DuplicateNodeReject(
"This node: " + replicaNameIdPair +
" is already in active use at the feeder "));
SocketAddress dupAddress =
namedChannel.getChannel().getRemoteAddress();
throw new ExitException
("A replica with the id: " + replicaNameIdPair +
" is already active with this feeder. " +
" The duplicate replica resides at: " +
dupAddress);
}
/*
* If the Replica's version is acceptable, use it, otherwise return the
* default protocol at this node, in case the Replica can support it.
*/
final int replicaVersion = message.getVersion();
Protocol protocol =
Protocol.get(repNode, replicaVersion, getCurrentProtocolVersion());
if (protocol == null) {
protocol = defaultProtocol;
}
defaultProtocol.write
(defaultProtocol.new FeederProtocolVersion(protocol.getVersion()),
namedChannel);
return protocol;
}
}