blob: 4d0baca3158374253e88ba44db8b530ca7e26a9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.master;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.Namespace;
import org.apache.accumulo.core.client.impl.Namespaces;
import org.apache.accumulo.core.client.impl.Table;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.ThriftTransportPool;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.master.thrift.MasterClientService.Iface;
import org.apache.accumulo.core.master.thrift.MasterClientService.Processor;
import org.apache.accumulo.core.master.thrift.MasterGoalState;
import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
import org.apache.accumulo.core.master.thrift.MasterState;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.NamespacePermission;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.trace.wrappers.TraceWrap;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.fate.AgeOffStore;
import org.apache.accumulo.fate.Fate;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.master.metrics.MasterMetricsFactory;
import org.apache.accumulo.master.recovery.RecoveryManager;
import org.apache.accumulo.master.replication.MasterReplicationCoordinator;
import org.apache.accumulo.master.replication.ReplicationDriver;
import org.apache.accumulo.master.replication.WorkDriver;
import org.apache.accumulo.master.state.TableCounts;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.HighlyAvailableService;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeChooserEnvironment;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.master.LiveTServerSet;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
import org.apache.accumulo.server.master.balancer.TabletBalancer;
import org.apache.accumulo.server.master.state.CurrentState;
import org.apache.accumulo.server.master.state.DeadServerList;
import org.apache.accumulo.server.master.state.MergeInfo;
import org.apache.accumulo.server.master.state.MergeState;
import org.apache.accumulo.server.master.state.MetaDataStateStore;
import org.apache.accumulo.server.master.state.RootTabletStateStore;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.accumulo.server.master.state.TabletMigration;
import org.apache.accumulo.server.master.state.TabletServerState;
import org.apache.accumulo.server.master.state.TabletState;
import org.apache.accumulo.server.master.state.ZooStore;
import org.apache.accumulo.server.master.state.ZooTabletStateStore;
import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.replication.ZooKeeperInitialization;
import org.apache.accumulo.server.rpc.HighlyAvailableServiceWrapper;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.delegation.AuthenticationTokenKeyManager;
import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyDistributor;
import org.apache.accumulo.server.security.handler.ZKPermHandler;
import org.apache.accumulo.server.tables.TableManager;
import org.apache.accumulo.server.tables.TableObserver;
import org.apache.accumulo.server.util.DefaultMap;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.util.ServerBulkImportStatus;
import org.apache.accumulo.server.util.TableInfoUtil;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.start.classloader.vfs.ContextManager;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoAuthException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
* The Master is responsible for assigning and balancing tablets to tablet servers.
* <p>
* The master will also coordinate log recoveries and reports general status.
*/
public class Master
implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService {
static final Logger log = LoggerFactory.getLogger(Master.class);
// Timing constants (all in milliseconds).
static final int ONE_SECOND = 1000;
static final long TIME_TO_WAIT_BETWEEN_SCANS = 60 * ONE_SECOND;
private static final long TIME_BETWEEN_MIGRATION_CLEANUPS = 5 * 60 * ONE_SECOND;
static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
private static final int TIME_TO_WAIT_BETWEEN_LOCK_CHECKS = ONE_SECOND;
static final int MAX_TSERVER_WORK_CHUNK = 5000;
private static final int MAX_BAD_STATUS_COUNT = 3;
final VolumeManager fs;
private final String hostname;
// Monitor used to notify threads waiting for tablet balancing to settle.
private final Object balancedNotifier = new Object();
final LiveTServerSet tserverSet;
private final List<TabletGroupWatcher> watchers = new ArrayList<>();
final SecurityOperation security;
// Per-tserver count of bad status reports (see MAX_BAD_STATUS_COUNT).
final Map<TServerInstance,AtomicInteger> badServers = Collections
.synchronizedMap(new DefaultMap<>(new AtomicInteger()));
// Tablet servers that have been requested to shut down.
final Set<TServerInstance> serversToShutdown = Collections.synchronizedSet(new HashSet<>());
// Tablet -> destination server for migrations currently in progress.
final SortedMap<KeyExtent,TServerInstance> migrations = Collections
.synchronizedSortedMap(new TreeMap<>());
final EventCoordinator nextEvent = new EventCoordinator();
// Serializes all reads/writes of merge state in ZooKeeper.
private final Object mergeLock = new Object();
private ReplicationDriver replicationWorkDriver;
private WorkDriver replicationWorkAssigner;
RecoveryManager recoveryManager = null;
private final MasterTime timeKeeper;
// Delegation Token classes
private final boolean delegationTokensAvailable;
private ZooAuthenticationKeyDistributor keyDistributor;
private AuthenticationTokenKeyManager authenticationTokenKeyManager;
ZooLock masterLock = null;
private TServer clientService = null;
TabletBalancer tabletBalancer;
// Current position in the master state machine; access via the synchronized accessors.
private MasterState state = MasterState.INITIAL;
Fate<Master> fate;
// Latest status snapshot per live tserver; replaced wholesale each scan (hence volatile).
volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections
.unmodifiableSortedMap(new TreeMap<>());
final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
private final AtomicBoolean masterInitialized = new AtomicBoolean(false);
/** Returns the master's current state-machine state. Synchronized with state transitions. */
@Override
public synchronized MasterState getMasterState() {
  return this.state;
}
/** True while this master has not yet reached the terminal {@code STOP} state. */
public boolean stillMaster() {
  return MasterState.STOP != getMasterState();
}
// Readability aliases for the transition table below: X = allowed, O = forbidden.
static final boolean X = true;
static final boolean O = false;
// Legal state transitions: transitionOK[current][next] is true when the master may move
// from the row's state to the column's state. Rows/columns follow MasterState ordinal order.
// @formatter:off
static final boolean transitionOK[][] = {
// INITIAL HAVE_LOCK SAFE_MODE NORMAL UNLOAD_META UNLOAD_ROOT STOP
/* INITIAL */ {X, X, O, O, O, O, X},
/* HAVE_LOCK */ {O, X, X, X, O, O, X},
/* SAFE_MODE */ {O, O, X, X, X, O, X},
/* NORMAL */ {O, O, X, X, X, O, X},
/* UNLOAD_METADATA_TABLETS */ {O, O, X, X, X, X, X},
/* UNLOAD_ROOT_TABLET */ {O, O, O, X, X, X, X},
/* STOP */ {O, O, O, O, O, X, X}};
//@formatter:on
/**
 * Moves the master's state machine to {@code newState} and triggers transition side effects.
 * An illegal transition (per {@link #transitionOK}) is logged as a programmer error but is
 * still applied.
 */
synchronized void setMasterState(MasterState newState) {
if (state.equals(newState))
return;
if (!transitionOK[state.ordinal()][newState.ordinal()]) {
log.error("Programmer error: master should not transition from {} to {}", state, newState);
}
MasterState oldState = state;
state = newState;
nextEvent.event("State changed from %s to %s", oldState, newState);
if (newState == MasterState.STOP) {
// Give the server a little time before shutdown so the client
// thread requesting the stop can return
SimpleTimer.getInstance(getConfiguration()).schedule(new Runnable() {
@Override
public void run() {
// This frees the main thread and will cause the master to exit
clientService.stop();
Master.this.nextEvent.event("stopped event loop");
}
}, 100L, 1000L);
}
// Entering HAVE_LOCK means we just became the active master: run one-time ZooKeeper upgrades.
if (oldState != newState && (newState == MasterState.HAVE_LOCK)) {
upgradeZookeeper();
}
// Entering NORMAL means metadata tablets can be assigned: run metadata-table upgrades.
if (oldState != newState && (newState == MasterState.NORMAL)) {
upgradeMetadata();
}
}
/**
 * One-time upgrade step: moves the root tablet's directory out of the metadata table's tree
 * into its own table directory on a chosen volume, then records the tablet's location in
 * ZooKeeper. No-op when the ZooKeeper node already exists (upgrade already done).
 *
 * @param zoo ZooKeeper read/writer used to create the root-tablet path node
 * @throws Exception on filesystem or ZooKeeper failure; also IllegalStateException when zero
 *         or multiple candidate root tablet directories are found
 */
private void moveRootTabletToRootTable(IZooReaderWriter zoo) throws Exception {
  String dirZPath = getZooKeeperRoot() + RootTable.ZROOT_TABLET_PATH;
  if (!zoo.exists(dirZPath)) {
    Path oldPath = fs.getFullPath(FileType.TABLE, "/" + MetadataTable.ID + "/root_tablet");
    if (fs.exists(oldPath)) {
      VolumeChooserEnvironment chooserEnv = new VolumeChooserEnvironment(RootTable.ID, context);
      String newPath = fs.choose(chooserEnv, ServerConstants.getBaseUris(getConfiguration()))
          + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + RootTable.ID;
      fs.mkdirs(new Path(newPath));
      if (!fs.rename(oldPath, new Path(newPath))) {
        throw new IOException("Failed to move root tablet from " + oldPath + " to " + newPath);
      }
      log.info("Upgrade renamed {} to {}", oldPath, newPath);
    }
    // Scan every tables dir; exactly one volume must hold the root tablet.
    Path location = null;
    for (String basePath : ServerConstants.getTablesDirs(getConfiguration())) {
      Path path = new Path(basePath + "/" + RootTable.ID + RootTable.ROOT_TABLET_LOCATION);
      if (fs.exists(path)) {
        if (location != null) {
          throw new IllegalStateException(
              "Root table at multiple locations " + location + " " + path);
        }
        location = path;
      }
    }
    if (location == null)
      throw new IllegalStateException("Failed to find root tablet");
    log.info("Upgrade setting root table location in zookeeper {}", location);
    // Explicit charset: the bare getBytes() here depended on the platform default,
    // inconsistent with the UTF_8 usage elsewhere in this class.
    zoo.putPersistentData(dirZPath, location.toString().getBytes(UTF_8), NodeExistsPolicy.FAIL);
  }
}
// Set once upgradeZookeeper() finishes; checked by upgradeMetadata() as a sanity guard.
private boolean haveUpgradedZooKeeper = false;
/**
 * One-time ZooKeeper upgrade performed when this master obtains the master lock (the
 * HAVE_LOCK transition). Runs only when the persistent data version indicates an upgrade is
 * needed; any failure is fatal and exits the process. Must run before Fate is started.
 */
@SuppressFBWarnings(value = "DM_EXIT",
    justification = "TODO probably not the best to call System.exit here")
private void upgradeZookeeper() {
  // 1.5.1 and 1.6.0 both do some state checking after obtaining the zoolock for the
  // monitor and before starting up. It's not tied to the data version at all (and would
  // introduce unnecessary complexity to try to make the master do it), but be aware
  // that the master is not the only thing that may alter zookeeper before starting.
  final int accumuloPersistentVersion = Accumulo.getAccumuloPersistentVersion(fs);
  if (Accumulo.persistentVersionNeedsUpgrade(accumuloPersistentVersion)) {
    // This Master hasn't started Fate yet, so any outstanding transactions must be from before
    // the upgrade.
    // Change to Guava's Verify once we use Guava 17.
    if (null != fate) {
      throw new IllegalStateException("Access to Fate should not have been"
          + " initialized prior to the Master transitioning to active. Please"
          + " save all logs and file a bug.");
    }
    Accumulo.abortIfFateTransactions(getContext());
    try {
      log.info("Upgrading zookeeper");
      IZooReaderWriter zoo = context.getZooReaderWriter();
      final String zooRoot = getZooKeeperRoot();
      log.debug("Handling updates for version {}", accumuloPersistentVersion);
      log.debug("Cleaning out remnants of logger role.");
      zoo.recursiveDelete(zooRoot + "/loggers", NodeMissingPolicy.SKIP);
      zoo.recursiveDelete(zooRoot + "/dead/loggers", NodeMissingPolicy.SKIP);
      final byte[] zero = {'0'};
      log.debug("Initializing recovery area.");
      zoo.putPersistentData(zooRoot + Constants.ZRECOVERY, zero, NodeExistsPolicy.SKIP);
      for (String id : zoo.getChildren(zooRoot + Constants.ZTABLES)) {
        log.debug("Prepping table {} for compaction cancellations.", id);
        zoo.putPersistentData(
            zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_COMPACT_CANCEL_ID, zero,
            NodeExistsPolicy.SKIP);
      }
      @SuppressWarnings("deprecation")
      String zpath = zooRoot + Constants.ZCONFIG + "/" + Property.TSERV_WAL_SYNC_METHOD.getKey();
      // is the entire instance set to use flushing vs sync?
      boolean flushDefault = false;
      try {
        byte[] data = zoo.getData(zpath, null);
        if (new String(data, UTF_8).endsWith("flush")) {
          flushDefault = true;
        }
      } catch (KeeperException.NoNodeException ex) {
        // skip
      }
      for (String id : zoo.getChildren(zooRoot + Constants.ZTABLES)) {
        log.debug("Converting table {} WALog setting to Durability", id);
        try {
          @SuppressWarnings("deprecation")
          String path = zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_CONF + "/"
              + Property.TABLE_WALOG_ENABLED.getKey();
          byte[] data = zoo.getData(path, null);
          boolean useWAL = Boolean.parseBoolean(new String(data, UTF_8));
          zoo.recursiveDelete(path, NodeMissingPolicy.FAIL);
          path = zooRoot + Constants.ZTABLES + "/" + id + Constants.ZTABLE_CONF + "/"
              + Property.TABLE_DURABILITY.getKey();
          // Write durability values with an explicit charset; the previous bare getBytes()
          // calls used the platform default charset.
          if (useWAL) {
            if (flushDefault) {
              zoo.putPersistentData(path, "flush".getBytes(UTF_8), NodeExistsPolicy.SKIP);
            } else {
              zoo.putPersistentData(path, "sync".getBytes(UTF_8), NodeExistsPolicy.SKIP);
            }
          } else {
            zoo.putPersistentData(path, "none".getBytes(UTF_8), NodeExistsPolicy.SKIP);
          }
        } catch (KeeperException.NoNodeException ex) {
          // skip it
        }
      }
      // create initial namespaces
      String namespaces = getZooKeeperRoot() + Constants.ZNAMESPACES;
      zoo.putPersistentData(namespaces, new byte[0], NodeExistsPolicy.SKIP);
      for (Pair<String,Namespace.ID> namespace : Iterables.concat(
          Collections.singleton(new Pair<>(Namespace.ACCUMULO, Namespace.ID.ACCUMULO)),
          Collections.singleton(new Pair<>(Namespace.DEFAULT, Namespace.ID.DEFAULT)))) {
        String ns = namespace.getFirst();
        Namespace.ID id = namespace.getSecond();
        log.debug("Upgrade creating namespace \"{}\" (ID: {})", ns, id);
        if (!Namespaces.exists(context, id))
          TableManager.prepareNewNamespaceState(zoo, getInstanceID(), id, ns,
              NodeExistsPolicy.SKIP);
      }
      // create replication table in zk
      log.debug("Upgrade creating table {} (ID: {})", ReplicationTable.NAME, ReplicationTable.ID);
      TableManager.prepareNewTableState(zoo, getInstanceID(), ReplicationTable.ID,
          Namespace.ID.ACCUMULO, ReplicationTable.NAME, TableState.OFFLINE,
          NodeExistsPolicy.SKIP);
      // create root table
      log.debug("Upgrade creating table {} (ID: {})", RootTable.NAME, RootTable.ID);
      TableManager.prepareNewTableState(zoo, getInstanceID(), RootTable.ID, Namespace.ID.ACCUMULO,
          RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.SKIP);
      Initialize.initSystemTablesConfig(context.getZooReaderWriter(), context.getZooKeeperRoot());
      // ensure root user can flush root table
      security.grantTablePermission(context.rpcCreds(), security.getRootUsername(), RootTable.ID,
          TablePermission.ALTER_TABLE, Namespace.ID.ACCUMULO);
      // put existing tables in the correct namespaces
      String tables = getZooKeeperRoot() + Constants.ZTABLES;
      for (String tableId : zoo.getChildren(tables)) {
        Namespace.ID targetNamespace = (MetadataTable.ID.canonicalID().equals(tableId)
            || RootTable.ID.canonicalID().equals(tableId)) ? Namespace.ID.ACCUMULO
                : Namespace.ID.DEFAULT;
        log.debug("Upgrade moving table {} (ID: {}) into namespace with ID {}",
            new String(zoo.getData(tables + "/" + tableId + Constants.ZTABLE_NAME, null), UTF_8),
            tableId, targetNamespace);
        zoo.putPersistentData(tables + "/" + tableId + Constants.ZTABLE_NAMESPACE,
            targetNamespace.getUtf8(), NodeExistsPolicy.SKIP);
      }
      // rename metadata table
      log.debug("Upgrade renaming table {} (ID: {}) to {}", MetadataTable.OLD_NAME,
          MetadataTable.ID, MetadataTable.NAME);
      zoo.putPersistentData(tables + "/" + MetadataTable.ID + Constants.ZTABLE_NAME,
          Tables.qualify(MetadataTable.NAME).getSecond().getBytes(UTF_8),
          NodeExistsPolicy.OVERWRITE);
      moveRootTabletToRootTable(zoo);
      // add system namespace permissions to existing users
      ZKPermHandler perm = new ZKPermHandler();
      perm.initialize(getContext(), true);
      String users = getZooKeeperRoot() + "/users";
      for (String user : zoo.getChildren(users)) {
        zoo.putPersistentData(users + "/" + user + "/Namespaces", new byte[0],
            NodeExistsPolicy.SKIP);
        perm.grantNamespacePermission(user, Namespace.ID.ACCUMULO, NamespacePermission.READ);
      }
      perm.grantNamespacePermission("root", Namespace.ID.ACCUMULO,
          NamespacePermission.ALTER_TABLE);
      // add the currlog location for root tablet current logs
      zoo.putPersistentData(getZooKeeperRoot() + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0],
          NodeExistsPolicy.SKIP);
      // create tablet server wal logs node in ZK
      zoo.putPersistentData(getZooKeeperRoot() + WalStateManager.ZWALS, new byte[0],
          NodeExistsPolicy.SKIP);
      haveUpgradedZooKeeper = true;
    } catch (Exception ex) {
      // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
      log.error("FATAL: Error performing upgrade", ex);
      System.exit(1);
    }
  }
}
// Ensures the metadata upgrade task is launched at most once.
private final AtomicBoolean upgradeMetadataRunning = new AtomicBoolean(false);
// Released when the metadata upgrade has finished (or was found unnecessary).
private final CountDownLatch waitForMetadataUpgrade = new CountDownLatch(1);
private final ServerContext context;
private final ServerConfigurationFactory serverConfig;
private MasterClientServiceHandler clientHandler;
/**
 * One-time metadata-table upgrade, triggered on the transition to NORMAL. The first caller
 * launches a background task (the caller holds a lock that prevents metadata tablets from
 * being assigned, and the task writes to the metadata table); subsequent calls are no-ops.
 * {@link #waitForMetadataUpgrade} is counted down once the upgrade completes or is not needed.
 */
