| /*- |
| * Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved. |
| * |
| * This file was distributed by Oracle as part of a version of Oracle Berkeley |
| * DB Java Edition made available at: |
| * |
| * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html |
| * |
| * Please see the LICENSE file included in the top-level directory of the |
| * appropriate version of Oracle Berkeley DB Java Edition for a copy of the |
| * license and additional information. |
| */ |
| package com.sleepycat.je.rep.stream; |
| |
| |
| import static com.sleepycat.je.rep.stream.BaseProtocol.VERSION_8; |
| |
| import java.io.IOException; |
| import java.util.logging.Logger; |
| |
| import com.sleepycat.je.dbi.DatabaseId; |
| import com.sleepycat.je.rep.impl.RepImpl; |
| import com.sleepycat.je.rep.stream.BaseProtocol.AlternateMatchpoint; |
| import com.sleepycat.je.rep.stream.BaseProtocol.Entry; |
| import com.sleepycat.je.rep.stream.BaseProtocol.EntryNotFound; |
| import com.sleepycat.je.rep.stream.BaseProtocol.EntryRequestType; |
| import com.sleepycat.je.rep.utilint.BinaryProtocol.Message; |
| import com.sleepycat.je.rep.utilint.NamedChannel; |
| import com.sleepycat.je.utilint.InternalException; |
| import com.sleepycat.je.utilint.LoggerUtils; |
| import com.sleepycat.je.utilint.VLSN; |
| |
| /** |
| * Object to sync-up the Feeder and Subscriber to establish the VLSN from |
| * which subscriber should should start stream log entries from feeder. |
| */ |
| public class SubscriberFeederSyncup { |
| |
| private final static int MIN_VER_PART_GEN_DB = VERSION_8; |
| |
| private final Logger logger; |
| private final RepImpl repImpl; |
| private final NamedChannel namedChannel; |
| private final Protocol protocol; |
| private final FeederFilter filter; |
| private final EntryRequestType type; |
| /* partition gen db name */ |
| private final String partGenDBName; |
| |
| /* partition gen db id */ |
| private DatabaseId partGenDBId; |
| |
| public SubscriberFeederSyncup(NamedChannel namedChannel, |
| Protocol protocol, |
| FeederFilter filter, |
| RepImpl repImpl, |
| EntryRequestType type, |
| String partGenDBName, |
| Logger logger) { |
| this.namedChannel = namedChannel; |
| this.protocol = protocol; |
| this.filter = filter; |
| this.repImpl = repImpl; |
| this.type = type; |
| this.partGenDBName = partGenDBName; |
| this.logger = logger; |
| |
| partGenDBId = null; |
| } |
| |
| /** |
| * Execute sync-up to the Feeder. Request Feeder to start a replication |
| * stream from a start VLSN, if it is available. Otherwise return NULL |
| * VLSN to subscriber. |
| * |
| * @param reqVLSN VLSN requested by subscriber to stream log entries |
| * |
| * @return start VLSN from subscribe can stream log entries |
| * @throws InternalException if fail to execute syncup |
| */ |
| public VLSN execute(VLSN reqVLSN) throws InternalException { |
| |
| final long startTime = System.currentTimeMillis(); |
| |
| LoggerUtils.info(logger, repImpl, |
| "Subscriber-Feeder " + namedChannel.getNameIdPair() + |
| " syncup started."); |
| |
| try { |
| /* query the start VLSN from feeder */ |
| final VLSN startVLSN = getStartVLSNFromFeeder(reqVLSN); |
| /* if part gen db name, query partition gen db id from feeder */ |
| if (partGenDBName != null && !partGenDBName.isEmpty()) { |
| partGenDBId = getPartGenDBIdFromFeeder(); |
| } |
| if (!startVLSN.equals(VLSN.NULL_VLSN)) { |
| LoggerUtils.info(logger, repImpl, |
| "Response from feeder " + |
| namedChannel.getNameIdPair() + |
| ": the start VLSN " + startVLSN + |
| ", the requested VLSN " + reqVLSN + |
| ", send startStream request with filter."); |
| |
| /* start streaming from feeder if valid start VLSN */ |
| protocol.write(protocol.new StartStream(startVLSN, filter), |
| namedChannel); |
| } else { |
| LoggerUtils.info(logger, repImpl, |
| "Unable to stream from Feeder " + |
| namedChannel.getNameIdPair() + |
| " from requested VLSN " + reqVLSN); |
| } |
| return startVLSN; |
| } catch (IllegalStateException | IOException e) { |
| throw new InternalException(e.getMessage()); |
| } finally { |
| LoggerUtils.info(logger, repImpl, |
| String.format("Subscriber to feeder " + |
| namedChannel.getNameIdPair() + |
| " sync-up done, elapsed time: %,dms", |
| System.currentTimeMillis() - |
| startTime)); |
| } |
| } |
| |
| /** |
| * Returns Partition Generation DB Id |
| * |
| * @return partition generation db id, null if not initialized |
| */ |
| public DatabaseId getPartGenDBId() { |
| return partGenDBId; |
| } |
| |
| /** |
| * Request a start VLSN from feeder. The feeder will return a valid |
| * start VLSN, which can be equal to or earlier than the request VLSN, |
| * or null if feeder is unable to service the requested VLSN. |
| * |
| * @param requestVLSN start VLSN requested by subscriber |
| * |
| * @return VLSN a valid start VLSN from feeder, or null if it unavailable |
| * at the feeder |
| * @throws IOException if unable to read message from channel |
| * @throws IllegalStateException if the feeder sends an unexpected message |
| */ |
| private VLSN getStartVLSNFromFeeder(VLSN requestVLSN) |
| throws IOException, IllegalStateException { |
| |
| LoggerUtils.fine(logger, repImpl, |
| "Subscriber send requested VLSN " + requestVLSN + |
| " to feeder " + namedChannel.getNameIdPair()); |
| |
| /* ask the feeder for the requested VLSN. */ |
| protocol.write(protocol.new EntryRequest(requestVLSN, type), |
| namedChannel); |
| |
| /* |
| * Expect the feeder to return one of following if type is |
| * EntryRequestType.DEFAULT: |
| * a) not_found if the requested VLSN is too low |
| * b) the requested VLSN if the requested VLSN is found |
| * c) the alt match VLSN if the requested VLSN is too high |
| * |
| * If type is EntryRequestType.AVAILABLE: |
| * a) the lowest available VLSN if the requested VLSN is too low |
| * b) the requested VLSN if the requested VLSN is found |
| * c) the highest available VLSN if the request VLSN is too high |
| |
| * If type is EntryRequestType.NOW: |
| * a) always returns highest available VLSN |
| */ |
| final Message message = protocol.read(namedChannel); |
| final VLSN vlsn; |
| if (message instanceof Entry) { |
| vlsn = ((Entry) message).getWireRecord().getVLSN(); |
| |
| /* must be exact match for the default type */ |
| if (type.equals(EntryRequestType.DEFAULT)) { |
| assert (vlsn.equals(requestVLSN)); |
| } |
| |
| /* dump traces */ |
| if (vlsn.equals(requestVLSN)) { |
| LoggerUtils.finest(logger, repImpl, |
| "Subscriber successfully requested VLSN " + |
| requestVLSN + " from feeder " + |
| namedChannel.getNameIdPair() + |
| ", request type: " + type); |
| } |
| |
| if (vlsn.compareTo(requestVLSN) < 0) { |
| LoggerUtils.finest(logger, repImpl, |
| "Requested VLSN " + requestVLSN + |
| " is not available from feeder " + |
| namedChannel.getNameIdPair() + |
| " instead, start stream from a lowest " + |
| "available VLSN " + vlsn + |
| ", request type: " + type); |
| } |
| |
| if (vlsn.compareTo(requestVLSN) > 0) { |
| if (type.equals(EntryRequestType.NOW)) { |
| LoggerUtils.finest(logger, repImpl, |
| "Stream from highest available vlsn " + |
| "from feeder " + |
| namedChannel.getNameIdPair() + ":" + |
| vlsn + ", request type: " + type); |
| } else { |
| LoggerUtils.finest(logger, repImpl, |
| "Requested VLSN " + requestVLSN + |
| " is not available from feeder " + |
| namedChannel.getNameIdPair() + |
| " instead, start stream from a highest" + |
| " available VLSN " + vlsn + |
| ", request type: " + type); |
| } |
| } |
| |
| } else if (message instanceof AlternateMatchpoint) { |
| /* now and available type should not see alter match point */ |
| if (type.equals(EntryRequestType.NOW) || |
| type.equals(EntryRequestType.AVAILABLE)) { |
| String msg = "Receive unexpected response " + message + |
| "from feeder " + namedChannel.getNameIdPair() + |
| ", request type: " + type; |
| LoggerUtils.warning(logger, repImpl, msg); |
| throw new IllegalStateException(msg); |
| } |
| |
| vlsn = ((AlternateMatchpoint) message).getAlternateWireRecord() |
| .getVLSN(); |
| /* must be an earlier VLSN */ |
| assert (vlsn.compareTo(requestVLSN) < 0); |
| LoggerUtils.finest(logger, repImpl, |
| "Feeder " + namedChannel.getNameIdPair() + |
| " returns a valid start VLSN" + vlsn + |
| " but earlier than requested one " + |
| requestVLSN + ", request type: " + type); |
| |
| } else if (message instanceof EntryNotFound) { |
| /* now and available type should not see not found */ |
| if (type.equals(EntryRequestType.NOW) || |
| type.equals(EntryRequestType.AVAILABLE)) { |
| /* |
| * even for a brand new environment, the VLSN range at feeder |
| * is not empty so we should not see entry not found |
| */ |
| String msg = "Receive unexpected response " + message + |
| "from feeder " + namedChannel.getNameIdPair() + |
| ", request type: " + type; |
| LoggerUtils.warning(logger, repImpl, msg); |
| throw new IllegalStateException(msg); |
| } |
| |
| vlsn = VLSN.NULL_VLSN; |
| LoggerUtils.finest(logger, repImpl, |
| "Feeder " + namedChannel.getNameIdPair() + |
| " is unable to service the request vlsn " + |
| requestVLSN + ", request type: " + type); |
| } else { |
| /* unexpected response from feeder */ |
| String msg = "Receive unexpected response " + message + |
| "from feeder " + namedChannel.getNameIdPair() + |
| ", request type: " + type; |
| LoggerUtils.warning(logger, repImpl, msg); |
| throw new IllegalStateException(msg); |
| } |
| |
| return vlsn; |
| } |
| |
| /** |
| * Requests the partition generation db id from feeder |
| * |
| * @return the partition generation db id |
| * |
| * @throws IOException if unable to read from channel |
| * @throws IllegalStateException if receives unexpected response |
| */ |
| private DatabaseId getPartGenDBIdFromFeeder() |
| throws IOException, IllegalStateException { |
| |
| if(protocol.getVersion() < MIN_VER_PART_GEN_DB) { |
| LoggerUtils.info(logger, repImpl, |
| "Client provides part gen db name while the " + |
| "server protocol (ver " + protocol.getVersion() + |
| ") is not upgraded to support it, min ver to " + |
| "support partition generation " + |
| MIN_VER_PART_GEN_DB + ", ignore."); |
| return null; |
| } |
| |
| LoggerUtils.fine(logger, repImpl, |
| "Subscriber sends request for partition generation " + |
| "db id for " + partGenDBName + |
| " to feeder " + namedChannel.getNameIdPair()); |
| |
| /* ask the feeder for the db id. */ |
| protocol.write(protocol.new DBIdRequest(partGenDBName), namedChannel); |
| |
| /* |
| * Since we have already passed the protocol negotiation, the feeder |
| * shall support this protocol version and returns valid db id |
| */ |
| final Message message = protocol.read(namedChannel); |
| final DatabaseId id; |
| if (message instanceof Protocol.DBIdResponse) { |
| id = ((Protocol.DBIdResponse)message).getDbId(); |
| LoggerUtils.fine(logger, repImpl, |
| "Subscriber successfully requested partition " + |
| "generation db id " + id + " from feeder " + |
| namedChannel.getNameIdPair()); |
| } else { |
| /* unexpected response from feeder */ |
| String msg = "Receive unexpected response " + message + |
| "from feeder " + namedChannel.getNameIdPair() + |
| ", request type: " + type; |
| LoggerUtils.warning(logger, repImpl, msg); |
| throw new IllegalStateException(msg); |
| } |
| |
| return id; |
| } |
| } |