blob: 9bb977e5149157bb9fd1b1e6caa876a2dfc1fab5 [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.impl.networkRestore;
import java.util.ArrayList;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.dbi.EnvironmentFailureReason;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.rep.impl.node.NameIdPair;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.util.DbBackup;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.StoppableThread;
/**
* Manages the multiple log file feeders that may be servicing requests from
* multiple clients requesting log files.
*/
public class FeederManager extends StoppableThread {
/*
* The queue into which the ServiceDispatcher queues socket channels for
* new Feeder instances.
*/
private final BlockingQueue<DataChannel> channelQueue =
new LinkedBlockingQueue<>();
/*
* Map indexed by the client id. Each Feeder adds itself to the Map when
* its first created and removes itself when it exits.
*/
final Map<Integer, LogFileFeeder> feeders =
new ConcurrentHashMap<>();
/*
* Maps the client id to its Lease. Except for instantaneous overlaps,
* a client will have an entry in either the feeders map or the leases
* map, but not in both maps.
*/
final Map<Integer, Lease> leases = new ConcurrentHashMap<>();
/*
* A cache of StatResponses to try minimize the recomputation of SHA1
* hashes.
*/
final Map<String, Protocol.FileInfoResp> statResponses =
new ConcurrentHashMap<>();
/* Implements the timer used to maintain the leases. */
final Timer leaseTimer = new Timer(true);
/* This node's name and internal id */
final NameIdPair nameIdPair;
/* Counts the number of times the lease was renewed. */
public int leaseRenewalCount;
/* The duration of leases. */
long leaseDuration = DEFAULT_LEASE_DURATION;
final ServiceDispatcher serviceDispatcher;
/* Determines whether the feeder manager has been shutdown. */
final AtomicBoolean shutdown = new AtomicBoolean(false);
final Logger logger;
/* Wait indefinitely for somebody to request the service. */
private static long POLL_TIMEOUT = Long.MAX_VALUE;
/* Identifies the Feeder Service. */
public static final String FEEDER_SERVICE = "LogFileFeeder";
/*
* Default duration of lease on DbBackup associated with the client. It's
* five minutes.
*/
private static final long DEFAULT_LEASE_DURATION = 5 * 60 * 1000;
/**
* Creates a FeederManager but does not start it.
*
* @param serviceDispatcher The service dispatcher with which the
* FeederManager must register itself. It's null only in a test
* environment.
*
* @param nameIdPair The node name and id associated with the feeder
*
* @param envImpl the environment that will provide the log files
*/
public FeederManager(ServiceDispatcher serviceDispatcher,
EnvironmentImpl envImpl,
NameIdPair nameIdPair) {
super(envImpl, "Feeder Manager node: " + nameIdPair.getName());
this.serviceDispatcher = serviceDispatcher;
serviceDispatcher.register
(serviceDispatcher.new
LazyQueuingService(FEEDER_SERVICE, channelQueue, this));
this.nameIdPair = nameIdPair;
logger = LoggerUtils.getLogger(getClass());
}
EnvironmentImpl getEnvImpl() {
return envImpl;
}
/**
* Returns the number of times the lease was actually renewed.
*/
public int getLeaseRenewalCount() {
return leaseRenewalCount;
}
/**
* Returns the number of leases that are currently outstanding.
*/
public int getLeaseCount() {
return leases.size();
}
/**
* Returns the number of feeders that are currently active with this node.
* Note that active leases are included in this count, since it's expected
* that the clients will try to reconnect.
*/
public int getActiveFeederCount() {
return feeders.size() + getLeaseCount();
}
public long getLeaseDuration() {
return leaseDuration;
}
public void setLeaseDuration(long leaseDuration) {
this.leaseDuration = leaseDuration;
}
/**
* Clears the cached checksum for a file when it may be overwritten
* (e.g., entries may be erased).
*/
public void clearedCachedFileChecksum(String fileName) {
statResponses.remove(fileName);
}
/**
* The dispatcher method that starts up new log file feeders.
*/
@Override
public void run() {
try {
while (true) {
final DataChannel channel =
channelQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS);
if (channel == RepUtils.CHANNEL_EOF_MARKER) {
LoggerUtils.info(logger, envImpl,
"Log file Feeder manager soft shutdown.");
return;
}
new LogFileFeeder(this, channel).start();
}
} catch (InterruptedException ie) {
LoggerUtils.info
(logger, envImpl, "Log file feeder manager interrupted");
} catch (Exception e) {
LoggerUtils.severe(logger, envImpl,
"unanticipated exception: " + e.getMessage());
throw new EnvironmentFailureException
(envImpl, EnvironmentFailureReason.UNCAUGHT_EXCEPTION, e);
} finally {
shutdown();
}
}
public void shutdown() {
LoggerUtils.fine
(logger, envImpl, "Shutting down log file feeder manager");
if (!shutdown.compareAndSet(false, true)) {
return;
}
shutdownThread(logger);
/* shutdown active feeder threads */
for (LogFileFeeder feeder :
new ArrayList<>(feeders.values())) {
feeder.shutdown();
}
leaseTimer.cancel();
/*
* Terminate any outstanding leases, and close their associated open
* dbbackups, so that the environment can be closed cleanly.
*/
for (Lease l : new ArrayList<>(leases.values())) {
DbBackup dbBackup = l.terminate();
if (dbBackup.backupIsOpen()) {
dbBackup.endBackup();
}
}
serviceDispatcher.cancel(FEEDER_SERVICE);
cleanup();
LoggerUtils.fine(logger, envImpl,
"Shut down log file feeder manager completed");
}
@Override
protected int initiateSoftShutdown() {
/* Shutdown invoked from a different thread. */
channelQueue.clear();
/* Add special entry so that the channelQueue.poll operation exits. */
channelQueue.add(RepUtils.CHANNEL_EOF_MARKER);
return 0;
}
/**
* Provides the lease mechanism used to maintain a handle to the DbBackup
* object across Server client disconnects.
*/
class Lease extends TimerTask {
private final int id;
private DbBackup dbBackup;
public Lease(int id, long duration, DbBackup dbbackup) {
super();
this.dbBackup = dbbackup;
this.id = id;
Lease oldLease = leases.put(id, this);
if (oldLease != null) {
throw EnvironmentFailureException.unexpectedState
("Found an old lease for node: " + id);
}
leaseTimer.schedule(this, duration);
}
@Override
/* The timer went off, expire the lease if it hasn't been terminated */
public synchronized void run() {
if (dbBackup == null) {
return;
}
dbBackup.endBackup();
terminate();
}
/**
* Fetches the leased DbBackup instance and terminates the lease.
*
* @return the dbBackup instance, if the lease hasn't already been
* terminated
*/
public synchronized DbBackup terminate() {
if (dbBackup == null) {
return null;
}
cancel();
Lease l = leases.remove(id);
assert(l == this);
DbBackup saveDbBackup = dbBackup;
dbBackup = null;
return saveDbBackup;
}
public synchronized DbBackup getOpenDbBackup() {
return (dbBackup != null) && dbBackup.backupIsOpen() ?
dbBackup :
null;
}
}
/**
* @see StoppableThread#getLogger
*/
@Override
protected Logger getLogger() {
return logger;
}
}