private void upgradeMetadata() {
// we make sure we're only doing the rest of this method once so that we can signal to other
// threads that an upgrade wasn't needed.
if (upgradeMetadataRunning.compareAndSet(false, true)) {
final int accumuloPersistentVersion = Accumulo.getAccumuloPersistentVersion(fs);
if (Accumulo.persistentVersionNeedsUpgrade(accumuloPersistentVersion)) {
// sanity check that we passed the Fate verification prior to ZooKeeper upgrade, and that
// Fate still hasn't been started.
// Change both to use Guava's Verify once we use Guava 17.
if (!haveUpgradedZooKeeper) {
throw new IllegalStateException("We should only attempt to upgrade"
+ " Accumulo's metadata table if we've already upgraded ZooKeeper."
+ " Please save all logs and file a bug.");
}
if (null != fate) {
throw new IllegalStateException("Access to Fate should not have been"
+ " initialized prior to the Master finishing upgrades. Please save"
+ " all logs and file a bug.");
}
Runnable upgradeTask = new Runnable() {
// Tracks which upgrade steps have already been applied; each step bumps it by one.
int version = accumuloPersistentVersion;
@SuppressFBWarnings(value = "DM_EXIT",
justification = "TODO probably not the best to call System.exit here")
@Override
public void run() {
try {
log.info("Starting to upgrade metadata table.");
if (version == ServerConstants.MOVE_DELETE_MARKERS - 1) {
log.info("Updating Delete Markers in metadata table for version 1.4");
MetadataTableUtil.moveMetaDeleteMarkersFrom14(context);
version++;
}
if (version == ServerConstants.MOVE_TO_ROOT_TABLE - 1) {
log.info("Updating Delete Markers in metadata table.");
MetadataTableUtil.moveMetaDeleteMarkers(context);
version++;
}
if (version == ServerConstants.MOVE_TO_REPLICATION_TABLE - 1) {
log.info("Updating metadata table with entries for the replication table");
MetadataTableUtil.createReplicationTable(context);
version++;
}
log.info("Updating persistent data version.");
Accumulo.updateAccumuloVersion(fs, accumuloPersistentVersion);
log.info("Upgrade complete");
waitForMetadataUpgrade.countDown();
} catch (Exception ex) {
// ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j
// compatibility
log.error("FATAL: Error performing upgrade", ex);
System.exit(1);
}
}
};
// need to run this in a separate thread because a lock is held that prevents metadata
// tablets from being assigned and this task writes to the
// metadata table
new Thread(upgradeTask).start();
} else {
// no upgrade needed; unblock anyone waiting on the latch
waitForMetadataUpgrade.countDown();
}
}
}
/** Returns the count of tablets of the given table that are hosted or assigned. */
private int assignedOrHosted(Table.ID tableId) {
  int total = 0;
  for (TabletGroupWatcher w : watchers) {
    TableCounts stats = w.getStats(tableId);
    total += stats.assigned() + stats.hosted();
  }
  return total;
}
/** Returns the count of hosted-or-assigned tablets summed across every table. */
private int totalAssignedOrHosted() {
  int total = 0;
  for (TabletGroupWatcher w : watchers) {
    total += w.getStats().values().stream()
        .mapToInt(c -> c.assigned() + c.hosted()).sum();
  }
  return total;
}
/** Hosted-or-assigned tablet count excluding the metadata and root tables. */
private int nonMetaDataTabletsAssignedOrHosted() {
  int metaTablets = assignedOrHosted(MetadataTable.ID) + assignedOrHosted(RootTable.ID);
  return totalAssignedOrHosted() - metaTablets;
}
/** Tablets that should be hosted but are not: assigned, on dead servers, or suspended. */
private int notHosted() {
  int pending = 0;
  for (TabletGroupWatcher w : watchers) {
    for (TableCounts c : w.getStats().values()) {
      pending += c.assigned() + c.assignedToDeadServers() + c.suspended();
    }
  }
  return pending;
}
/**
 * The number of unassigned tablets that should be assigned, displayed on the monitor page.
 * In NORMAL state this counts all problem tablets of online tables; in SAFE_MODE and the
 * two UNLOAD states only the metadata table's unassigned/suspended tablets matter (the
 * three cases previously duplicated an identical loop, now merged via fall-through).
 */
