blob: 57fc05c066d85bfd6253f7009ae5d1deb63df276 [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.stream;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.cleaner.FileProtector.ProtectedFileSet;
import com.sleepycat.je.config.EnvironmentParams;
import com.sleepycat.je.dbi.DatabaseId;
import com.sleepycat.je.dbi.DbTree;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.log.ChecksumException;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.node.Feeder;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.impl.node.RepNode;
import com.sleepycat.je.rep.stream.BaseProtocol.EntryRequest;
import com.sleepycat.je.rep.stream.BaseProtocol.EntryRequestType;
import com.sleepycat.je.rep.stream.BaseProtocol.RestoreRequest;
import com.sleepycat.je.rep.stream.BaseProtocol.StartStream;
import com.sleepycat.je.rep.subscription.StreamAuthenticator;
import com.sleepycat.je.rep.txn.ReadonlyTxn;
import com.sleepycat.je.rep.utilint.BinaryProtocol.Message;
import com.sleepycat.je.rep.utilint.NamedChannel;
import com.sleepycat.je.rep.vlsn.VLSNIndex;
import com.sleepycat.je.rep.vlsn.VLSNRange;
import com.sleepycat.je.txn.Txn;
import com.sleepycat.je.utilint.DbLsn;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.TestHook;
import com.sleepycat.je.utilint.TestHookExecute;
import com.sleepycat.je.utilint.VLSN;
/**
* Establish where the replication stream should start for a feeder and replica
* pair. The Feeder's job is to send the replica the parts of the replication
* stream it needs, so that the two can determine a common matchpoint.
*
* If a successful matchpoint is found the feeder learns where to start the
* replication stream for this replica.
*/
public class FeederReplicaSyncup {
/**
* When replay is possible, ideally we would use a cost-based approach to
* decide whether to use replay vs network restore [#26890]. For now at
* least avoid a "large" replay for a new/replaced/wiped node, which is
* detected when VLSN 1 is requested. The max number of VLSNs to replay
* in this situation is calculated to be one minute of replay time:
* Avg replay rate is 20k ops/s == 40k vlsn/s (due to commit VLSNs)
* 40k vlsn/s * 60s/min == 2400k vlsn/min
*/
private static final int MAX_INITIAL_REPLAY = 2400 * 1000;
/*
* Overrides MAX_INITIAL_REPLAY for testing. The default value of zero means
* "no override": getMaxInitialReplay falls back to MAX_INITIAL_REPLAY.
*/
private static volatile int testMaxInitialReplay;
/*
* A test hook that is called after a syncup has started. It is invoked from
* an assert statement in execute, so it only runs when assertions are
* enabled.
*/
private static volatile TestHook<Feeder> afterSyncupStartedHook;
/*
* A test hook that is called after a syncup has ended. Like the hook above,
* it is invoked from an assert statement in execute.
*/
private static volatile TestHook<Feeder> afterSyncupEndedHook;
/* The feeder on whose behalf this syncup is performed. */
private final Feeder feeder;
/* The RepNode obtained from the feeder. */
private final RepNode repNode;
/* The channel over which syncup messages are read and written. */
private final NamedChannel namedChannel;
/* The protocol used to read, write and construct syncup messages. */
private final Protocol protocol;
/* The VLSN index obtained from the RepNode. */
private final VLSNIndex vlsnIndex;
private final Logger logger;
/*
* The reader used to look up matchpoint records. It is created lazily by
* getMatchPtRecord on the first lookup and reused for later lookups.
*/
private FeederSyncupReader backwardsReader;
public FeederReplicaSyncup(Feeder feeder,
NamedChannel namedChannel,
Protocol protocol) {
this.feeder = feeder;
this.repNode = feeder.getRepNode();
logger = LoggerUtils.getLogger(getClass());
this.namedChannel = namedChannel;
this.protocol = protocol;
this.vlsnIndex = repNode.getVLSNIndex();
}
/**
 * The feeder's side of the protocol. Find out where to start the
 * replication stream.
 *
 * The VLSN index range is protected (via RepNode.syncupStarted) for the
 * duration of the exchange and released in a finally block, so the range
 * is released whether the syncup succeeds or fails. On success the
 * feeder's master feeder source is initialized at the VLSN carried by the
 * replica's StartStream message.
 *
 * @throws NetworkRestoreException if sync up failed and network restore is
 * required
 * @throws ChecksumException if checksum validation failed
 * @throws IOException if reading from or writing to the channel fails
 */
public void execute()
    throws DatabaseException, IOException, InterruptedException,
           NetworkRestoreException, ChecksumException {

    final long startTime = System.currentTimeMillis();
    RepImpl repImpl = repNode.getRepImpl();
    LoggerUtils.info(logger, repImpl,
                     "Feeder-replica " +
                     feeder.getReplicaNameIdPair().getName() +
                     " syncup started. Feeder range: " +
                     repNode.getVLSNIndex().getRange());

    /*
     * Prevent the VLSNIndex range from being changed and protect all files
     * in the range. To search the index and read files within this range
     * safely, VLSNIndex.getRange must be called after syncupStarted.
     */
    final ProtectedFileSet protectedFileSet =
        repNode.syncupStarted(feeder.getReplicaNameIdPair());

    try {
        assert TestHookExecute.doHookIfSet(afterSyncupStartedHook, feeder);

        /*
         * Wait for the replica to start the syncup message exchange. The
         * first message will always be an EntryRequest. This relies on the
         * fact that a brand new group always begins with a master that has
         * a few vlsns from creating the nameDb that exist before a replica
         * syncup. The replica will never issue a StartStream before doing
         * an EntryRequest.
         *
         * The first entry request has three possible types of message
         * responses - EntryNotFound, AlternateMatchpoint, or Entry.
         */
        VLSNRange range = vlsnIndex.getRange();
        EntryRequest firstRequest =
            (EntryRequest) protocol.read(namedChannel);
        Message response = makeResponseToEntryRequest(range,
                                                      firstRequest,
                                                      true);

        protocol.write(response, namedChannel);

        /*
         * Now the replica may send one of these messages:
         * - a StartStream message indicating that the replica wants to
         * start normal operations
         * - a EntryRequest message if it's still hunting for a
         * matchpoint. There's the possibility that the new EntryRequest
         * asks for a VLSN that has been log cleaned, so check that we can
         * supply it.
         * - a RestoreRequest message that indicates that the replica
         * has given up, and will want a network restore.
         * - a DBIdRequest message asking for the id of a named database.
         */
        VLSN startVLSN;
        while (true) {
            Message message = protocol.read(namedChannel);
            if (logger.isLoggable(Level.FINEST)) {
                LoggerUtils.finest(logger, repImpl,
                                   "Replica " +
                                   feeder.getReplicaNameIdPair() +
                                   " message op: " + message.getOp());
            }
            if (message instanceof StartStream) {
                final StartStream startMessage = (StartStream) message;
                startVLSN = startMessage.getVLSN();

                /* set feeder filter */
                final FeederFilter filter = startMessage.getFeederFilter();
                if (filter != null) {
                    filter.setLogger(logger);
                }
                feeder.setFeederFilter(filter);

                /*
                 * skip security check if not needed, e.g., a replica in
                 * a secure store
                 */
                if (!feeder.needSecurityChecks()) {
                    break;
                }

                final StreamAuthenticator auth = feeder.getAuthenticator();
                /* if security check is needed, auth cannot be null */
                assert (auth != null);
                /* remember table id strings of subscribed tables */
                if (filter != null) {
                    auth.setTableIds(filter.getTableIds());
                } else {
                    /* if no filter, subscribe all tables */
                    auth.setTableIds(null);
                }
                /* security check */
                if (!auth.checkAccess()) {
                    final String err = "Replica " +
                                       feeder.getReplicaNameIdPair()
                                             .getName() +
                                       " fails security check " +
                                       "in start stream syncup";
                    LoggerUtils.warning(logger, repImpl, err);

                    /* signal client */
                    feeder.makeSecurityCheckResponse(err);
                }
                break;
            } else if (message instanceof EntryRequest) {
                response = makeResponseToEntryRequest
                    (range, (EntryRequest) message, false);
                protocol.write(response, namedChannel);
            } else if (message instanceof RestoreRequest) {
                throw answerRestore(range,
                                    ((RestoreRequest) message).getVLSN());
            } else if (message instanceof Protocol.DBIdRequest) {
                final String dbName =
                    ((Protocol.DBIdRequest) message).getDbName();
                response = makeResponseToDBIdRequest(repNode, dbName);
                protocol.write(response, namedChannel);
            } else {
                /* Name every message type this loop actually accepts. */
                throw EnvironmentFailureException.unexpectedState
                    (repImpl,
                     "Expected StartStream, EntryRequest, RestoreRequest " +
                     "or DBIdRequest but got " + message);
            }
        }

        LoggerUtils.info(logger, repImpl,
                         "Feeder-replica " +
                         feeder.getReplicaNameIdPair().getName() +
                         " start stream at VLSN: " + startVLSN);

        feeder.initMasterFeederSource(startVLSN);

    } finally {
        repNode.syncupEnded(protectedFileSet);
        assert TestHookExecute.doHookIfSet(afterSyncupEndedHook, feeder);

        /*
         * The replica name is passed as a format argument rather than
         * being concatenated into the format string, so that a '%' in the
         * name cannot make String.format throw here and mask an exception
         * propagating out of the try block.
         */
        LoggerUtils.info
            (logger, repImpl,
             String.format("Feeder-replica %s syncup ended. " +
                           "Elapsed time: %,dms",
                           feeder.getReplicaNameIdPair().getName(),
                           (System.currentTimeMillis() - startTime)));
    }
}
/**
 * Returns the maximum number of VLSNs that may be replayed when a replica
 * requests VLSN 1: the test override if one is set (non-zero), otherwise
 * MAX_INITIAL_REPLAY.
 */
private static int getMaxInitialReplay() {
    /*
     * Read the volatile field once so that the value tested and the value
     * returned are the same even if a test resets the override
     * concurrently.
     */
    final int override = testMaxInitialReplay;
    return (override != 0) ? override : MAX_INITIAL_REPLAY;
}
/**
 * For testing: overrides the maximum initial replay size.
 *
 * @param val the override; zero means the built-in MAX_INITIAL_REPLAY is
 * used
 */
public static void setTestMaxInitialReplay(final int val) {
    testMaxInitialReplay = val;
}
/**
 * For testing: installs the hook that execute runs once a syncup has
 * started.
 *
 * @param hook the hook to install
 */
public static void setAfterSyncupStartedHook(final TestHook<Feeder> hook) {
    afterSyncupStartedHook = hook;
}
/**
 * For testing: installs the hook that execute runs once a syncup has
 * ended.
 *
 * @param hook the hook to install
 */
public static void setAfterSyncupEndedHook(final TestHook<Feeder> hook) {
    afterSyncupEndedHook = hook;
}
/**
 * Creates the reader used to scan the log backwards for matchpoint
 * records.
 *
 * The reader starts at the file manager's last used LSN and finishes at
 * offset 0 of the file number that the VLSN index reports for the first
 * VLSN in its current range.
 *
 * @param startVLSN the VLSN handed to the reader as its starting point
 * @return a new reader over the feeder's log
 */
private FeederSyncupReader setupReader(VLSN startVLSN)
    throws DatabaseException, IOException {

    final EnvironmentImpl env = repNode.getRepImpl();
    final int bufferSize = env.getConfigManager().getInt
        (EnvironmentParams.LOG_ITERATOR_READ_SIZE);

    /* Where the backwards scan begins. */
    final long scanStartLsn = env.getFileManager().getLastUsedLsn();

    /* Where the backwards scan must stop. */
    final long oldestFile =
        vlsnIndex.getLTEFileNumber(vlsnIndex.getRange().getFirst());
    final long scanEndLsn = DbLsn.makeLsn(oldestFile, 0);

    return new FeederSyncupReader(env,
                                  vlsnIndex,
                                  scanStartLsn,
                                  bufferSize,
                                  startVLSN,
                                  scanEndLsn);
}
/**
 * Builds the DBIdResponse for a replica's DBIdRequest.
 *
 * @param rn the node whose environment is searched for the database
 * @param dbName the name of the database whose id was requested
 * @return a DBIdResponse carrying the database id
 */
private Message makeResponseToDBIdRequest(RepNode rn, String dbName) {
    return protocol.new DBIdResponse(getDBId(rn.getRepImpl(), dbName));
}
/**
 * Returns the db id for the given database.
 *
 * The lookup runs inside a read-only transaction that is committed on
 * success and aborted if the lookup or the commit fails.
 *
 * @param repImpl replication env
 * @param dbName db name
 *
 * @return the db id of the given db
 * @throws DatabaseNotFoundException if db id not found
 */
public static DatabaseId getDBId(RepImpl repImpl, String dbName)
    throws DatabaseNotFoundException {

    final DbTree dbTree = repImpl.getDbTree();

    /* need a locker to call getDbIdFromName() */
    final TransactionConfig txnConfig = new TransactionConfig();
    final Txn txn = new ReadonlyTxn(repImpl, txnConfig);
    final DatabaseId dbId;
    try {
        dbId = dbTree.getDbIdFromName(txn, dbName, null, false);
        txn.commit();
    } catch (Exception exp) {
        /*
         * Do not let a failure to abort hide the exception that caused
         * the abort; record it as suppressed and rethrow the original.
         */
        try {
            txn.abort();
        } catch (RuntimeException abortFailure) {
            exp.addSuppressed(abortFailure);
        }
        throw exp;
    }

    /* must find the db to proceed */
    if (dbId == null) {
        throw new DatabaseNotFoundException("Cannot find db id for " +
                                            "JE database " + dbName);
    }
    return dbId;
}
/**
 * Builds the feeder's response to a replica's EntryRequest.
 *
 * Depending on the request type and on where the requested VLSN lies
 * relative to the feeder's VLSN range, the response is an Entry, an
 * EntryNotFound, or (on the first response only) an AlternateMatchpoint.
 *
 * @param range the feeder's VLSN range, obtained after syncup started
 * @param request the replica's request
 * @param isFirstResponse true if this is the response to the replica's
 * first EntryRequest of the syncup
 * @return the message to send back to the replica
 * @throws EnvironmentFailureException if a VLSN beyond the range is
 * requested after the first response, or if a matchpoint that is within
 * the range cannot be found in the log
 */
private Message makeResponseToEntryRequest(VLSNRange range,
                                           EntryRequest request,
                                           boolean isFirstResponse)
    throws IOException, ChecksumException {

    final VLSN requestMatchpoint = request.getVLSN();
    final EntryRequestType type = request.getType();

    /* if NOW mode, return high end regardless of requested vlsn */
    if (type.equals(EntryRequestType.NOW)) {
        /*
         * VLSN range is not empty even without user data, so we can
         * always get a valid entry.
         */
        return protocol.new Entry(getMatchPtRecord(range.getLast()));
    }

    /* stream modes other than NOW */

    /*
     * The matchpoint must be in the VLSN range, or more specifically, in
     * the VLSN index so we can map the VLSN to the lsn in order to fetch
     * the associated log record.
     */
    if (range.getFirst().compareTo(requestMatchpoint) > 0) {
        /* request point is smaller than lower bound of range */
        if (type.equals(EntryRequestType.AVAILABLE)) {
            return protocol.new Entry(getMatchPtRecord(range.getFirst()));
        }

        /* default mode */
        return protocol.new EntryNotFound();
    }

    if (range.getLast().compareTo(requestMatchpoint) < 0) {
        /* request point is higher than upper bound of range */
        if (type.equals(EntryRequestType.AVAILABLE)) {
            return protocol.new Entry(getMatchPtRecord(range.getLast()));
        }

        /*
         * default mode:
         *
         * The matchpoint is after the last one in the range. We have to
         * suggest the lastSync entry on this node as an alternative. This
         * should only happen on the feeder's first response. For example,
         * suppose the feeder's range is vlsns 1-100. It's possible that
         * the exchange is as follows:
         * 1 - replica has 1-110, asks feeder for 110
         * 2 - feeder doesn't have 110, counters with 100
         * 3 - from this point on, the replica should only ask for vlsns
         * that are <= the feeder's counter offer of 100
         * Guard that this holds true, because the feeder's log reader is
         * only set to search backwards; it does not expect to toggle
         * between forward and backwards.
         */
        assert backwardsReader == null :
            "Replica request for vlsn > feeder range should only happen " +
            "on the first exchange.";

        if (range.getLastSync().equals(VLSN.NULL_VLSN)) {
            /*
             * We have no syncable entry at all. The replica will have to
             * do a network restore.
             */
            return protocol.new EntryNotFound();
        }

        if (isFirstResponse) {
            final OutputWireRecord lastSync =
                getMatchPtRecord(range.getLastSync());
            assert lastSync != null :
                "Look for alternative, range=" + range;
            return protocol.new AlternateMatchpoint(lastSync);
        }

        /* Leading space keeps the range separate from the explanation. */
        throw EnvironmentFailureException.unexpectedState
            (repNode.getRepImpl(), "RequestMatchpoint=" +
             requestMatchpoint + " range=" + range +
             " should only happen on first response");
    }

    /*
     * When VLSN 1 is requested the feeder will respond with EntryNotFound
     * even when it has VLSN 1, if the replay range is "large". The
     * intention is avoid long replays for new/replaced/wiped nodes.
     */
    if (requestMatchpoint.equals(VLSN.FIRST_VLSN) &&
        range.getLast().getSequence() > getMaxInitialReplay()) {
        return protocol.new EntryNotFound();
    }

    /* The matchpoint is within the range. Find it. */
    final OutputWireRecord matchRecord =
        getMatchPtRecord(requestMatchpoint);
    if (matchRecord == null) {
        throw EnvironmentFailureException.unexpectedState
            (repNode.getRepImpl(),
             "Couldn't find matchpoint " + requestMatchpoint +
             " in log. VLSN range=" + range);
    }

    return protocol.new Entry(matchRecord);
}
/**
 * Scans the log backwards for the record at the given matchpoint VLSN.
 *
 * The backwards reader is created on the first call, using that call's
 * VLSN as its starting point, and is reused by all later calls.
 *
 * @param matchPointVLSN the VLSN to look for
 * @return whatever the reader's backwards scan yields for that VLSN
 */
private OutputWireRecord getMatchPtRecord(VLSN matchPointVLSN)
    throws IOException, ChecksumException {

    FeederSyncupReader reader = backwardsReader;
    if (reader == null) {
        reader = setupReader(matchPointVLSN);
        backwardsReader = reader;
    }
    return reader.scanBackwards(matchPointVLSN);
}
/**
 * Answers a replica's RestoreRequest: writes a RestoreResponse to the
 * channel and returns (does not throw) the exception describing the failed
 * matchpoint, leaving it to the caller to throw it.
 *
 * @param range the feeder's VLSN range
 * @param failedMatchpoint the VLSN carried by the replica's RestoreRequest
 * @return the exception for the caller to throw
 * @throws IOException if writing the response fails
 */
private NetworkRestoreException answerRestore(VLSNRange range,
                                              VLSN failedMatchpoint)
    throws IOException {

    /*
     * Note, carried over from the original comment: the VLSN supplied to
     * the response is a null VLSN if the GlobalCBVLSN is defunct. In that
     * case the RestoreResponse.cbvlsn field is unused.
     */
    final Message restoreResponse = protocol.new RestoreResponse
        (repNode.getRestoreResponseVLSN(range), repNode.getLogProviders());
    protocol.write(restoreResponse, namedChannel);

    return new NetworkRestoreException(failedMatchpoint,
                                       range.getFirst(),
                                       range.getLast(),
                                       feeder.getReplicaNameIdPair());
}
/**
 * Describes a matchpoint request that could not be satisfied from the
 * feeder's VLSN range, together with that range and the requesting
 * replica.
 */
@SuppressWarnings("serial")
public static class NetworkRestoreException extends Exception {

    /* The out-of-range vlsn that provoked the exception */
    private final VLSN vlsn;

    /* The bounds of the feeder's VLSN range when the request was made. */
    private final VLSN firstVLSN;
    private final VLSN lastVLSN;

    /* The replica that made the request. */
    private final NameIdPair replicaNameIdPair;

    /**
     * @param vlsn the requested matchpoint VLSN
     * @param firstVLSN the first VLSN of the feeder's range
     * @param lastVLSN the last VLSN of the feeder's range
     * @param replicaNameIdPair the replica that made the request
     */
    public NetworkRestoreException(VLSN vlsn,
                                   VLSN firstVLSN,
                                   VLSN lastVLSN,
                                   NameIdPair replicaNameIdPair) {
        this.replicaNameIdPair = replicaNameIdPair;
        this.vlsn = vlsn;
        this.firstVLSN = firstVLSN;
        this.lastVLSN = lastVLSN;
    }

    /** Returns a message naming the VLSN, the replica and the range. */
    @Override
    public String getMessage() {
        final StringBuilder text = new StringBuilder();
        text.append("Matchpoint vlsn ").append(vlsn);
        text.append(" requested by node: ").append(replicaNameIdPair);
        text.append(" was outside the VLSN range: ");
        text.append("[").append(firstVLSN);
        text.append("-").append(lastVLSN).append("]");
        return text.toString();
    }

    /** Returns the requested matchpoint VLSN. */
    public VLSN getVlsn() {
        return vlsn;
    }

    /** Returns the replica that made the request. */
    public NameIdPair getReplicaNameIdPair() {
        return replicaNameIdPair;
    }
}
}