| /*- |
| * Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved. |
| * |
| * This file was distributed by Oracle as part of a version of Oracle Berkeley |
| * DB Java Edition made available at: |
| * |
| * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html |
| * |
| * Please see the LICENSE file included in the top-level directory of the |
| * appropriate version of Oracle Berkeley DB Java Edition for a copy of the |
| * license and additional information. |
| */ |
| |
| package com.sleepycat.je.rep.stream; |
| |
| import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_ACK_MESSAGES; |
| import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_GROUPED_ACKS; |
| import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_GROUP_ACK_MESSAGES; |
| import static com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition.N_MAX_GROUPED_ACKS; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.io.ObjectOutputStream; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| |
| import com.sleepycat.je.DatabaseException; |
| import com.sleepycat.je.Durability.SyncPolicy; |
| import com.sleepycat.je.EnvironmentFailureException; |
| import com.sleepycat.je.JEVersion; |
| import com.sleepycat.je.log.LogEntryType; |
| import com.sleepycat.je.log.LogUtils; |
| import com.sleepycat.je.rep.impl.RepGroupImpl; |
| import com.sleepycat.je.rep.impl.RepImpl; |
| import com.sleepycat.je.rep.impl.RepNodeImpl; |
| import com.sleepycat.je.rep.impl.node.NameIdPair; |
| import com.sleepycat.je.rep.impl.node.cbvlsn.GlobalCBVLSN; |
| import com.sleepycat.je.rep.utilint.BinaryProtocol; |
| import com.sleepycat.je.utilint.LongMaxStat; |
| import com.sleepycat.je.utilint.LongMaxZeroStat; |
| import com.sleepycat.je.utilint.LongStat; |
| import com.sleepycat.je.utilint.VLSN; |
| |
| /** |
| * Defines the base protocol of messages used to set up a stream between |
| * source and target. |
| * |
| * Note BaseProtocol represents a set of basic protocol operations intended to |
| * be used by subclasses. For a complete description of message operations |
| * used in JE HA protocol, please see the Protocol class in the same package. |
| * |
| * @see com.sleepycat.je.rep.stream.Protocol |
| */ |
| public abstract class BaseProtocol extends BinaryProtocol { |
| |
| /* --------------------------- */ |
| /* --- protocol versions --- */ |
| /* --------------------------- */ |
| |
| /* |
| * Note that the GROUP_ACK response message was introduced in version 5, |
| * but is disabled by default via RepParams.REPLICA_ENABLE_GROUP_ACKS. |
| * |
| * It can be enabled when we can increase the protocol version number. |
| */ |
| |
| /* The default (highest) version supported by the Protocol code. */ |
| public static final int MAX_VERSION = 9; |
| |
| /* The minimum version we're willing to interact with. */ |
| static final int MIN_VERSION = 3; |
| |
| /* |
| * Version added in JE 18.3.4 to support return security check failure |
| * response to stream client |
| */ |
| public static final int VERSION_9 = 9; |
| |
| public static final JEVersion VERSION_9_JE_VERSION = |
| new JEVersion("18.3.4"); |
| |
| /* |
| * Version added in JE 18.3.1 to support partition gen table db id look up |
| */ |
| public static final int VERSION_8 = 8; |
| |
| public static final JEVersion VERSION_8_JE_VERSION = |
| new JEVersion("18.3.1"); |
| |
| /* |
| * Version added in JE 7.5.6 to support entry request type |
| */ |
| public static final int VERSION_7 = 7; |
| public static final JEVersion VERSION_7_JE_VERSION = |
| new JEVersion("7.5.6"); |
| |
| /* |
| * Version added in JE 6.4.10 to support generic feeder filtering |
| */ |
| public static final int VERSION_6 = 6; |
| public static final JEVersion VERSION_6_JE_VERSION = |
| new JEVersion("6.4.10"); |
| |
| /* Version added in JE 6.0.1 to support RepGroupImpl version 3. */ |
| public static final int VERSION_5 = 5; |
| public static final JEVersion VERSION_5_JE_VERSION = |
| new JEVersion("6.0.1"); |
| |
| /* |
| * Version in which HEARTBEAT_RESPONSE added a second field. We can manage |
| * without this optional additional information if we have to, we we can |
| * still interact with the previous protocol version. (JE 5.0.58) |
| */ |
| static final int VERSION_4 = 4; |
| public static final JEVersion VERSION_4_JE_VERSION = |
| new JEVersion("5.0.58"); |
| |
| /* Version added in JE 4.0.50 to address byte order issues. */ |
| static final int VERSION_3 = 3; |
| public static final JEVersion VERSION_3_JE_VERSION = |
| new JEVersion("4.0.50"); |
| |
| /* ------------------------------------------ */ |
| /* --- messages defined in base protocol --- */ |
| /* ------------------------------------------ */ |
| |
| /* range of op codes allowed in subclasses, inclusively. */ |
| protected final static short MIN_MESSAGE_OP_CODE_IN_SUBCLASS = 1024; |
| protected final static short MAX_MESSAGE_OP_CODE_IN_SUBCLASS = 2047; |
| |
| /* |
| * Following ops are core replication stream post-handshake messages |
| * defined in streaming protocol and are intended to be used in subclasses. |
| * |
| * Note these msg op codes inherit from original implementation of stream |
| * protocol. Due to backward compatibility requirement, we keep them |
| * unchanged and directly copy them here. |
| */ |
| public final static MessageOp ENTRY = |
| new MessageOp((short) 101, Entry.class); |
| |
| public final static MessageOp START_STREAM = |
| new MessageOp((short) 102, StartStream.class); |
| |
| public final static MessageOp HEARTBEAT = |
| new MessageOp((short) 103, Heartbeat.class); |
| |
| public final static MessageOp HEARTBEAT_RESPONSE = |
| new MessageOp((short) 104, HeartbeatResponse.class); |
| |
| public final static MessageOp COMMIT = |
| new MessageOp((short) 105, Commit.class); |
| |
| public final static MessageOp ACK = |
| new MessageOp((short) 106, Ack.class); |
| |
| public final static MessageOp ENTRY_REQUEST = |
| new MessageOp((short) 107, EntryRequest.class); |
| |
| public final static MessageOp ENTRY_NOTFOUND = |
| new MessageOp((short) 108, EntryNotFound.class); |
| |
| public final static MessageOp ALT_MATCHPOINT = |
| new MessageOp((short) 109, AlternateMatchpoint.class); |
| |
| public final static MessageOp RESTORE_REQUEST = |
| new MessageOp((short) 110, RestoreRequest.class); |
| |
| public final static MessageOp RESTORE_RESPONSE = |
| new MessageOp((short) 111, RestoreResponse.class); |
| |
| public final static MessageOp SHUTDOWN_REQUEST = |
| new MessageOp((short) 112, ShutdownRequest.class); |
| |
| public final static MessageOp SHUTDOWN_RESPONSE = |
| new MessageOp((short) 113, ShutdownResponse.class); |
| |
| public final static MessageOp GROUP_ACK = |
| new MessageOp((short) 114, GroupAck.class); |
| |
| /* --------------------------- */ |
| /* -------- fields --------- */ |
| /* --------------------------- */ |
| |
| /** The log version of the format used to write log entries to the stream. */ |
| protected int streamLogVersion; |
| |
| /* Count of all singleton ACK messages. */ |
| protected final LongStat nAckMessages; |
| |
| /* Count of all group ACK messages. */ |
| protected final LongStat nGroupAckMessages; |
| |
| /* Sum of all acks sent via group ACK messages. */ |
| protected final LongStat nGroupedAcks; |
| |
| /* Max number of acks sent via a single group ACK message. */ |
| protected final LongMaxStat nMaxGroupedAcks; |
| |
| protected final RepImpl repImpl; |
| |
| /** |
| * Whether to fix the log version for log entries received from JE 7.0.x |
| * feeders that use log version 12 format but are incorrectly marked with |
| * later log versions due to a bug ([#25222]). The problem is that the |
| * feeder supplies an entry in log version 12 (LOG_VERSION_EXPIRE_INFO) |
| * format, but says it has a later log version. |
| * |
| * <p>This field is only set to true by the replica, which only reads and |
| * writes it from the main Replica thread, so no synchronization is needed. |
| */ |
| private boolean fixLogVersion12Entries = false; |
| |
| /** |
| * Returns a BaseProtocol object configured that implements the specified |
| * (supported) protocol version. |
| * |
| * @param repImpl the node using the protocol |
| * |
| * @param nameIdPair name-id pair of the node using the protocol |
| * |
| * @param protocolVersion the version of the protocol that must be |
| * implemented by this object |
| * |
| * @param maxProtocolVersion the highest supported protocol version, which |
| * may be lower than the code version, for testing purposes |
| * |
| * @param streamLogVersion the log version of the format used to write log |
| * entries |
| * |
| * @param protocolOps the message operations that make up this protocol |
| * |
| * @param checkValidity whether to check the message operations for |
| * validity. Checks should be performed for new protocols, but |
| * suppressed for legacy ones. |
| */ |
| BaseProtocol(final RepImpl repImpl, |
| final NameIdPair nameIdPair, |
| final int protocolVersion, |
| final int maxProtocolVersion, |
| final int streamLogVersion, |
| final MessageOp[] protocolOps, |
| final boolean checkValidity) { |
| super(nameIdPair, maxProtocolVersion, protocolVersion, repImpl); |
| this.streamLogVersion = streamLogVersion; |
| this.repImpl = repImpl; |
| |
| nAckMessages = new LongStat(stats, N_ACK_MESSAGES); |
| nGroupAckMessages = new LongStat(stats, N_GROUP_ACK_MESSAGES); |
| nGroupedAcks = new LongStat(stats, N_GROUPED_ACKS); |
| nMaxGroupedAcks = new LongMaxZeroStat(stats, N_MAX_GROUPED_ACKS); |
| initializeMessageOps(protocolOps, checkValidity); |
| } |
| |
| /** |
| * Returns a BaseProtocol object configured that implements the specified |
| * (supported) protocol version, with enforced message operation |
| * code validity check. |
| * |
| * This constructor enforces checking validity of message operation code. |
| * It should be used in any subclass except the legacy HA protocol in |
| * je.stream.Protocol. |
| * |
| * @param repImpl the node using the protocol |
| * |
| * @param nameIdPair name-id pair of the node using the protocol |
| * |
| * @param protocolVersion the version of the protocol that must be |
| * implemented by this object |
| * |
| * @param maxProtocolVersion the highest supported protocol version, which |
| * may be lower than the code version, for testing purposes |
| * |
| * @param streamLogVersion the log version of the format used to write log |
| * entries |
| * |
| * @param protocolOps the message operations that make up this protocol |
| */ |
| protected BaseProtocol(final RepImpl repImpl, |
| final NameIdPair nameIdPair, |
| final int protocolVersion, |
| final int maxProtocolVersion, |
| final int streamLogVersion, |
| final MessageOp[] protocolOps) { |
| |
| this(repImpl, nameIdPair, protocolVersion, maxProtocolVersion, |
| streamLogVersion, protocolOps, true); |
| } |
| |
| public int getStreamLogVersion() { |
| return streamLogVersion; |
| } |
| |
| /** |
| * Invoked in cases where the stream log version is not known at the time |
| * the protocol object is created and stream log version negotiations are |
| * required to determine the version that will be used for log records sent |
| * over the HA stream. |
| * |
| * @param streamLogVersion the maximum log version associated with stream |
| * records |
| */ |
| public void setStreamLogVersion(int streamLogVersion) { |
| this.streamLogVersion = streamLogVersion; |
| } |
| |
| /** |
| * Returns whether log entries need their log versions fixed to work around |
| * [#25222]. |
| */ |
| public boolean getFixLogVersion12Entries() { |
| return fixLogVersion12Entries; |
| } |
| |
| /** |
| * Sets whether log entries need their log versions fixed to work around |
| * [#25222]. |
| */ |
| public void setFixLogVersion12Entries(boolean value) { |
| fixLogVersion12Entries = value; |
| } |
| |
| /* ------------------------------------------------- */ |
| /* --- message classes defined in base protocol --- */ |
| /* ------------------------------------------------- */ |
| |
| /** |
| * A message containing a log entry in the replication stream. |
| */ |
| public class Entry extends Message { |
| |
| /* |
| * InputWireRecord is set when this Message had been received at this |
| * node. OutputWireRecord is set when this message is created for |
| * sending from this node. |
| */ |
| final protected InputWireRecord inputWireRecord; |
| protected OutputWireRecord outputWireRecord; |
| |
| public Entry(final OutputWireRecord outputWireRecord) { |
| inputWireRecord = null; |
| this.outputWireRecord = outputWireRecord; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return ENTRY; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| final int bodySize = getWireSize(); |
| final ByteBuffer messageBuffer = |
| allocateInitializedBuffer(bodySize); |
| writeOutputWireRecord(outputWireRecord, messageBuffer); |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| protected int getWireSize() { |
| return outputWireRecord.getWireSize(streamLogVersion); |
| } |
| |
| public Entry(final ByteBuffer buffer) |
| throws DatabaseException { |
| |
| inputWireRecord = |
| new InputWireRecord(repImpl, buffer, BaseProtocol.this); |
| } |
| |
| public InputWireRecord getWireRecord() { |
| return inputWireRecord; |
| } |
| |
| @Override |
| public String toString() { |
| |
| final StringBuilder sb = new StringBuilder(); |
| sb.append(super.toString()); |
| |
| if (inputWireRecord != null) { |
| sb.append(" "); |
| sb.append(inputWireRecord); |
| } |
| |
| if (outputWireRecord != null) { |
| sb.append(" "); |
| sb.append(outputWireRecord); |
| } |
| |
| return sb.toString(); |
| } |
| |
| /* For unit test support */ |
| @Override |
| public boolean match(Message other) { |
| |
| /* |
| * This message was read in, but we need to compare it to a message |
| * that was sent out. |
| */ |
| if (outputWireRecord == null) { |
| outputWireRecord = new OutputWireRecord(repImpl, |
| inputWireRecord); |
| } |
| return super.match(other); |
| } |
| |
| /* True if the log entry is a TxnAbort or TxnCommit. */ |
| public boolean isTxnEnd() { |
| final byte entryType = getWireRecord().getEntryType(); |
| return LogEntryType.LOG_TXN_COMMIT.equalsType(entryType) || |
| LogEntryType.LOG_TXN_ABORT.equalsType(entryType); |
| |
| } |
| } |
| |
| /** |
| * StartStream indicates that the replica would like the feeder to start |
| * the replication stream at the proposed vlsn. |
| */ |
| public class StartStream extends VLSNMessage { |
| private final FeederFilter feederFilter; |
| |
| StartStream(VLSN startVLSN) { |
| super(startVLSN); |
| feederFilter = null; |
| } |
| |
| StartStream(VLSN startVLSN, FeederFilter filter) { |
| super(startVLSN); |
| feederFilter = filter; |
| } |
| |
| public StartStream(ByteBuffer buffer) { |
| super(buffer); |
| |
| /* Feeder filtering not supported before protocol version 6 */ |
| if (getVersion() < VERSION_6) { |
| feederFilter = null; |
| return; |
| } |
| |
| final int length = LogUtils.readInt(buffer); |
| if (length == 0) { |
| /* no filter is provided by client */ |
| feederFilter = null; |
| return; |
| } |
| |
| /* reconstruct filter from buffer */ |
| final byte filterBytes[] = |
| LogUtils.readBytesNoLength(buffer, length); |
| final ByteArrayInputStream bais = |
| new ByteArrayInputStream(filterBytes); |
| ObjectInputStream ois = null; |
| try { |
| ois = new ObjectInputStream(bais); |
| feederFilter = (FeederFilter) ois.readObject(); |
| } catch (ClassNotFoundException | IOException e) { |
| logger.warning(e.getLocalizedMessage()); |
| throw new IllegalStateException(e); |
| } finally { |
| if (ois != null) { |
| try { |
| ois.close(); |
| } catch (IOException e) { |
| logger.finest("exception raised when closing the " + |
| "object input stream object " + |
| e.getLocalizedMessage()); |
| } |
| } |
| } |
| } |
| |
| public FeederFilter getFeederFilter() { |
| return feederFilter; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| /* Feeder filtering not supported before protocol version 6 */ |
| if (getVersion() < VERSION_6) { |
| return super.wireFormat(); |
| } |
| |
| final int feederBufferSize; |
| final byte[] filterBytes; |
| |
| if (feederFilter != null) { |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| ObjectOutputStream oos = null; |
| try { |
| oos = new ObjectOutputStream(baos); |
| oos.writeObject(feederFilter); |
| oos.flush(); |
| } catch (IOException e) { |
| logger.warning(e.getLocalizedMessage()); |
| throw new IllegalStateException(e); |
| } finally { |
| if (oos != null) { |
| try { |
| oos.close(); |
| } catch (IOException e) { |
| logger.finest("exception raised when closing the " + |
| "object output stream object " + |
| e.getLocalizedMessage()); |
| } |
| } |
| } |
| filterBytes = baos.toByteArray(); |
| feederBufferSize = filterBytes.length; |
| } else { |
| filterBytes = null; |
| feederBufferSize = 0; |
| } |
| |
| /* build message buffer */ |
| final int bodySize = wireFormatSize() + 4 + feederBufferSize; |
| final ByteBuffer messageBuffer = |
| allocateInitializedBuffer(bodySize); |
| /* write 8 bytes of VLSN */ |
| LogUtils.writeLong(messageBuffer, vlsn.getSequence()); |
| /* write 4 bytes of feeder buf size */ |
| LogUtils.writeInt(messageBuffer, feederBufferSize); |
| /* write feeder buffer */ |
| if (feederBufferSize > 0) { |
| LogUtils.writeBytesNoLength(messageBuffer, filterBytes); |
| } |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return START_STREAM; |
| } |
| |
| @Override |
| public String toString() { |
| String filterString = (feederFilter == null) ? "[no filtering]" : |
| feederFilter.toString(); |
| |
| return super.toString() + " " + filterString; |
| } |
| } |
| |
| public class Heartbeat extends Message { |
| |
| private final long masterNow; |
| private final long currentTxnEndVLSN; |
| |
| public Heartbeat(long masterNow, long currentTxnEndVLSN) { |
| this.masterNow = masterNow; |
| this.currentTxnEndVLSN = currentTxnEndVLSN; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return HEARTBEAT; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| int bodySize = 8 * 2 /* masterNow + currentTxnEndVLSN */; |
| ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize); |
| LogUtils.writeLong(messageBuffer, masterNow); |
| LogUtils.writeLong(messageBuffer, currentTxnEndVLSN); |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| public Heartbeat(ByteBuffer buffer) { |
| masterNow = LogUtils.readLong(buffer); |
| currentTxnEndVLSN = LogUtils.readLong(buffer); |
| } |
| |
| public long getMasterNow() { |
| return masterNow; |
| } |
| |
| public long getCurrentTxnEndVLSN() { |
| return currentTxnEndVLSN; |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString() + " masterNow=" + masterNow + |
| " currentCommit=" + currentTxnEndVLSN; |
| } |
| } |
| |
| public class HeartbeatResponse extends Message { |
| |
| /* |
| * The latest syncupVLSN. If the GlobalCBVLSN is defunct, this field |
| * contains a null VLSN and is unused. If the GlobalCBVLSN is not |
| * defunct: |
| * - When sent by an arbiter or subscriber, this field is null |
| * and unused. |
| * - When sent by a replica, this is the replica's local CBVLSN and |
| * is used for updating the GlobalCBVLSN on the master. |
| */ |
| private final VLSN syncupVLSN; |
| |
| /* The latest commit/abort VLSN on the replica/arbiter/subscriber. */ |
| private final VLSN txnEndVLSN; |
| |
| public HeartbeatResponse(VLSN syncupVLSN, VLSN ackedVLSN) { |
| super(); |
| this.syncupVLSN = syncupVLSN; |
| this.txnEndVLSN = ackedVLSN; |
| } |
| |
| public HeartbeatResponse(ByteBuffer buffer) { |
| syncupVLSN = new VLSN(LogUtils.readLong(buffer)); |
| txnEndVLSN = |
| getVersion() >= VERSION_4 ? |
| new VLSN(LogUtils.readLong(buffer)) : |
| null; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return HEARTBEAT_RESPONSE; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| boolean includeTxnEndVLSN = getVersion() >= VERSION_4; |
| int bodySize = includeTxnEndVLSN ? |
| 8 * 2 : |
| 8; |
| ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize); |
| LogUtils.writeLong(messageBuffer, syncupVLSN.getSequence()); |
| if (includeTxnEndVLSN) { |
| LogUtils.writeLong(messageBuffer, txnEndVLSN.getSequence()); |
| } |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| public VLSN getSyncupVLSN() { |
| return syncupVLSN; |
| } |
| |
| public VLSN getTxnEndVLSN() { |
| return txnEndVLSN; |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString() + |
| " txnEndVLSN=" + txnEndVLSN + |
| " syncupVLSN=" + syncupVLSN; |
| } |
| } |
| |
| /** |
| * Message of a commit op |
| */ |
| public class Commit extends Entry { |
| private final boolean needsAck; |
| private final SyncPolicy replicaSyncPolicy; |
| |
| public Commit(final boolean needsAck, |
| final SyncPolicy replicaSyncPolicy, |
| final OutputWireRecord wireRecord) { |
| super(wireRecord); |
| this.needsAck = needsAck; |
| this.replicaSyncPolicy = replicaSyncPolicy; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return COMMIT; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| final int bodySize = super.getWireSize() + |
| 1 /* needsAck */ + |
| 1 /* replica sync policy */; |
| final ByteBuffer messageBuffer = |
| allocateInitializedBuffer(bodySize); |
| messageBuffer.put((byte) (needsAck ? 1 : 0)); |
| messageBuffer.put((byte) replicaSyncPolicy.ordinal()); |
| writeOutputWireRecord(outputWireRecord, messageBuffer); |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| public Commit(final ByteBuffer buffer) |
| throws DatabaseException { |
| |
| this(getByteNeedsAck(buffer.get()), |
| getByteReplicaSyncPolicy(buffer.get()), |
| buffer); |
| } |
| |
| private Commit(final boolean needsAck, |
| final SyncPolicy replicaSyncPolicy, |
| final ByteBuffer buffer) |
| throws DatabaseException { |
| |
| super(buffer); |
| this.needsAck = needsAck; |
| this.replicaSyncPolicy = replicaSyncPolicy; |
| } |
| |
| public boolean getNeedsAck() { |
| return needsAck; |
| } |
| |
| public SyncPolicy getReplicaSyncPolicy() { |
| return replicaSyncPolicy; |
| } |
| } |
| |
| /** |
| * Message of an ack op |
| */ |
| public class Ack extends Message { |
| |
| private final long txnId; |
| |
| public Ack(long txnId) { |
| super(); |
| this.txnId = txnId; |
| nAckMessages.increment(); |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return ACK; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| int bodySize = 8; |
| ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize); |
| LogUtils.writeLong(messageBuffer, txnId); |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| public Ack(ByteBuffer buffer) { |
| txnId = LogUtils.readLong(buffer); |
| } |
| |
| public long getTxnId() { |
| return txnId; |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString() + " txn " + txnId; |
| } |
| } |
| |
| /** |
| * A replica node asks a feeder for the log entry at this VLSN. |
| */ |
| public class EntryRequest extends VLSNMessage { |
| |
| private final EntryRequestType type; |
| |
| EntryRequest(VLSN matchpoint) { |
| super(matchpoint); |
| type = EntryRequestType.DEFAULT; |
| } |
| |
| EntryRequest(VLSN matchpoint, EntryRequestType type) { |
| super(matchpoint); |
| this.type = type; |
| } |
| |
| public EntryRequest(ByteBuffer buffer) { |
| super(buffer); |
| |
| /* entry request type not supported before protocol version 7 */ |
| if (getVersion() < VERSION_7) { |
| type = EntryRequestType.DEFAULT; |
| return; |
| } |
| |
| final int i = LogUtils.readInt(buffer); |
| type = EntryRequestType.values()[i]; |
| } |
| |
| public EntryRequestType getType() { |
| return type; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| |
| /* type not supported before protocol version 7 */ |
| if (getVersion() < VERSION_7) { |
| return super.wireFormat(); |
| } |
| |
| /* build message buffer */ |
| final int bodySize = wireFormatSize(); |
| final ByteBuffer messageBuffer = |
| allocateInitializedBuffer(bodySize); |
| /* write 8 bytes of VLSN */ |
| LogUtils.writeLong(messageBuffer, vlsn.getSequence()); |
| /* write 4 bytes of type */ |
| LogUtils.writeInt(messageBuffer, type.ordinal()); |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| @Override |
| public int wireFormatSize() { |
| /* type not supported before protocol version 7 */ |
| if (getVersion() < VERSION_7) { |
| return super.wireFormatSize(); |
| } |
| |
| return super.wireFormatSize() + 4; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return ENTRY_REQUEST; |
| } |
| |
| @Override |
| public String toString() { |
| return "entry request vlsn: " + super.toString() + |
| ", type: " + type; |
| } |
| } |
| |
| /** |
| * Type of entry request sent to feeder |
| * |
| * RV: VLSN requested by client |
| * LOW: low end of available VLSN range in vlsn index |
| * HIGH: high end of available VLSN range in vlsn index |
| * |
| * The DEFAULT mode is used by existing replication stream consumer e.g. |
| * replica, arbiter, secondary nodes, etc, while the others are only used |
| * in subscription (je.rep.subscription). |
| * |
| * {@literal |
| * ------------------------------------------------------------------- |
| * MODE | RV < LOW | RV in [LOW, HIGH] | RV > HIGH |
| * ------------------------------------------------------------------- |
| * DEFAULT | NOT_FOUND | REQUESTED ENTRY | ALT MATCH POINT |
| * AVAILABLE | LOW | REQUESTED ENTRY | HIGH |
| * NOW | HIGH | HIGH | HIGH |
| * } |
| */ |
| public enum EntryRequestType { |
| DEFAULT, |
| AVAILABLE, |
| NOW |
| } |
| |
| /** |
| * Response when the EntryRequest asks for a VLSN that is below the VLSN |
| * range covered by the Feeder. |
| */ |
| public class EntryNotFound extends Message { |
| |
| public EntryNotFound() { |
| } |
| |
| public EntryNotFound(@SuppressWarnings("unused") ByteBuffer buffer) { |
| super(); |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return ENTRY_NOTFOUND; |
| } |
| } |
| |
| public class AlternateMatchpoint extends Message { |
| |
| private final InputWireRecord alternateInput; |
| private OutputWireRecord alternateOutput = null; |
| |
| AlternateMatchpoint(final OutputWireRecord alternate) { |
| alternateInput = null; |
| this.alternateOutput = alternate; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return ALT_MATCHPOINT; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| final int bodySize = alternateOutput.getWireSize(streamLogVersion); |
| final ByteBuffer messageBuffer = |
| allocateInitializedBuffer(bodySize); |
| writeOutputWireRecord(alternateOutput, messageBuffer); |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| public AlternateMatchpoint(final ByteBuffer buffer) |
| throws DatabaseException { |
| alternateInput = |
| new InputWireRecord(repImpl, buffer, BaseProtocol.this); |
| } |
| |
| public InputWireRecord getAlternateWireRecord() { |
| return alternateInput; |
| } |
| |
| /* For unit test support */ |
| @Override |
| public boolean match(Message other) { |
| |
| /* |
| * This message was read in, but we need to compare it to a message |
| * that was sent out. |
| */ |
| if (alternateOutput == null) { |
| alternateOutput = |
| new OutputWireRecord(repImpl, alternateInput); |
| } |
| return super.match(other); |
| } |
| } |
| |
| /** |
| * Request from the replica to the feeder for sufficient information to |
| * start a network restore. |
| */ |
| public class RestoreRequest extends VLSNMessage { |
| |
| RestoreRequest(VLSN failedMatchpoint) { |
| super(failedMatchpoint); |
| } |
| |
| public RestoreRequest(ByteBuffer buffer) { |
| super(buffer); |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return RESTORE_REQUEST; |
| } |
| } |
| |
| /** |
| * Response when the replica needs information to instigate a network |
| * restore. The message contains a set of nodes that could be used as the |
| * basis for a NetworkBackup so that the request node can become current |
| * again. |
| * |
| * <p>In addition, when support is needed for older replica nodes that use |
| * the GlobalCBVLSN, a vlsn is included. See |
| * {@link GlobalCBVLSN#getRestoreResponseVLSN}.</p> |
| */ |
| public class RestoreResponse extends SimpleMessage { |
| /* Is null VLSN and unused if the GlobalCBVLSN is defunct. */ |
| private final VLSN cbvlsn; |
| |
| private final RepNodeImpl[] logProviders; |
| |
| public RestoreResponse(VLSN cbvlsn, RepNodeImpl[] logProviders) { |
| this.cbvlsn = cbvlsn; |
| this.logProviders = logProviders; |
| } |
| |
| public RestoreResponse(ByteBuffer buffer) { |
| long vlsnSequence = LogUtils.readLong(buffer); |
| cbvlsn = new VLSN(vlsnSequence); |
| logProviders = getRepNodeImplArray(buffer); |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| return wireFormat(cbvlsn.getSequence(), logProviders); |
| } |
| |
| /* Add support for RepNodeImpl arrays. */ |
| |
| @Override |
| protected void putWireFormat(final ByteBuffer buffer, |
| final Object obj) { |
| if (obj.getClass() == RepNodeImpl[].class) { |
| putRepNodeImplArray(buffer, (RepNodeImpl[]) obj); |
| } else { |
| super.putWireFormat(buffer, obj); |
| } |
| } |
| |
| @Override |
| protected int wireFormatSize(final Object obj) { |
| if (obj.getClass() == RepNodeImpl[].class) { |
| return getRepNodeImplArraySize((RepNodeImpl[]) obj); |
| } |
| return super.wireFormatSize(obj); |
| } |
| |
| private void putRepNodeImplArray(final ByteBuffer buffer, |
| final RepNodeImpl[] ra) { |
| LogUtils.writeInt(buffer, ra.length); |
| final int groupFormatVersion = getGroupFormatVersion(); |
| for (final RepNodeImpl node : ra) { |
| putByteArray( |
| buffer, |
| RepGroupImpl.serializeBytes(node, groupFormatVersion)); |
| } |
| } |
| |
| private RepNodeImpl[] getRepNodeImplArray(final ByteBuffer buffer) { |
| final RepNodeImpl[] ra = new RepNodeImpl[LogUtils.readInt(buffer)]; |
| final int groupFormatVersion = getGroupFormatVersion(); |
| for (int i = 0; i < ra.length; i++) { |
| ra[i] = RepGroupImpl.deserializeNode( |
| getByteArray(buffer), groupFormatVersion); |
| } |
| return ra; |
| } |
| |
| private int getRepNodeImplArraySize(RepNodeImpl[] ra) { |
| int size = 4; /* array length */ |
| final int groupFormatVersion = getGroupFormatVersion(); |
| for (final RepNodeImpl node : ra) { |
| size += (4 /* Node size */ + |
| RepGroupImpl.serializeBytes(node, groupFormatVersion) |
| .length); |
| } |
| return size; |
| } |
| |
| /** |
| * Returns the RepGroupImpl version to use for the currently configured |
| * protocol version. |
| */ |
| private int getGroupFormatVersion() { |
| return (getVersion() < VERSION_5) ? |
| RepGroupImpl.FORMAT_VERSION_2 : |
| RepGroupImpl.MAX_FORMAT_VERSION; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return RESTORE_RESPONSE; |
| } |
| |
| RepNodeImpl[] getLogProviders() { |
| return logProviders; |
| } |
| |
| VLSN getCBVLSN() { |
| return cbvlsn; |
| } |
| } |
| |
| /** |
| * Message used to shutdown a node |
| */ |
| public class ShutdownRequest extends SimpleMessage { |
| /* The time that the shutdown was initiated on the master. */ |
| private final long shutdownTimeMs; |
| |
| public ShutdownRequest(long shutdownTimeMs) { |
| super(); |
| this.shutdownTimeMs = shutdownTimeMs; |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return SHUTDOWN_REQUEST; |
| } |
| |
| public ShutdownRequest(ByteBuffer buffer) { |
| shutdownTimeMs = LogUtils.readLong(buffer); |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| return wireFormat(shutdownTimeMs); |
| } |
| |
| public long getShutdownTimeMs() { |
| return shutdownTimeMs; |
| } |
| } |
| |
| /** |
| * Message in response to a shutdown request. |
| */ |
| public class ShutdownResponse extends Message { |
| |
| public ShutdownResponse() { |
| super(); |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return SHUTDOWN_RESPONSE; |
| } |
| |
| public ShutdownResponse(@SuppressWarnings("unused") ByteBuffer buffer) { |
| } |
| } |
| |
| public class GroupAck extends Message { |
| |
| private final long txnIds[]; |
| |
| public GroupAck(long txnIds[]) { |
| super(); |
| this.txnIds = txnIds; |
| nGroupAckMessages.increment(); |
| nGroupedAcks.add(txnIds.length); |
| nMaxGroupedAcks.setMax(txnIds.length); |
| } |
| |
| @Override |
| public MessageOp getOp() { |
| return GROUP_ACK; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| |
| final int bodySize = 4 + 8 * txnIds.length; |
| final ByteBuffer messageBuffer = |
| allocateInitializedBuffer(bodySize); |
| |
| putLongArray(messageBuffer, txnIds); |
| messageBuffer.flip(); |
| |
| return messageBuffer; |
| } |
| |
| public GroupAck(ByteBuffer buffer) { |
| txnIds = readLongArray(buffer); |
| } |
| |
| public long[] getTxnIds() { |
| return txnIds; |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString() + " txn " + Arrays.toString(txnIds); |
| } |
| } |
| |
| /** |
| * Base class for messages which contain only a VLSN |
| */ |
| protected abstract class VLSNMessage extends Message { |
| protected final VLSN vlsn; |
| |
| VLSNMessage(VLSN vlsn) { |
| super(); |
| this.vlsn = vlsn; |
| } |
| |
| public VLSNMessage(ByteBuffer buffer) { |
| long vlsnSequence = LogUtils.readLong(buffer); |
| vlsn = new VLSN(vlsnSequence); |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| int bodySize = wireFormatSize(); |
| ByteBuffer messageBuffer = allocateInitializedBuffer(bodySize); |
| LogUtils.writeLong(messageBuffer, vlsn.getSequence()); |
| messageBuffer.flip(); |
| return messageBuffer; |
| } |
| |
| int wireFormatSize() { |
| return 8; |
| } |
| |
| VLSN getVLSN() { |
| return vlsn; |
| } |
| |
| @Override |
| public String toString() { |
| return super.toString() + " " + vlsn; |
| } |
| } |
| |
| /** |
| * Base class for all protocol handshake messages. |
| */ |
| protected abstract class HandshakeMessage extends SimpleMessage { |
| } |
| |
| /** |
| * Version broadcasts the sending node's protocol version. |
| */ |
| protected abstract class ProtocolVersion extends HandshakeMessage { |
| private final int version; |
| |
| @SuppressWarnings("hiding") |
| private final NameIdPair nameIdPair; |
| |
| public ProtocolVersion(int version) { |
| super(); |
| this.version = version; |
| this.nameIdPair = BaseProtocol.this.nameIdPair; |
| } |
| |
| @Override |
| public ByteBuffer wireFormat() { |
| return wireFormat(version, nameIdPair); |
| } |
| |
| public ProtocolVersion(ByteBuffer buffer) { |
| version = LogUtils.readInt(buffer); |
| nameIdPair = getNameIdPair(buffer); |
| } |
| |
| /** |
| * @return the version |
| */ |
| protected int getVersion() { |
| return version; |
| } |
| |
| /** |
| * The nodeName of the sender |
| * |
| * @return nodeName |
| */ |
| protected NameIdPair getNameIdPair() { |
| return nameIdPair; |
| } |
| } |
| |
| /* ---------------------------------------- */ |
| /* --- end of message class definition --- */ |
| /* ---------------------------------------- */ |
| |
| /** |
| * Write an entry output wire record to the message buffer using the write |
| * log version format and increment nEntriesWrittenOldVersion if the entry |
| * format was changed. |
| */ |
| protected void writeOutputWireRecord(final OutputWireRecord record, |
| final ByteBuffer messageBuffer) { |
| final boolean changedFormat = |
| record.writeToWire(messageBuffer, streamLogVersion); |
| if (changedFormat) { |
| nEntriesWrittenOldVersion.increment(); |
| } |
| } |
| |
| /** |
| * Initializes message ops and check if is valid within allocated range |
| * |
| * @param protocolOps ops to be initialized |
| * @param checkValidity true if check validity of op code |
| */ |
| private void initializeMessageOps(MessageOp[] protocolOps, |
| boolean checkValidity) { |
| |
| if (checkValidity) { |
| /* Check if op code is valid before initialization */ |
| for (MessageOp op : protocolOps) { |
| if (!isValidMsgOpCode(op.getOpId())) { |
| throw EnvironmentFailureException.unexpectedState |
| ("Op id: " + op.getOpId() + |
| " is out of allowed range inclusively [" + |
| MIN_MESSAGE_OP_CODE_IN_SUBCLASS + ", " + |
| MAX_MESSAGE_OP_CODE_IN_SUBCLASS + "]"); |
| } |
| } |
| } |
| initializeMessageOps(protocolOps); |
| } |
| |
| /** |
| * Returns whether the byte value specifies that an acknowledgment is |
| * needed. |
| */ |
| private static boolean getByteNeedsAck(final byte needsAckByte) { |
| switch (needsAckByte) { |
| case 0: |
| return false; |
| case 1: |
| return true; |
| default: |
| throw EnvironmentFailureException.unexpectedState( |
| "Invalid bool ordinal: " + needsAckByte); |
| } |
| } |
| |
| /** Checks if op code defined in subclass fall in pre-allocated range */ |
| private static boolean isValidMsgOpCode(short opId) { |
| |
| return (opId <= MAX_MESSAGE_OP_CODE_IN_SUBCLASS) && |
| (opId >= MIN_MESSAGE_OP_CODE_IN_SUBCLASS); |
| } |
| |
| /** Returns the sync policy specified by the argument. */ |
| private static SyncPolicy getByteReplicaSyncPolicy( |
| final byte syncPolicyByte) { |
| |
| for (final SyncPolicy p : SyncPolicy.values()) { |
| if (p.ordinal() == syncPolicyByte) { |
| return p; |
| } |
| } |
| throw EnvironmentFailureException.unexpectedState( |
| "Invalid sync policy ordinal: " + syncPolicyByte); |
| } |
| |
| /* Writes array of longs into buffer */ |
| private void putLongArray(ByteBuffer buffer, long[] la) { |
| LogUtils.writeInt(buffer, la.length); |
| |
| for (long l : la) { |
| LogUtils.writeLong(buffer, l); |
| } |
| } |
| |
| /* Reads array of longs from buffer */ |
| private long[] readLongArray(ByteBuffer buffer) { |
| final long la[] = new long[LogUtils.readInt(buffer)]; |
| |
| for (int i = 0; i < la.length; i++) { |
| la[i] = LogUtils.readLong(buffer); |
| } |
| |
| return la; |
| } |
| } |