int displayUnassigned() {
  int result = 0;
  switch (getMasterState()) {
    case NORMAL:
      // Count offline tablets for online tables
      for (TabletGroupWatcher watcher : watchers) {
        TableManager manager = context.getTableManager();
        for (Entry<Table.ID,TableCounts> entry : watcher.getStats().entrySet()) {
          Table.ID tableId = entry.getKey();
          TableCounts counts = entry.getValue();
          TableState tableState = manager.getTableState(tableId);
          if (tableState != null && tableState.equals(TableState.ONLINE)) {
            result += counts.unassigned() + counts.assignedToDeadServers() + counts.assigned()
                + counts.suspended();
          }
        }
      }
      break;
    case SAFE_MODE:
    case UNLOAD_METADATA_TABLETS:
    case UNLOAD_ROOT_TABLET:
      // Count offline tablets for the metadata table
      for (TabletGroupWatcher watcher : watchers) {
        TableCounts counts = watcher.getStats(MetadataTable.ID);
        result += counts.unassigned() + counts.suspended();
      }
      break;
    default:
      break;
  }
  return result;
}
/**
 * Clears the table cache and verifies the given table is online; otherwise throws a
 * {@link ThriftTableOperationException} with the OFFLINE type for the MERGE operation.
 */
public void mustBeOnline(final Table.ID tableId) throws ThriftTableOperationException {
  Tables.clearCache(context);
  TableState current = Tables.getTableState(context, tableId);
  if (!current.equals(TableState.ONLINE)) {
    throw new ThriftTableOperationException(tableId.canonicalID(), null, TableOperation.MERGE,
        TableOperationExceptionType.OFFLINE, "table is not online");
  }
}
/** @return the server context this master was constructed with */
public ServerContext getContext() {
  return this.context;
}
/** @return the table manager, delegated from the server context */
public TableManager getTableManager() {
  return this.context.getTableManager();
}
/** @return an AccumuloClient obtained from the server context */
public AccumuloClient getClient() throws AccumuloSecurityException, AccumuloException {
  return this.context.getClient();
}
/**
 * Creates the Master: wires up configuration, the thrift transport pool idle time, the live
 * tserver set, the tablet balancer, the VFS classloader context config, security, and (when
 * SASL is enabled) the delegation-token key distributor/manager. Does not start services.
 *
 * @param context server context providing configuration, volume manager, and hostname
 * @throws IOException declared for construction failures surfaced by callees
 */
public Master(ServerContext context) throws IOException {
this.context = context;
this.serverConfig = context.getServerConfFactory();
this.fs = context.getVolumeManager();
this.hostname = context.getHostname();
AccumuloConfiguration aconf = serverConfig.getSystemConfiguration();
log.info("Version {}", Constants.VERSION);
log.info("Instance {}", getInstanceID());
timeKeeper = new MasterTime(this);
ThriftTransportPool.getInstance()
.setIdleTime(aconf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT));
// register this master as the listener for tserver liveness changes
tserverSet = new LiveTServerSet(context, this);
this.tabletBalancer = Property.createInstanceFromPropertyName(aconf,
Property.MASTER_TABLET_BALANCER, TabletBalancer.class, new DefaultLoadBalancer());
this.tabletBalancer.init(context);
try {
AccumuloVFSClassLoader.getContextManager()
.setContextConfig(new ContextManager.DefaultContextsConfig() {
@Override
public Map<String,String> getVfsContextClasspathProperties() {
return getConfiguration()
.getAllPropertiesWithPrefix(Property.VFS_CONTEXT_CLASSPATH_PROPERTY);
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
this.security = AuditedSecurityOperation.getInstance(context);
// Create the secret manager (can generate and verify delegation tokens)
final long tokenLifetime = aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME);
context.setSecretManager(new AuthenticationTokenSecretManager(getInstanceID(), tokenLifetime));
authenticationTokenKeyManager = null;
keyDistributor = null;
if (getConfiguration().getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
// SASL is enabled, create the key distributor (ZooKeeper) and manager (generates/rolls secret
// keys)
log.info("SASL is enabled, creating delegation token key manager and distributor");
final long tokenUpdateInterval = aconf
.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_UPDATE_INTERVAL);
keyDistributor = new ZooAuthenticationKeyDistributor(context.getZooReaderWriter(),
getZooKeeperRoot() + Constants.ZDELEGATION_TOKEN_KEYS);
authenticationTokenKeyManager = new AuthenticationTokenKeyManager(context.getSecretManager(),
keyDistributor, tokenUpdateInterval, tokenLifetime);
delegationTokensAvailable = true;
} else {
log.info("SASL is not enabled, delegation tokens will not be available");
delegationTokensAvailable = false;
}
}
/** @return the instance ID, delegated from the server context */
public String getInstanceID() {
  return this.context.getInstanceID();
}
/** @return this instance's ZooKeeper root path, delegated from the server context */
public String getZooKeeperRoot() {
  return this.context.getZooKeeperRoot();
}
/** @return the live Accumulo configuration, delegated from the server context */
public AccumuloConfiguration getConfiguration() {
  return this.context.getConfiguration();
}
/** Returns an RPC connection to the given tablet server via the live tserver set. */
public TServerConnection getConnection(TServerInstance server) {
  return this.tserverSet.getConnection(server);
}
/**
 * Reads the current merge state for the given table from ZooKeeper.
 * Returns an empty {@link MergeInfo} when no merge node exists or the state cannot be read
 * (e.g. the node disappeared because the merge just finished).
 */
public MergeInfo getMergeInfo(Table.ID tableId) {
synchronized (mergeLock) {
try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
if (!context.getZooReaderWriter().exists(path))
return new MergeInfo();
byte[] data = context.getZooReaderWriter().getData(path, new Stat());
// deserialize the Writable-encoded MergeInfo from the node's bytes
DataInputBuffer in = new DataInputBuffer();
in.reset(data, data.length);
MergeInfo info = new MergeInfo();
info.readFields(in);
return info;
} catch (KeeperException.NoNodeException ex) {
log.info("Error reading merge state, it probably just finished");
return new MergeInfo();
} catch (Exception ex) {
log.warn("Unexpected error reading merge state", ex);
return new MergeInfo();
}
}
}
/**
 * Persists (or, for {@link MergeState#NONE}, deletes) the merge state for the table referenced
 * by {@code info} in ZooKeeper, notifies threads waiting on {@code mergeLock}, and fires a
 * coordinator event.
 */
