blob: 8536e39bb7a720f48227c5e777b05050a243dc47 [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.stream;
import static com.sleepycat.je.log.LogEntryType.LOG_VERSION_EXPIRE_INFO;
import static com.sleepycat.je.rep.impl.RepParams.GROUP_NAME;
import static com.sleepycat.je.rep.impl.RepParams.MAX_CLOCK_DELTA;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.JEVersion;
import com.sleepycat.je.dbi.DbConfigManager;
import com.sleepycat.je.dbi.EnvironmentFailureReason;
import com.sleepycat.je.log.LogEntryType;
import com.sleepycat.je.rep.NodeType;
import com.sleepycat.je.rep.impl.RepGroupImpl;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.stream.Protocol.DuplicateNodeReject;
import com.sleepycat.je.rep.stream.Protocol.FeederJEVersions;
import com.sleepycat.je.rep.stream.Protocol.FeederProtocolVersion;
import com.sleepycat.je.rep.stream.Protocol.JEVersionsReject;
import com.sleepycat.je.rep.stream.Protocol.NodeGroupInfoOK;
import com.sleepycat.je.rep.stream.Protocol.NodeGroupInfoReject;
import com.sleepycat.je.rep.stream.Protocol.SNTPResponse;
import com.sleepycat.je.rep.utilint.BinaryProtocol.Message;
import com.sleepycat.je.rep.utilint.BinaryProtocol.ProtocolException;
import com.sleepycat.je.rep.utilint.NamedChannel;
import com.sleepycat.je.rep.utilint.RepUtils.Clock;
import com.sleepycat.je.utilint.LoggerUtils;
/**
* Implements the Replica side of the handshake protocol between the Replica
* and the Feeder. The FeederReplicaHandshake class takes care of the other
* side.
*
* @see <a href="https://sleepycat.oracle.com/pmwiki/pmwiki.php?n=JEHomePage.ReplicaFeederHandshake">FeederReplicaHandshake</a>
*/
public class ReplicaFeederHandshake {
/* The rep node (server or replica) */
private final RepImpl repImpl;
private final Clock clock;
private final int groupFormatVersion;
private final NamedChannel namedChannel;
private final NameIdPair replicaNameIdPair;
private final NodeType nodeType;
private NameIdPair feederNameIdPair;
private final RepGroupImpl repGroup;
private Protocol protocol = null;
/* The JE software versions in use by the Feeder */
private FeederJEVersions feederJEVersions;
/*
* The time to wait between retries to establish node info in the master.
*/
static final int MEMBERSHIP_RETRY_SLEEP_MS = 60 * 1000;
static final int MEMBERSHIP_RETRIES = 0;
/*
* Used during testing: A non-zero value overrides the actual log version.
*/
private static volatile int testCurrentLogVersion = 0;
/**
* Used during testing: A non-zero value overrides the actual protocol
* version.
*/
private static volatile int testCurrentProtocolVersion = 0;
/* Fields used to track clock skew wrt the feeder. */
private long clockDelay = Long.MAX_VALUE;
private long clockDelta = Long.MAX_VALUE;
private static int CLOCK_SKEW_MAX_SAMPLE_SIZE = 5;
private static final long CLOCK_SKEW_MIN_DELAY_MS = 2;
private final int maxClockDelta;
/**
* If the nodeType is an Arbiter, the SNTPRequest message
* is sent but the clock skew is not checked.
*/
private final boolean checkClockSkew;
private final Logger logger;
/**
* An instance of this class is created with each new handshake preceding
* the setting up of a connection.
*
* @param conf handshake configuration with feeder
*/
public ReplicaFeederHandshake(ReplicaFeederHandshakeConfig conf) {
this.repImpl = conf.getRepImpl();
this.namedChannel = conf.getNamedChannel();
this.repGroup = conf.getGroup();
this.groupFormatVersion = repGroup.getFormatVersion();
this.nodeType = conf.getNodeType();
replicaNameIdPair = conf.getNameIdPair();
this.clock = conf.getClock();
if (nodeType.isArbiter()) {
maxClockDelta = Integer.MAX_VALUE;
checkClockSkew = false;
} else {
maxClockDelta =
repImpl.getConfigManager().getDuration(MAX_CLOCK_DELTA);
checkClockSkew = true;
}
logger = LoggerUtils.getLogger(getClass());
}
/** Get the current log version, supporting a test override. */
private static int getCurrentLogVersion() {
return (testCurrentLogVersion != 0) ?
testCurrentLogVersion :
LogEntryType.LOG_VERSION;
}
/**
* Set the current log version to a different value, for testing.
* Specifying {@code 0} reverts to the standard value.
*
* @param testLogVersion the testing log version or {@code 0}
*/
public static void setTestLogVersion(int testLogVersion) {
testCurrentLogVersion = testLogVersion;
}
/** Get the current JE version, supporting a test override. */
private JEVersion getCurrentJEVersion() {
return repImpl.getCurrentJEVersion();
}
/** Get the current protocol version, supporting a test override. */
private int getCurrentProtocolVersion() {
if (testCurrentProtocolVersion != 0) {
return testCurrentProtocolVersion;
}
return Protocol.getJEVersionProtocolVersion(getCurrentJEVersion());
}
/**
* Set the current protocol version to a different value, for testing.
* Specifying {@code 0} reverts to the standard value.
*/
public static void setTestProtocolVersion(final int testProtocolVersion) {
testCurrentProtocolVersion = testProtocolVersion;
}
/**
* Returns the minJEVersion of the group, or null if unknown (in
* protocol versions &lt; VERSION_7).
*/
public JEVersion getFeederMinJEVersion() {
return feederJEVersions.getMinJEVersion();
}
/**
* Negotiates a protocol that both the replica and feeder can support.
*
* @return the common protocol
*
* @throws IOException
*/
private Protocol negotiateProtocol()
throws IOException {
final Protocol defaultProtocol =
Protocol.getProtocol(repImpl, replicaNameIdPair, clock,
getCurrentProtocolVersion(),
groupFormatVersion);
/* Send over the latest version protocol this replica can support. */
defaultProtocol.write(defaultProtocol.new ReplicaProtocolVersion(),
namedChannel);
/*
* Returns the highest level the feeder can support, or the version we
* just sent, if it can support that version
*/
Message message = defaultProtocol.read(namedChannel);
if (message instanceof DuplicateNodeReject) {
throw new EnvironmentFailureException
(repImpl,
EnvironmentFailureReason.HANDSHAKE_ERROR,
"A replica with the name: " + replicaNameIdPair +
" is already active with the Feeder:" + feederNameIdPair);
}
FeederProtocolVersion feederVersion =
((FeederProtocolVersion) message);
feederNameIdPair = feederVersion.getNameIdPair();
Protocol configuredProtocol =
Protocol.get(repImpl, replicaNameIdPair,
clock, feederVersion.getVersion(),
getCurrentProtocolVersion(), groupFormatVersion);
LoggerUtils.fine(logger, repImpl,
"Feeder id: " + feederVersion.getNameIdPair() +
"Response message: " + feederVersion.getVersion());
namedChannel.setNameIdPair(feederNameIdPair);
LoggerUtils.fine(logger, repImpl,
"Channel Mapping: " + feederNameIdPair + " is at " +
namedChannel.getChannel());
if (configuredProtocol == null) {
/* Include JE version information [#22541] */
throw new EnvironmentFailureException
(repImpl,
EnvironmentFailureReason.PROTOCOL_VERSION_MISMATCH,
"Incompatible protocol versions. " +
"Protocol version: " + feederVersion.getVersion() +
" introduced in JE version: " +
Protocol.getProtocolJEVersion(feederVersion.getVersion()) +
" requested by the Feeder: " + feederNameIdPair +
" is not supported by this Replica: " + replicaNameIdPair +
" with protocol version: " + defaultProtocol.getVersion() +
" introduced in JE version: " +
Protocol.getProtocolJEVersion(defaultProtocol.getVersion()));
}
return configuredProtocol;
}
/**
* Executes the replica side of the handshake.
* @throws ProtocolException
*/
public Protocol execute()
throws IOException,
ProtocolException {
LoggerUtils.info(logger, repImpl,
"Replica-feeder handshake start");
/* First negotiate the protocol, then use it. */
protocol = negotiateProtocol();
/* Ensure that software versions are compatible. */
verifyVersions();
/**
* Note whether log entries with later log versions need to be
* converted to log version 12 to work around [#25222].
*/
if (feederJEVersions.getLogVersion() == LOG_VERSION_EXPIRE_INFO) {
protocol.setFixLogVersion12Entries(true);
}
/*
* Now perform the membership information validation part of the
* handshake
*/
verifyMembership();
checkClockSkew();
LoggerUtils.info(logger, repImpl,
"Replica-feeder " + feederNameIdPair.getName() +
" handshake completed.");
return protocol;
}
/**
* Checks software and log version compatibility.
*/
private void verifyVersions()
throws IOException {
protocol.write(protocol.new
ReplicaJEVersions(getCurrentJEVersion(),
getCurrentLogVersion()),
namedChannel);
Message message = protocol.read(namedChannel);
if (message instanceof JEVersionsReject) {
/* The software version is not compatible with the Feeder. */
throw new EnvironmentFailureException
(repImpl,
EnvironmentFailureReason.HANDSHAKE_ERROR,
" Feeder: " + feederNameIdPair + ". " +
((JEVersionsReject) message).getErrorMessage());
}
/*
* Save the version information in case we want to use it as the basis
* for further compatibility checking in future.
*/
feederJEVersions = (FeederJEVersions) message;
if (feederJEVersions.getLogVersion() > getCurrentLogVersion()) {
throw new EnvironmentFailureException(
repImpl,
EnvironmentFailureReason.HANDSHAKE_ERROR,
" Feeder: " + feederNameIdPair + ". " +
"Feeder log version " + feederJEVersions.getLogVersion() +
" is not known to the replica, whose current log version is " +
getCurrentLogVersion());
}
}
/**
* Exchange membership information messages.
*/
private void verifyMembership()
throws IOException {
DbConfigManager configManager = repImpl.getConfigManager();
String groupName = configManager.get(GROUP_NAME);
Message message = protocol.new
NodeGroupInfo(groupName,
repGroup.getUUID(),
replicaNameIdPair,
repImpl.getHostName(),
repImpl.getPort(),
nodeType,
repImpl.isDesignatedPrimary(),
getCurrentJEVersion());
protocol.write(message, namedChannel);
message = protocol.read(namedChannel);
if (message instanceof NodeGroupInfoReject) {
NodeGroupInfoReject reject = (NodeGroupInfoReject) message;
throw new EnvironmentFailureException
(repImpl,
EnvironmentFailureReason.HANDSHAKE_ERROR,
" Feeder: " + feederNameIdPair + ". " +
reject.getErrorMessage());
}
if (!(message instanceof NodeGroupInfoOK)) {
throw new EnvironmentFailureException
(repImpl,
EnvironmentFailureReason.HANDSHAKE_ERROR,
" Feeder: " + feederNameIdPair + ". " +
"Protocol error. Unexpected response " + message);
}
final NodeGroupInfoOK nodeGroupInfoOK = (NodeGroupInfoOK) message;
if (repGroup.hasUnknownUUID()) {
/* Correct the initial UUID */
repGroup.setUUID(nodeGroupInfoOK.getUUID());
}
if (nodeType.hasTransientId()) {
/* Update the transient node's ID */
replicaNameIdPair.update(nodeGroupInfoOK.getNameIdPair());
}
}
/**
* Checks for clock skew wrt the current feeder. It's important that the
* clock skew be within an acceptable range so that replica can meet any
* time based consistency requirements requested by transactions. The
* intent of this check is to draw the attention of the application or the
* administrators to the skew, not correct it.
* <p>
* The scheme implemented below is a variation on the scheme used by <a
* href="http://tools.ietf.org/html/rfc2030">SNTP</a> protocol. The Feeder
* plays the role of the SNTP server and the replica the role of the client
* in this situation. The mechanism used here is rough and does not
* guarantee the detection of a clock skew, especially since it's a one
* time check done each time a connection is re-established with the
* Feeder. The clocks could be in sync at the time of this check and drift
* apart over the lifetime of the connection. It's also for this reason
* that we do not store the skew value and make compensations using it when
* determining replica consistency.
* <p>
* Replications nodes should therefore ensure that they are using NTP or a
* similar time synchronization service to keep time on all the replication
* nodes in a group in sync.
* <p>
*
* @throws IOException
* @throws EnvironmentFailureException
* @throws ProtocolException
*/
private void checkClockSkew()
throws IOException,
ProtocolException {
boolean isLast = false;
int sampleCount = 0;
do {
if (checkClockSkew) {
/* Iterate until we have a value that's good enough. */
isLast = (++sampleCount >= CLOCK_SKEW_MAX_SAMPLE_SIZE) ||
(clockDelay <= CLOCK_SKEW_MIN_DELAY_MS);
} else {
isLast = true;
}
protocol.write(protocol.new SNTPRequest(isLast), namedChannel);
SNTPResponse response = protocol.read(namedChannel,
SNTPResponse.class);
if (response.getDelay() < clockDelay) {
clockDelay = response.getDelay();
clockDelta = response.getDelta();
}
} while (!isLast);
if (!checkClockSkew) {
return;
}
LoggerUtils.logMsg
(logger, repImpl,
(Math.abs(clockDelta) >= maxClockDelta) ?
Level.SEVERE : Level.FINE,
"Round trip delay: " + clockDelay + " ms. " + "Clock delta: " +
clockDelta + " ms. " + "Max permissible delta: " +
maxClockDelta + " ms.");
if (Math.abs(clockDelta) >= maxClockDelta) {
throw new EnvironmentFailureException
(repImpl,
EnvironmentFailureReason.HANDSHAKE_ERROR,
"Clock delta: " + clockDelta + " ms. " +
"between Feeder: " + feederNameIdPair.getName() +
" and this Replica exceeds max permissible delta: " +
maxClockDelta + " ms.");
}
}
}