public void setMergeState(MergeInfo info, MergeState state)
throws IOException, KeeperException, InterruptedException {
synchronized (mergeLock) {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + info.getExtent().getTableId()
+ "/merge";
info.setState(state);
if (state.equals(MergeState.NONE)) {
context.getZooReaderWriter().recursiveDelete(path, NodeMissingPolicy.SKIP);
} else {
DataOutputBuffer out = new DataOutputBuffer();
try {
info.write(out);
} catch (IOException ex) {
throw new RuntimeException("Unlikely", ex);
}
// STARTED must create the node (FAIL if it already exists); later states overwrite it.
context.getZooReaderWriter().putPersistentData(path, out.getData(),
state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL
: ZooUtil.NodeExistsPolicy.OVERWRITE);
}
mergeLock.notifyAll();
}
// event fired outside the lock
nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
}
/** Removes the merge-state node for the given table, notifies waiters, and fires an event. */
public void clearMergeState(Table.ID tableId)
    throws IOException, KeeperException, InterruptedException {
  final String mergePath = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + "/merge";
  synchronized (mergeLock) {
    context.getZooReaderWriter().recursiveDelete(mergePath, NodeMissingPolicy.SKIP);
    mergeLock.notifyAll();
  }
  nextEvent.event("Merge state of %s cleared", tableId);
}
/**
 * Persists the desired master goal state to ZooKeeper, overwriting any previous value.
 * Failures are logged (with the cause) rather than propagated.
 */
void setMasterGoalState(MasterGoalState state) {
  try {
    context.getZooReaderWriter().putPersistentData(
        getZooKeeperRoot() + Constants.ZMASTER_GOAL_STATE, state.name().getBytes(UTF_8),
        NodeExistsPolicy.OVERWRITE);
  } catch (Exception ex) {
    // include the exception: the previous call dropped the stack trace entirely
    log.error("Unable to set master goal state in zookeeper", ex);
  }
}
/**
 * Reads the master goal state from ZooKeeper, retrying forever (with a one-second pause)
 * until the read succeeds. Never returns null.
 */
MasterGoalState getMasterGoalState() {
  while (true)
    try {
      byte[] data = context.getZooReaderWriter()
          .getData(getZooKeeperRoot() + Constants.ZMASTER_GOAL_STATE, null);
      // decode with an explicit charset; the bare String(byte[]) ctor used the platform default
      return MasterGoalState.valueOf(new String(data, UTF_8));
    } catch (Exception e) {
      log.error("Problem getting real goal state from zookeeper: ", e);
      sleepUninterruptibly(1, TimeUnit.SECONDS);
    }
}
/** True only when every tablet group watcher has finished a scan at or after {@code time}. */
public boolean hasCycled(long time) {
  for (TabletGroupWatcher w : watchers) {
    boolean scannedSince = w.stats.lastScanFinished() >= time;
    if (!scannedSince) {
      return false;
    }
  }
  return true;
}
/**
 * Drops every pending migration for tablets of the given table.
 * Holds the migrations monitor for the duration, as required by the synchronized map.
 */
public void clearMigrations(Table.ID tableId) {
  synchronized (migrations) {
    // removeIf replaces the manual Iterator loop; removal through the key set
    // still removes the corresponding map entries
    migrations.keySet().removeIf(extent -> extent.getTableId().equals(tableId));
  }
}
/**
 * The state a tablet should be driven toward. Each goal carries the thrift unload reason sent to
 * tservers when the goal requires unloading the tablet.
 */
static enum TabletGoalState {
  // HOSTED never triggers an unload, so its unload reason is UNKNOWN.
  HOSTED(TUnloadTabletGoal.UNKNOWN),
  UNASSIGNED(TUnloadTabletGoal.UNASSIGNED),
  DELETED(TUnloadTabletGoal.DELETED),
  SUSPENDED(TUnloadTabletGoal.SUSPENDED);

  // Thrift-level reason communicated to the tserver when unloading toward this goal.
  private final TUnloadTabletGoal unloadGoal;

  TabletGoalState(TUnloadTabletGoal unloadGoal) {
    this.unloadGoal = unloadGoal;
  }

  /** The purpose of unloading this tablet. */
  public TUnloadTabletGoal howUnload() {
    return unloadGoal;
  }
}
/**
 * Determines a tablet's goal state based solely on the master's overall state, ignoring
 * table state, merges, and migrations.
 *
 * @param tls the tablet whose goal is being computed (only its extent type matters here)
 * @return HOSTED or UNASSIGNED depending on the shutdown stage
 * @throws IllegalStateException if the master state is unrecognized
 */
TabletGoalState getSystemGoalState(TabletLocationState tls) {
  MasterState masterState = getMasterState();
  switch (masterState) {
    case NORMAL:
      return TabletGoalState.HOSTED;
    case HAVE_LOCK:
    case INITIAL:
    case SAFE_MODE:
      // Only system (metadata/root) tablets stay hosted before normal operation begins.
      return tls.extent.isMeta() ? TabletGoalState.HOSTED : TabletGoalState.UNASSIGNED;
    case UNLOAD_METADATA_TABLETS:
      // Metadata is coming down; only the root tablet remains hosted.
      return tls.extent.isRootTablet() ? TabletGoalState.HOSTED : TabletGoalState.UNASSIGNED;
    case UNLOAD_ROOT_TABLET:
    case STOP:
      return TabletGoalState.UNASSIGNED;
    default:
      throw new IllegalStateException("Unknown Master State");
  }
}
/**
 * Determines a tablet's goal state from the state of its table alone.
 *
 * @param extent identifies the table via its table id
 * @return DELETED when the table is gone or being deleted, UNASSIGNED when offline/new,
 *         otherwise HOSTED
 */
TabletGoalState getTableGoalState(KeyExtent extent) {
  TableState tableState = context.getTableManager().getTableState(extent.getTableId());
  // A null state means the table no longer exists; treat it like a deletion in progress.
  if (tableState == null || tableState == TableState.DELETING) {
    return TabletGoalState.DELETED;
  }
  if (tableState == TableState.OFFLINE || tableState == TableState.NEW) {
    return TabletGoalState.UNASSIGNED;
  }
  return TabletGoalState.HOSTED;
}
/**
 * Computes the goal state for a single tablet by layering, in priority order: the master's
 * system-wide state, pending tserver shutdowns, any in-progress merge covering the tablet, the
 * table's state, and pending migrations.
 *
 * @param tls current location/state of the tablet
 * @param mergeInfo merge operation for the tablet's table; getExtent() is null when none exists
 * @return the state this tablet should be driven toward
 */
TabletGoalState getGoalState(TabletLocationState tls, MergeInfo mergeInfo) {
  KeyExtent extent = tls.extent;
  // Shutting down?
  TabletGoalState state = getSystemGoalState(tls);
  if (state == TabletGoalState.HOSTED) {
    // The tablet's current server is being shut down: suspend rather than keep it there.
    if (tls.current != null && serversToShutdown.contains(tls.current)) {
      return TabletGoalState.SUSPENDED;
    }
    // Handle merge transitions
    if (mergeInfo.getExtent() != null) {
      log.debug("mergeInfo overlaps: {} {}", extent, mergeInfo.overlaps(extent));
      if (mergeInfo.overlaps(extent)) {
        switch (mergeInfo.getState()) {
          case NONE:
          case COMPLETE:
            // No active merge work: fall through to table-state / migration checks below.
            break;
          case STARTED:
          case SPLITTING:
            // Tablets must be online while the merge range is established/split.
            return TabletGoalState.HOSTED;
          case WAITING_FOR_CHOPPED:
            if (tls.getState(tserverSet.getCurrentServers()).equals(TabletState.HOSTED)) {
              // Hosted tablet: unassign as soon as it has been chopped.
              if (tls.chopped)
                return TabletGoalState.UNASSIGNED;
            } else {
              // Not hosted: also require its write-ahead logs to be empty before unassigning,
              // since no server is available to replay them.
              if (tls.chopped && tls.walogs.isEmpty())
                return TabletGoalState.UNASSIGNED;
            }
            return TabletGoalState.HOSTED;
          case WAITING_FOR_OFFLINE:
          case MERGING:
            // Tablets must be offline for the metadata rewrite to proceed.
            return TabletGoalState.UNASSIGNED;
        }
      }
    }
    // taking table offline?
    state = getTableGoalState(extent);
    if (state == TabletGoalState.HOSTED) {
      // Maybe this tablet needs to be migrated
      TServerInstance dest = migrations.get(extent);
      if (dest != null && tls.current != null && !dest.equals(tls.current)) {
        // Unassign so the assignment logic can place it on the migration target.
        return TabletGoalState.UNASSIGNED;
      }
    }
  }
  return state;
}
/**
 * Background daemon that periodically removes stale tablet migrations: those referring to
 * tablets that no longer exist (e.g. after a split) and those for tables that have gone offline.
 */
private class MigrationCleanupThread extends Daemon {
  @Override
  public void run() {
    setName("Migration Cleanup Thread");
    while (stillMaster()) {
      if (!migrations.isEmpty()) {
        try {
          // Order matters loosely: offline-table migrations are cheap to clear first.
          cleanupOfflineMigrations();
          cleanupNonexistentMigrations(context.getClient());
        } catch (Exception ex) {
          // Best-effort cleanup; log and retry on the next cycle.
          log.error("Error cleaning up migrations", ex);
        }
      }
      sleepUninterruptibly(TIME_BETWEEN_MIGRATION_CLEANUPS, TimeUnit.MILLISECONDS);
    }
  }

  /**
   * If a migrating tablet splits, and the tablet dies before sending the master a message, the
   * migration will refer to a non-existing tablet, so it can never complete. Periodically scan
   * the metadata table and remove any migrating tablets that no longer exist.
   */
  private void cleanupNonexistentMigrations(final AccumuloClient accumuloClient)
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    Scanner scanner = accumuloClient.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
    TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner);
    Set<KeyExtent> found = new HashSet<>();
    for (Entry<Key,Value> entry : scanner) {
      KeyExtent extent = new KeyExtent(entry.getKey().getRow(), entry.getValue());
      if (migrations.containsKey(extent)) {
        found.add(extent);
      }
    }
    // Keep only migrations whose extents still exist in the metadata table.
    migrations.keySet().retainAll(found);
  }

  /**
   * If migrating a tablet for a table that is offline, the migration can never succeed because no
   * tablet server will load the tablet. check for offline tables and remove their migrations.
   */
  private void cleanupOfflineMigrations() {
    TableManager manager = context.getTableManager();
    for (Table.ID tableId : Tables.getIdToNameMap(context).keySet()) {
      TableState state = manager.getTableState(tableId);
      if (TableState.OFFLINE == state) {
        clearMigrations(tableId);
      }
    }
  }
}
/**
 * Background daemon that drives the master's state machine toward the goal state stored in
 * ZooKeeper, gathers tserver status, and triggers tablet balancing.
 */
private class StatusThread extends Daemon {

  /**
   * Returns true when every relevant TabletGroupWatcher has observed the current master state.
   * During staged shutdown, watchers for tiers that are already unloaded are skipped (the start
   * index assumes watcher 0 = user tablets, 1 = metadata, 2 = root — matches the order they are
   * created in run()).
   */
  private boolean goodStats() {
    int start;
    switch (getMasterState()) {
      case UNLOAD_METADATA_TABLETS:
        start = 1;
        break;
      case UNLOAD_ROOT_TABLET:
        start = 2;
        break;
      default:
        start = 0;
    }
    for (int i = start; i < watchers.size(); i++) {
      TabletGroupWatcher watcher = watchers.get(i);
      if (watcher.stats.getLastMasterState() != getMasterState()) {
        log.debug("{}: {} != {}", watcher.getName(), watcher.stats.getLastMasterState(),
            getMasterState());
        return false;
      }
    }
    return true;
  }

  @Override
  public void run() {
    setName("Status Thread");
    EventCoordinator.Listener eventListener = nextEvent.getListener();
    while (stillMaster()) {
      long wait = DEFAULT_WAIT_FOR_WATCHER;
      try {
        switch (getMasterGoalState()) {
          case NORMAL:
            setMasterState(MasterState.NORMAL);
            break;
          case SAFE_MODE:
            // Enter safe mode from normal operation or straight after acquiring the lock.
            if (getMasterState() == MasterState.NORMAL) {
              setMasterState(MasterState.SAFE_MODE);
            }
            if (getMasterState() == MasterState.HAVE_LOCK) {
              setMasterState(MasterState.SAFE_MODE);
            }
            break;
          case CLEAN_STOP:
            // Staged shutdown: user tablets -> metadata tablets -> root tablet -> halt tservers.
            // Each stage advances only once the previous tier's count reaches zero AND all
            // watchers have observed the current master state (goodStats()).
            switch (getMasterState()) {
              case NORMAL:
                setMasterState(MasterState.SAFE_MODE);
                break;
              case SAFE_MODE: {
                int count = nonMetaDataTabletsAssignedOrHosted();
                log.debug(
                    String.format("There are %d non-metadata tablets assigned or hosted", count));
                if (count == 0 && goodStats())
                  setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
              }
                break;
              case UNLOAD_METADATA_TABLETS: {
                int count = assignedOrHosted(MetadataTable.ID);
                log.debug(
                    String.format("There are %d metadata tablets assigned or hosted", count));
                if (count == 0 && goodStats())
                  setMasterState(MasterState.UNLOAD_ROOT_TABLET);
              }
                break;
              case UNLOAD_ROOT_TABLET: {
                // Metadata tablets may still be reported here; stay in this state until both
                // metadata and root counts reach zero.
                int count = assignedOrHosted(MetadataTable.ID);
                if (count > 0 && goodStats()) {
                  log.debug(String.format("%d metadata tablets online", count));
                  setMasterState(MasterState.UNLOAD_ROOT_TABLET);
                }
                int root_count = assignedOrHosted(RootTable.ID);
                if (root_count > 0 && goodStats())
                  log.debug("The root tablet is still assigned or hosted");
                if (count + root_count == 0 && goodStats()) {
                  // Everything is unloaded: halt all tservers, then stop.
                  Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
                  log.debug("stopping {} tablet servers", currentServers.size());
                  for (TServerInstance server : currentServers) {
                    try {
                      // Mark for shutdown first so failure handling treats it as intentional.
                      serversToShutdown.add(server);
                      tserverSet.getConnection(server).fastHalt(masterLock);
                    } catch (TException e) {
                      // its probably down, and we don't care
                    } finally {
                      tserverSet.remove(server);
                    }
                  }
                  if (currentServers.size() == 0)
                    setMasterState(MasterState.STOP);
                }
              }
                break;
              default:
                break;
            }
        }
      } catch (Throwable t) {
        log.error("Error occurred reading / switching master goal state. Will"
            + " continue with attempt to update status", t);
      }
      try {
        wait = updateStatus();
        eventListener.waitForEvents(wait);
      } catch (Throwable t) {
        log.error("Error balancing tablets, will wait for {} (seconds) and then retry ",
            WAIT_BETWEEN_ERRORS / ONE_SECOND, t);
        sleepUninterruptibly(WAIT_BETWEEN_ERRORS, TimeUnit.MILLISECONDS);
      }
    }
  }

  /**
   * Refreshes tserver status and, when the cluster is quiescent enough, runs the balancer.
   *
   * @return milliseconds to wait before the next status pass
   */
  private long updateStatus()
      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
    Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
    tserverStatus = gatherTableInformation(currentServers);
    checkForHeldServer(tserverStatus);
    // Balance only when status is trustworthy: no bad servers, no unhosted tablets, not
    // stopping, no pending shutdowns, and the live-tserver set is stable.
    if (!badServers.isEmpty()) {
      log.debug("not balancing because the balance information is out-of-date {}",
          badServers.keySet());
    } else if (notHosted() > 0) {
      log.debug("not balancing because there are unhosted tablets: {}", notHosted());
    } else if (getMasterGoalState() == MasterGoalState.CLEAN_STOP) {
      log.debug("not balancing because the master is attempting to stop cleanly");
    } else if (!serversToShutdown.isEmpty()) {
      log.debug("not balancing while shutting down servers {}", serversToShutdown);
    } else {
      for (TabletGroupWatcher tgw : watchers) {
        if (!tgw.isSameTserversAsLastScan(currentServers)) {
          log.debug("not balancing just yet, as collection of live tservers is in flux");
          return DEFAULT_WAIT_FOR_WATCHER;
        }
      }
      return balanceTablets();
    }
    return DEFAULT_WAIT_FOR_WATCHER;
  }

  /**
   * Kills a tserver stuck holding commits (hold time past TSERV_HOLD_TIME_SUICIDE), but only
   * when exactly one server is stuck and others are healthy — a cluster-wide hold suggests a
   * systemic problem (e.g. HDFS) that killing one server won't fix.
   */
  private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> tserverStatus) {
    TServerInstance instance = null;
    int crazyHoldTime = 0;
    int someHoldTime = 0;
    final long maxWait = getConfiguration().getTimeInMillis(Property.TSERV_HOLD_TIME_SUICIDE);
    for (Entry<TServerInstance,TabletServerStatus> entry : tserverStatus.entrySet()) {
      if (entry.getValue().getHoldTime() > 0) {
        someHoldTime++;
        if (entry.getValue().getHoldTime() > maxWait) {
          instance = entry.getKey();
          crazyHoldTime++;
        }
      }
    }
    if (crazyHoldTime == 1 && someHoldTime == 1 && tserverStatus.size() > 1) {
      log.warn("Tablet server {} exceeded maximum hold time: attempting to kill it", instance);
      try {
        TServerConnection connection = tserverSet.getConnection(instance);
        if (connection != null)
          connection.fastHalt(masterLock);
      } catch (TException e) {
        log.error("{}", e.getMessage(), e);
      }
      tserverSet.remove(instance);
    }
  }

  /**
   * Invokes the configured balancer and records the migrations it requests.
   *
   * @return milliseconds the balancer asked to wait before being called again
   */
  private long balanceTablets() {
    List<TabletMigration> migrationsOut = new ArrayList<>();
    long wait = tabletBalancer.balance(Collections.unmodifiableSortedMap(tserverStatus),
        migrationsSnapshot(), migrationsOut);
    for (TabletMigration m : TabletBalancer.checkMigrationSanity(tserverStatus.keySet(),
        migrationsOut)) {
      if (m igrations.containsKey(m.tablet)) {
        log.warn("balancer requested migration more than once, skipping {}", m);
        continue;
      }
      migrations.put(m.tablet, m.newServer);
      log.debug("migration {}", m);
    }
    if (migrationsOut.size() > 0) {
      nextEvent.event("Migrating %d more tablets, %d total", migrationsOut.size(),
          migrations.size());
    } else {
      // Nothing left to migrate: wake anyone blocked in waitForBalance().
      synchronized (balancedNotifier) {
        balancedNotifier.notifyAll();
      }
    }
    return wait;
  }
}
/**
 * Fetches status from every current tserver in parallel and returns an immutable snapshot of the
 * results. Servers that repeatedly fail to respond are asked to halt. Also reconciles the
 * badServers bookkeeping with what actually responded.
 *
 * @param currentServers the live tserver set to poll
 * @return immutable map of tserver to its reported status (non-responders absent)
 */
private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
    Set<TServerInstance> currentServers) {
  final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
  int threads = getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE);
  // A pool size of 0 means "unbounded" (cached pool, throttled below).
  ExecutorService tp = threads == 0 ? Executors.newCachedThreadPool()
      : Executors.newFixedThreadPool(threads);
  long start = System.currentTimeMillis();
  final SortedMap<TServerInstance,TabletServerStatus> result = new ConcurrentSkipListMap<>();
  for (TServerInstance serverInstance : currentServers) {
    final TServerInstance server = serverInstance;
    if (threads == 0) {
      // Since an unbounded thread pool is being used, rate limit how fast task are added to the
      // executor. This prevents the threads from growing large unless there are lots of
      // unresponsive tservers.
      sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), TimeUnit.MILLISECONDS);
    }
    tp.submit(new Runnable() {
      @Override
      public void run() {
        try {
          // Temporarily rename the worker thread so stack dumps show which server is slow.
          Thread t = Thread.currentThread();
          String oldName = t.getName();
          try {
            t.setName("Getting status from " + server);
            TServerConnection connection = tserverSet.getConnection(server);
            if (connection == null)
              throw new IOException("No connection to " + server);
            TabletServerStatus status = connection.getTableMap(false);
            result.put(server, status);
          } finally {
            t.setName(oldName);
          }
        } catch (Exception ex) {
          log.error("unable to get tablet server status {} {}", server, ex.toString());
          log.debug("unable to get tablet server status {}", server, ex);
          // badServers presumably auto-creates a counter per server (map of AtomicInteger) —
          // after too many consecutive failures, ask the server to halt.
          if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
            log.warn("attempting to stop {}", server);
            try {
              TServerConnection connection = tserverSet.getConnection(server);
              if (connection != null) {
                connection.halt(masterLock);
              }
            } catch (TTransportException e) {
              // ignore: it's probably down
            } catch (Exception e) {
              log.info("error talking to troublesome tablet server", e);
            }
            // Reset bookkeeping once a halt has been attempted.
            badServers.remove(server);
          }
        }
      }
    });
  }
  tp.shutdown();
  try {
    tp.awaitTermination(Math.max(10000, rpcTimeout / 3), TimeUnit.MILLISECONDS);
  } catch (InterruptedException e) {
    log.debug("Interrupted while fetching status");
  }
  tp.shutdownNow();
  // Threads may still modify map after shutdownNow is called, so create an immutable snapshot.
  SortedMap<TServerInstance,TabletServerStatus> info = ImmutableSortedMap.copyOf(result);
  synchronized (badServers) {
    // Forget bad-server entries for servers that are gone or that just responded successfully.
    badServers.keySet().retainAll(currentServers);
    badServers.keySet().removeAll(info.keySet());
  }
  log.debug(String.format("Finished gathering information from %d of %d servers in %.2f seconds",
      info.size(), currentServers.size(), (System.currentTimeMillis() - start) / 1000.));
  return info;
}
/**
 * Main entry point for master execution: starts the RPC services, acquires the master lock,
 * launches all background threads and watchers, then blocks until the client service stops and
 * performs an orderly shutdown. The startup ordering here is deliberate — do not reorder.
 */
public void run() throws IOException, InterruptedException, KeeperException {
  final String zroot = getZooKeeperRoot();

  // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health
  // when a hot-standby
  //
  // Start the Master's Client service
  clientHandler = new MasterClientServiceHandler(this);
  // Ensure that calls before the master gets the lock fail
  Iface haProxy = HighlyAvailableServiceWrapper.service(clientHandler, this);
  Iface rpcProxy = TraceWrap.service(haProxy);
  final Processor<Iface> processor;
  if (ThriftServerType.SASL == context.getThriftServerType()) {
    // SASL mode: wrap so delegated credentials are kept current on each call.
    Iface tcredsProxy = TCredentialsUpdatingWrapper.service(rpcProxy, clientHandler.getClass(),
        getConfiguration());
    processor = new Processor<>(tcredsProxy);
  } else {
    processor = new Processor<>(rpcProxy);
  }
  ServerAddress sa = TServerUtils.startServer(context, hostname, Property.MASTER_CLIENTPORT,
      processor, "Master", "Master Client Service Handler", null, Property.MASTER_MINTHREADS,
      Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
  clientService = sa.server;
  log.info("Started Master client service at {}", sa.address);

  // Start the replication coordinator which assigns tservers to service replication requests
  MasterReplicationCoordinator impl = new MasterReplicationCoordinator(this);
  ReplicationCoordinator.Iface haReplicationProxy = HighlyAvailableServiceWrapper.service(impl,
      this);
  // @formatter:off
  ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor =
      new ReplicationCoordinator.Processor<>(TraceWrap.service(haReplicationProxy));
  // @formatter:on
  ServerAddress replAddress = TServerUtils.startServer(context, hostname,
      Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
      "Master Replication Coordinator", "Replication Coordinator", null,
      Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
      Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
  log.info("Started replication coordinator service at " + replAddress.address);

  // block until we can obtain the ZK lock for the master
  getMasterLock(zroot + Constants.ZMASTER_LOCK);

  recoveryManager = new RecoveryManager(this);
  context.getTableManager().addObserver(this);

  StatusThread statusThread = new StatusThread();
  statusThread.start();

  MigrationCleanupThread migrationCleanupThread = new MigrationCleanupThread();
  migrationCleanupThread.start();

  tserverSet.startListeningForTabletServerChanges();

  ZooReaderWriter zReaderWriter = context.getZooReaderWriter();

  zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
      nextEvent.event("Noticed recovery changes", event.getType());
      try {
        // watcher only fires once, add it back
        zReaderWriter.getChildren(zroot + Constants.ZRECOVERY, this);
      } catch (Exception e) {
        log.error("Failed to add log recovery watcher back", e);
      }
    }
  });

  // Watchers are created in a fixed order (0 = user tablets, 1 = metadata, 2 = root) — other
  // code (e.g. StatusThread.goodStats) relies on this ordering.
  watchers.add(new TabletGroupWatcher(this, new MetaDataStateStore(context, this), null) {
    @Override
    boolean canSuspendTablets() {
      // Always allow user data tablets to enter suspended state.
      return true;
    }
  });

  watchers.add(
      new TabletGroupWatcher(this, new RootTabletStateStore(context, this), watchers.get(0)) {
        @Override
        boolean canSuspendTablets() {
          // Allow metadata tablets to enter suspended state only if so configured. Generally
          // we'll want metadata tablets to
          // be immediately reassigned, even if there's a global table.suspension.duration
          // setting.
          return getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE);
        }
      });

  watchers.add(new TabletGroupWatcher(this, new ZooTabletStateStore(new ZooStore(context)),
      watchers.get(1)) {
    @Override
    boolean canSuspendTablets() {
      // Never allow root tablet to enter suspended state.
      return false;
    }
  });
  for (TabletGroupWatcher watcher : watchers) {
    watcher.start();
  }

  // Once we are sure the upgrade is complete, we can safely allow fate use.
  waitForMetadataUpgrade.await();

  try {
    // Age off completed FATE transactions after 8 hours; sweep roughly every 63 seconds.
    final AgeOffStore<Master> store = new AgeOffStore<>(new org.apache.accumulo.fate.ZooStore<>(
        getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter()), 1000 * 60 * 60 * 8);

    int threads = getConfiguration().getCount(Property.MASTER_FATE_THREADPOOL_SIZE);

    fate = new Fate<>(this, store);
    fate.startTransactionRunners(threads);

    SimpleTimer.getInstance(getConfiguration()).schedule(new Runnable() {
      @Override
      public void run() {
        store.ageOff();
      }
    }, 63000, 63000);
  } catch (KeeperException | InterruptedException e) {
    throw new IOException(e);
  }

  ZooKeeperInitialization.ensureZooKeeperInitialized(zReaderWriter, zroot);

  // Make sure that we have a secret key (either a new one or an old one from ZK) before we start
  // the master client service.
  if (null != authenticationTokenKeyManager && null != keyDistributor) {
    log.info("Starting delegation-token key manager");
    keyDistributor.initialize();
    authenticationTokenKeyManager.start();
    boolean logged = false;
    while (!authenticationTokenKeyManager.isInitialized()) {
      // Print out a status message when we start waiting for the key manager to get initialized
      if (!logged) {
        log.info("Waiting for AuthenticationTokenKeyManager to be initialized");
        logged = true;
      }
      sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
    }
    // And log when we are initialized
    log.info("AuthenticationTokenSecretManager is initialized");
  }

  String address = sa.address.toString();
  log.info("Setting master lock data to {}", address);
  masterLock.replaceLockData(address.getBytes());

  while (!clientService.isServing()) {
    sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
  }

  // Start the daemon to scan the replication table and make units of work
  replicationWorkDriver = new ReplicationDriver(this);
  replicationWorkDriver.start();

  // Start the daemon to assign work to tservers to replicate to our peers
  try {
    replicationWorkAssigner = new WorkDriver(this);
  } catch (AccumuloException | AccumuloSecurityException e) {
    log.error("Caught exception trying to initialize replication WorkDriver", e);
    throw new RuntimeException(e);
  }
  replicationWorkAssigner.start();

  // Advertise that port we used so peers don't have to be told what it is
  context.getZooReaderWriter().putPersistentData(
      getZooKeeperRoot() + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR,
      replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);

  // Register replication metrics
  MasterMetricsFactory factory = new MasterMetricsFactory(getConfiguration(), this);
  Metrics replicationMetrics = factory.createReplicationMetrics();
  try {
    replicationMetrics.register();
  } catch (Exception e) {
    log.error("Failed to register replication metrics", e);
  }

  // The master is fully initialized. Clients are allowed to connect now.
  masterInitialized.set(true);

  while (clientService.isServing()) {
    sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
  }
  log.info("Shutting down fate.");
  fate.shutdown();

  log.info("Shutting down timekeeping.");
  timeKeeper.shutdown();

  // Bound total shutdown time: each join gets whatever is left of the shared deadline.
  final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
  statusThread.join(remaining(deadline));
  replicationWorkAssigner.join(remaining(deadline));
  replicationWorkDriver.join(remaining(deadline));
  replAddress.server.stop();
  // Signal that we want it to stop, and wait for it to do so.
  if (authenticationTokenKeyManager != null) {
    authenticationTokenKeyManager.gracefulStop();
    authenticationTokenKeyManager.join(remaining(deadline));
  }

  // quit, even if the tablet servers somehow jam up and the watchers
  // don't stop
  for (TabletGroupWatcher watcher : watchers) {
    watcher.join(remaining(deadline));
  }
  log.info("exiting");
}
/**
 * Milliseconds left until {@code deadline}, clamped to a minimum of 1 so that callers passing
 * the result to {@code Thread.join(millis)} never pass 0 (which would wait forever).
 */
private long remaining(long deadline) {
  long millisLeft = deadline - System.currentTimeMillis();
  return millisLeft > 1 ? millisLeft : 1;
}
/** @return the ZooKeeper lock held (or being acquired) by this master */
public ZooLock getMasterLock() {
  return this.masterLock;
}
/**
 * Async watcher for the master's ZooKeeper lock. Records whether the lock was acquired or the
 * attempt failed, and halts the process outright if the lock is lost or can no longer be
 * monitored (a master without the lock must not keep running).
 */
private static class MasterLockWatcher implements ZooLock.AsyncLockWatcher {

  // Guarded by this object's monitor; exactly one of these may become true per attempt.
  boolean acquiredLock = false;
  boolean failedToAcquireLock = false;

  @Override
  public void lostLock(LockLossReason reason) {
    // Losing the lock means another master may take over; terminate immediately.
    Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
  }

  @Override
  public void unableToMonitorLockNode(final Throwable e) {
    // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
    Halt.halt(-1, new Runnable() {
      @Override
      public void run() {
        log.error("FATAL: No longer able to monitor master lock node", e);
      }
    });
  }

  @Override
  public synchronized void acquiredLock() {
    log.debug("Acquired master lock");

    // Both callbacks firing, or the same one twice, indicates a ZooLock bug — halt.
    if (acquiredLock || failedToAcquireLock) {
      Halt.halt("Zoolock in unexpected state AL " + acquiredLock + " " + failedToAcquireLock, -1);
    }

    acquiredLock = true;
    notifyAll();
  }

  @Override
  public synchronized void failedToAcquireLock(Exception e) {
    log.warn("Failed to get master lock", e);

    if (e instanceof NoAuthException) {
      // Misconfigured instance.secret is unrecoverable; retrying would loop forever.
      String msg = "Failed to acquire master lock due to incorrect ZooKeeper authentication.";
      log.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e);
      Halt.halt(msg, -1);
    }

    if (acquiredLock) {
      Halt.halt("Zoolock in unexpected state FAL " + acquiredLock + " " + failedToAcquireLock,
          -1);
    }

    failedToAcquireLock = true;
    notifyAll();
  }

  /**
   * Blocks until one of the callbacks above fires. The InterruptedException is deliberately
   * swallowed (not re-interrupted): re-asserting the interrupt flag would make the next wait()
   * throw immediately, turning this into a busy loop.
   */
  public synchronized void waitForChange() {
    while (!acquiredLock && !failedToAcquireLock) {
      try {
        wait();
      } catch (InterruptedException e) {}
    }
  }
}
/**
 * Blocks until this process acquires the master lock in ZooKeeper, retrying indefinitely on
 * failed attempts. On success, advances the master state to HAVE_LOCK.
 *
 * @param zMasterLoc ZooKeeper path of the master lock node
 */
private void getMasterLock(final String zMasterLoc) throws KeeperException, InterruptedException {
  log.info("trying to get master lock");

  final String masterClientAddress = hostname + ":"
      + getConfiguration().getPort(Property.MASTER_CLIENTPORT)[0];

  while (true) {
    MasterLockWatcher masterLockWatcher = new MasterLockWatcher();
    masterLock = new ZooLock(getContext().getZooReaderWriter(), zMasterLoc);
    // Fix: encode the lock data explicitly as UTF-8 instead of the platform default charset,
    // so readers on other hosts decode the advertised address consistently.
    masterLock.lockAsync(masterLockWatcher, masterClientAddress.getBytes(UTF_8));
    masterLockWatcher.waitForChange();

    if (masterLockWatcher.acquiredLock) {
      break;
    }

    // waitForChange() returns only after one of the two flags is set; neither being set here
    // means the watcher misbehaved.
    if (!masterLockWatcher.failedToAcquireLock) {
      throw new IllegalStateException("master lock in unknown state");
    }

    masterLock.tryToCancelAsyncLockOrUnlock();

    sleepUninterruptibly(TIME_TO_WAIT_BETWEEN_LOCK_CHECKS, TimeUnit.MILLISECONDS);
  }

  setMasterState(MasterState.HAVE_LOCK);
}
/**
 * Process entry point: parses options, sets up the server context, and runs the master until it
 * exits, always tearing the context down afterwards.
 */
public static void main(String[] args) throws Exception {
  final String app = "master";
  ServerOpts opts = new ServerOpts();
  opts.parseArgs(app, args);
  ServerContext context = new ServerContext(opts.getSiteConfiguration());
  context.setupServer(app, Master.class.getName(), opts.getAddress());
  try {
    Master master = new Master(context);
    master.run();
  } finally {
    // Ensure context resources are released even if run() throws.
    context.teardownServer();
  }
}
/**
 * LiveTServerSet callback invoked when the set of live tservers changes. Posts/clears dead
 * server obituaries, prunes bookkeeping (bad servers, pending shutdowns, migrations) for
 * departed servers, and fires an event so watchers re-evaluate assignments.
 *
 * @param current the live tserver set after the change
 * @param deleted instances that disappeared
 * @param added instances that appeared
 */
@Override
public void update(LiveTServerSet current, Set<TServerInstance> deleted,
    Set<TServerInstance> added) {
  // obit list in ZK records dead tservers for the monitor
  DeadServerList obit = new DeadServerList(context, getZooKeeperRoot() + Constants.ZDEADTSERVERS);
  if (added.size() > 0) {
    log.info("New servers: {}", added);
    for (TServerInstance up : added)
      // A server that came (back) up is no longer dead; clear its obituary.
      obit.delete(up.hostPort());
  }
  for (TServerInstance dead : deleted) {
    String cause = "unexpected failure";
    if (serversToShutdown.contains(dead))
      cause = "clean shutdown"; // maybe an incorrect assumption
    if (!getMasterGoalState().equals(MasterGoalState.CLEAN_STOP))
      obit.post(dead.hostPort(), cause);
  }

  // Servers that vanished without being asked to shut down are genuinely lost.
  Set<TServerInstance> unexpected = new HashSet<>(deleted);
  unexpected.removeAll(this.serversToShutdown);
  if (unexpected.size() > 0) {
    if (stillMaster() && !getMasterGoalState().equals(MasterGoalState.CLEAN_STOP)) {
      log.warn("Lost servers {}", unexpected);
    }
  }
  // NOTE(review): these two removeAll calls run outside the synchronized blocks used for the
  // same collections just below and elsewhere — presumably the collections are themselves
  // thread-safe; verify before relying on it.
  serversToShutdown.removeAll(deleted);
  badServers.keySet().removeAll(deleted);
  // clear out any bad server with the same host/port as a new server
  synchronized (badServers) {
    cleanListByHostAndPort(badServers.keySet(), deleted, added);
  }
  synchronized (serversToShutdown) {
    cleanListByHostAndPort(serversToShutdown, deleted, added);
  }

  // Cancel migrations targeting servers that are no longer alive.
  synchronized (migrations) {
    Iterator<Entry<KeyExtent,TServerInstance>> iter = migrations.entrySet().iterator();
    while (iter.hasNext()) {
      Entry<KeyExtent,TServerInstance> entry = iter.next();
      if (deleted.contains(entry.getValue())) {
        log.info("Canceling migration of {} to {}", entry.getKey(), entry.getValue());
        iter.remove();
      }
    }
  }
  nextEvent.event("There are now %d tablet servers", current.size());
}
/**
 * Removes from {@code badServers} every entry whose host:port matches an added or deleted
 * tserver instance — a restarted tserver reuses its host:port, so stale records must not linger.
 *
 * <p>Fix: the original fell through into the deleted-loop even after removing an entry in the
 * added-loop. When the same host:port appeared in both sets (the typical restart case: old
 * instance deleted, new instance added on the same address), the second
 * {@code Iterator.remove()} for the same element threw {@link IllegalStateException}.
 *
 * @param badServers collection to prune (mutated in place)
 * @param deleted instances that disappeared
 * @param added instances that appeared
 */
private static void cleanListByHostAndPort(Collection<TServerInstance> badServers,
    Set<TServerInstance> deleted, Set<TServerInstance> added) {
  Iterator<TServerInstance> badIter = badServers.iterator();
  while (badIter.hasNext()) {
    TServerInstance bad = badIter.next();
    boolean removed = false;
    for (TServerInstance add : added) {
      if (bad.hostPort().equals(add.hostPort())) {
        badIter.remove();
        removed = true;
        break;
      }
    }
    if (removed) {
      // Already removed; calling remove() again for the same element would throw
      // IllegalStateException, so skip the deleted-set scan.
      continue;
    }
    for (TServerInstance del : deleted) {
      if (bad.hostPort().equals(del.hostPort())) {
        badIter.remove();
        break;
      }
    }
  }
}
/**
 * TableObserver callback: reacts to a table-state change recorded in ZooKeeper. Going offline
 * cancels all pending migrations for that table, since no tserver will load its tablets.
 */
@Override
public void stateChanged(Table.ID tableId, TableState state) {
  nextEvent.event("Table state in zookeeper changed for %s to %s", tableId, state);
  boolean wentOffline = TableState.OFFLINE == state;
  if (wentOffline) {
    clearMigrations(tableId);
  }
}
// TableObserver callback; intentionally a no-op — the master reads table state on demand and
// does not need the initial snapshot.
@Override
public void initialize(Map<Table.ID,TableState> tableIdToStateMap) {}
// TableObserver callback; intentionally a no-op — ZooKeeper session loss is handled by the
// master lock watcher, not here.
@Override
public void sessionExpired() {}
/**
 * Returns the set of table ids currently considered online. During shutdown, only the system
 * tables whose tier has not yet been unloaded are reported; in normal operation, every table in
 * the ONLINE state is reported.
 */
@Override
public Set<Table.ID> onlineTables() {
  Set<Table.ID> online = new HashSet<>();
  if (getMasterState() != MasterState.NORMAL) {
    // Shutting down: include metadata/root only while their tier is still loaded.
    if (getMasterState() != MasterState.UNLOAD_METADATA_TABLETS) {
      online.add(MetadataTable.ID);
    }
    if (getMasterState() != MasterState.UNLOAD_ROOT_TABLET) {
      online.add(RootTable.ID);
    }
    return online;
  }
  TableManager manager = context.getTableManager();
  for (Table.ID tableId : Tables.getIdToNameMap(context).keySet()) {
    // == on the enum also safely handles a null state (table just deleted).
    if (manager.getTableState(tableId) == TableState.ONLINE) {
      online.add(tableId);
    }
  }
  return online;
}
/** @return a snapshot of the tserver instances currently alive according to the live set */
@Override
public Set<TServerInstance> onlineTabletServers() {
  return this.tserverSet.getCurrentServers();
}
/**
 * Collects the merge info for every known table (one entry per table, possibly an empty merge).
 */
@Override
public Collection<MergeInfo> merges() {
  Set<Table.ID> allTableIds = Tables.getIdToNameMap(context).keySet();
  List<MergeInfo> mergeInfos = new ArrayList<>(allTableIds.size());
  for (Table.ID tableId : allTableIds) {
    mergeInfos.add(getMergeInfo(tableId));
  }
  return mergeInfos;
}
// recovers state from the persistent transaction to shutdown a server
/**
 * Marks a tserver for shutdown: fires an event so watchers react, then records the server in the
 * pending-shutdown set.
 */
public void shutdownTServer(TServerInstance server) {
  nextEvent.event("Tablet Server shutdown requested for %s", server);
  serversToShutdown.add(server);
}
/** @return the coordinator used to publish and wait on master events */
public EventCoordinator getEventCoordinator() {
  return this.nextEvent;
}
/** @return the factory providing server-side configuration objects */
public ServerConfigurationFactory getConfigurationFactory() {
  return this.serverConfig;
}
/** @return the volume manager abstracting the underlying file systems */
public VolumeManager getFileSystem() {
  return fs;
}
/**
 * Called when a tablet gets assigned during shutdown; winds the shutdown state machine back a
 * step so the newly assigned system tablet is unloaded again.
 */
public void assignedTablet(KeyExtent extent) {
  // A metadata tablet appeared while unloading the root tablet: back up one stage.
  if (extent.isMeta() && getMasterState().equals(MasterState.UNLOAD_ROOT_TABLET)) {
    setMasterState(MasterState.UNLOAD_METADATA_TABLETS);
  }
  if (extent.isRootTablet()) {
    // probably too late, but try anyhow
    if (getMasterState().equals(MasterState.STOP)) {
      setMasterState(MasterState.UNLOAD_ROOT_TABLET);
    }
  }
}
/**
 * Blocks the caller until the cluster is balanced: no unassigned tablets, no pending migrations,
 * and no new events arriving between checks. Woken by balanceTablets() notifying
 * balancedNotifier when the balancer has nothing left to do.
 *
 * @param tinfo trace info from the RPC caller (unused here)
 */
@SuppressFBWarnings(value = "UW_UNCOND_WAIT", justification = "TODO needs triage")
public void waitForBalance(TInfo tinfo) {
  synchronized (balancedNotifier) {
    long eventCounter;
    do {
      // Snapshot the event counter before waiting; a changed counter afterwards means
      // something happened while we slept, so re-check the balance conditions.
      eventCounter = nextEvent.waitForEvents(0, 0);
      try {
        balancedNotifier.wait();
      } catch (InterruptedException e) {
        log.debug(e.toString(), e);
      }
    } while (displayUnassigned() > 0 || migrations.size() > 0
        || eventCounter != nextEvent.waitForEvents(0, 0));
  }
}
/**
 * Builds the monitoring snapshot served to the Accumulo monitor: per-tserver status, aggregated
 * per-table stats, unresponsive/dead/shutting-down servers, master state, and bulk imports.
 *
 * @return a freshly populated MasterMonitorInfo
 */
public MasterMonitorInfo getMasterMonitorInfo() {
  final MasterMonitorInfo result = new MasterMonitorInfo();

  result.tServerInfo = new ArrayList<>();
  result.tableMap = new DefaultMap<>(new TableInfo());
  for (Entry<TServerInstance,TabletServerStatus> serverEntry : tserverStatus.entrySet()) {
    final TabletServerStatus status = serverEntry.getValue();
    result.tServerInfo.add(status);
    // Aggregate each server's per-table numbers into cluster-wide per-table totals.
    for (Entry<String,TableInfo> entry : status.tableMap.entrySet()) {
      TableInfoUtil.add(result.tableMap.get(entry.getKey()), entry.getValue());
    }
  }
  result.badTServers = new HashMap<>();
  synchronized (badServers) {
    for (TServerInstance bad : badServers.keySet()) {
      result.badTServers.put(bad.hostPort(), TabletServerState.UNRESPONSIVE.getId());
    }
  }
  result.state = getMasterState();
  result.goalState = getMasterGoalState();
  result.unassignedTablets = displayUnassigned();
  result.serversShuttingDown = new HashSet<>();
  synchronized (serversToShutdown) {
    for (TServerInstance server : serversToShutdown)
      result.serversShuttingDown.add(server.hostPort());
  }
  DeadServerList obit = new DeadServerList(context, getZooKeeperRoot() + Constants.ZDEADTSERVERS);
  result.deadTabletServers = obit.getList();
  result.bulkImports = bulkImportStatus.getBulkLoadStatus();
  return result;
}
/**
 * Reports whether this master can generate delegation tokens for users (requires the
 * authentication-token key infrastructure to be configured).
 */
public boolean delegationTokensAvailable() {
  return this.delegationTokensAvailable;
}
/**
 * Returns an unmodifiable point-in-time copy of the extents currently scheduled for migration.
 * The copy is taken under the migrations lock so callers see a consistent snapshot.
 */
@Override
public Set<KeyExtent> migrationsSnapshot() {
  final Set<KeyExtent> snapshot;
  synchronized (migrations) {
    snapshot = new HashSet<>(migrations.keySet());
  }
  return Collections.unmodifiableSet(snapshot);
}
/** @return a defensive point-in-time copy of the servers pending shutdown */
@Override
public Set<TServerInstance> shutdownServers() {
  synchronized (serversToShutdown) {
    Set<TServerInstance> copy = new HashSet<>(serversToShutdown);
    return copy;
  }
}
/**
 * Marks every write-ahead log of each dead server as closed in the WAL state manager, so
 * recovery can proceed.
 *
 * @param logsForDeadServers dead server to the list of its WAL paths
 * @throws WalMarkerException if updating the WAL state fails
 */
public void markDeadServerLogsAsClosed(Map<TServerInstance,List<Path>> logsForDeadServers)
    throws WalMarkerException {
  WalStateManager mgr = new WalStateManager(context);
  for (Entry<TServerInstance,List<Path>> deadServer : logsForDeadServers.entrySet()) {
    TServerInstance server = deadServer.getKey();
    for (Path walPath : deadServer.getValue()) {
      mgr.closeWal(server, walPath);
    }
  }
}
/** Records the current processing state of a single bulk-import directory. */
public void updateBulkImportStatus(String directory, BulkImportState state) {
  List<String> dirs = Collections.singletonList(directory);
  bulkImportStatus.updateBulkImportStatus(dirs, state);
}
/** Forgets the tracked status of a single bulk-import directory (e.g. once it completes). */
public void removeBulkImportStatus(String directory) {
  List<String> dirs = Collections.singletonList(directory);
  bulkImportStatus.removeBulkImportStatus(dirs);
}
/**
 * Return how long (in milliseconds) there has been a master overseeing this cluster. This is an
 * approximately monotonic clock, which will be approximately consistent between different masters
 * or different runs of the same master.
 */
public Long getSteadyTime() {
  return this.timeKeeper.getTime();
}
/** @return true once startup has fully completed and client RPCs may be serviced */
@Override
public boolean isActiveService() {
  return this.masterInitialized.get();
}
/**
 * Creates (or overwrites) a file at {@code path} on the default volume's file system.
 *
 * @param path file to create
 * @return an output stream for writing the new file
 * @throws IOException if the file cannot be created
 */
public FSDataOutputStream getOutputStream(final String path) throws IOException {
  return fs.getDefaultVolume().getFileSystem().create(new Path(path));
}
/**
 * Opens an existing file at {@code path} on the default volume's file system for reading.
 *
 * @param path file to open
 * @return an input stream over the file's contents
 * @throws IOException if the file cannot be opened
 */
public FSDataInputStream getInputStream(final String path) throws IOException {
  return fs.getDefaultVolume().getFileSystem().open(new Path(path));
}
}