blob: e141de9608e56b4361c2ced60021159f57fecf63 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.tserver;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Durability;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.CompressedIterators;
import org.apache.accumulo.core.client.impl.DurabilityImpl;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.impl.TabletType;
import org.apache.accumulo.core.client.impl.Translator;
import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator;
import org.apache.accumulo.core.client.impl.Translators;
import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.ConstraintViolationSummary;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.data.thrift.InitialMultiScan;
import org.apache.accumulo.core.data.thrift.InitialScan;
import org.apache.accumulo.core.data.thrift.IterInfo;
import org.apache.accumulo.core.data.thrift.MapFileInfo;
import org.apache.accumulo.core.data.thrift.MultiScanResult;
import org.apache.accumulo.core.data.thrift.ScanResult;
import org.apache.accumulo.core.data.thrift.TCMResult;
import org.apache.accumulo.core.data.thrift.TCMStatus;
import org.apache.accumulo.core.data.thrift.TColumn;
import org.apache.accumulo.core.data.thrift.TConditionalMutation;
import org.apache.accumulo.core.data.thrift.TConditionalSession;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
import org.apache.accumulo.core.data.thrift.TKeyValue;
import org.apache.accumulo.core.data.thrift.TMutation;
import org.apache.accumulo.core.data.thrift.TRange;
import org.apache.accumulo.core.data.thrift.UpdateErrors;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.master.thrift.Compacting;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletLoadState;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.tabletserver.thrift.TDurability;
import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.DistributedTrace;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.ByteBufferUtil;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.ComparablePair;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory.RateProvider;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.util.LoggingRunnable;
import org.apache.accumulo.fate.util.Retry;
import org.apache.accumulo.fate.util.Retry.RetryFactory;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.GarbageCollectionLogger;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.TabletLevel;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.data.ServerMutation;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.master.recovery.RecoveryPath;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletLocationState;
import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
import org.apache.accumulo.server.master.state.TabletStateStore;
import org.apache.accumulo.server.master.state.ZooTabletStateStore;
import org.apache.accumulo.server.master.tableOps.UserCompactionConfig;
import org.apache.accumulo.server.metrics.Metrics;
import org.apache.accumulo.server.metrics.MetricsSystemHelper;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.replication.ZooKeeperInitialization;
import org.apache.accumulo.server.rpc.RpcWrapper;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager;
import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyWatcher;
import org.apache.accumulo.server.util.FileSystemMonitor;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.MasterMetadataUtil;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.util.ServerBulkImportStatus;
import org.apache.accumulo.server.util.time.RelativeTime;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.start.classloader.vfs.ContextManager;
import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker;
import org.apache.accumulo.tserver.RowLocks.RowLock;
import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.tserver.TabletStatsKeeper.Operation;
import org.apache.accumulo.tserver.compaction.MajorCompactionReason;
import org.apache.accumulo.tserver.data.ServerConditionalMutation;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.log.LogSorter;
import org.apache.accumulo.tserver.log.MutationReceiver;
import org.apache.accumulo.tserver.log.TabletServerLogger;
import org.apache.accumulo.tserver.mastermessage.MasterMessage;
import org.apache.accumulo.tserver.mastermessage.SplitReportMessage;
import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage;
import org.apache.accumulo.tserver.metrics.TabletServerMetricsFactory;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics;
import org.apache.accumulo.tserver.replication.ReplicationServicerHandler;
import org.apache.accumulo.tserver.replication.ReplicationWorker;
import org.apache.accumulo.tserver.scan.LookupTask;
import org.apache.accumulo.tserver.scan.NextBatchTask;
import org.apache.accumulo.tserver.scan.ScanRunState;
import org.apache.accumulo.tserver.session.ConditionalSession;
import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.session.ScanSession;
import org.apache.accumulo.tserver.session.Session;
import org.apache.accumulo.tserver.session.SessionManager;
import org.apache.accumulo.tserver.session.UpdateSession;
import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
import org.apache.accumulo.tserver.tablet.CommitSession;
import org.apache.accumulo.tserver.tablet.CompactionInfo;
import org.apache.accumulo.tserver.tablet.CompactionWatcher;
import org.apache.accumulo.tserver.tablet.Compactor;
import org.apache.accumulo.tserver.tablet.KVEntry;
import org.apache.accumulo.tserver.tablet.ScanBatch;
import org.apache.accumulo.tserver.tablet.Tablet;
import org.apache.accumulo.tserver.tablet.TabletClosedException;
import org.apache.accumulo.tserver.tablet.TabletData;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.server.TServer;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
public class TabletServer extends AccumuloServerContext implements Runnable {
private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
private static final long TIME_BETWEEN_GC_CHECKS = 5000;
private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
private final TransactionWatcher watcher = new TransactionWatcher();
private final ZooCache masterLockCache = new ZooCache();
private final TabletServerLogger logger;
private final TabletServerMetricsFactory metricsFactory;
private final Metrics updateMetrics;
private final Metrics scanMetrics;
private final Metrics mincMetrics;
public Metrics getScanMetrics() {
return scanMetrics;
}
public Metrics getMinCMetrics() {
return mincMetrics;
}
private final LogSorter logSorter;
private ReplicationWorker replWorker = null;
private final TabletStatsKeeper statsKeeper;
private final AtomicInteger logIdGenerator = new AtomicInteger();
private final AtomicLong flushCounter = new AtomicLong(0);
private final AtomicLong syncCounter = new AtomicLong(0);
private final VolumeManager fs;
private final SortedMap<KeyExtent,Tablet> onlineTablets =
Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>());
private final SortedSet<KeyExtent> unopenedTablets =
Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
private final SortedSet<KeyExtent> openingTablets =
Collections.synchronizedSortedSet(new TreeSet<KeyExtent>());
@SuppressWarnings("unchecked")
private final Map<KeyExtent,Long> recentlyUnloadedCache =
Collections.synchronizedMap(new LRUMap(1000));
private final TabletServerResourceManager resourceManager;
private final SecurityOperation security;
private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<>();
private Thread majorCompactorThread;
private HostAndPort replicationAddress;
private HostAndPort clientAddress;
private volatile boolean serverStopRequested = false;
private volatile boolean shutdownComplete = false;
private ZooLock tabletServerLock;
private TServer server;
private TServer replServer;
private DistributedWorkQueue bulkFailedCopyQ;
private String lockID;
public static final AtomicLong seekCount = new AtomicLong(0);
private final AtomicLong totalMinorCompactions = new AtomicLong(0);
private final ServerConfigurationFactory confFactory;
private final ZooAuthenticationKeyWatcher authKeyWatcher;
private final WalStateManager walMarker;
public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) throws IOException {
super(confFactory);
this.confFactory = confFactory;
this.fs = fs;
final AccumuloConfiguration aconf = getConfiguration();
Instance instance = getInstance();
log.info("Version " + Constants.VERSION);
log.info("Instance " + instance.getInstanceID());
this.sessionManager = new SessionManager(aconf);
this.logSorter = new LogSorter(instance, fs, aconf);
this.replWorker = new ReplicationWorker(this, fs);
this.statsKeeper = new TabletStatsKeeper();
final int numBusyTabletsToLog = aconf.getCount(Property.TSERV_LOG_BUSY_TABLETS_COUNT);
final long logBusyTabletsDelay =
aconf.getTimeInMillis(Property.TSERV_LOG_BUSY_TABLETS_INTERVAL);
// This thread will calculate and log out the busiest tablets based on ingest count and
// query count every #{logBusiestTabletsDelay}
if (numBusyTabletsToLog > 0) {
SimpleTimer.getInstance(aconf).schedule(new Runnable() {
private BusiestTracker ingestTracker =
BusiestTracker.newBusiestIngestTracker(numBusyTabletsToLog);
private BusiestTracker queryTracker =
BusiestTracker.newBusiestQueryTracker(numBusyTabletsToLog);
@Override
public void run() {
List<Tablet> tablets;
synchronized (onlineTablets) {
tablets = new ArrayList<>(onlineTablets.values());
}
logBusyTablets(ingestTracker.computeBusiest(tablets), "ingest count");
logBusyTablets(queryTracker.computeBusiest(tablets), "query count");
}
private void logBusyTablets(List<ComparablePair<Long,KeyExtent>> busyTablets,
String label) {
int i = 1;
for (Pair<Long,KeyExtent> pair : busyTablets) {
log.debug("{} busiest tablet by {}: {} -- extent: {} ", i, label.toLowerCase(),
pair.getFirst(), pair.getSecond());
i++;
}
}
}, logBusyTabletsDelay, logBusyTabletsDelay);
}
SimpleTimer.getInstance(aconf).schedule(new Runnable() {
@Override
public void run() {
synchronized (onlineTablets) {
long now = System.currentTimeMillis();
for (Tablet tablet : onlineTablets.values())
try {
tablet.updateRates(now);
} catch (Exception ex) {
log.error("Error updating rates for {}", tablet.getExtent(), ex);
}
}
}
}, 5000, 5000);
final long walogMaxSize = aconf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE);
final long walogMaxAge = aconf.getTimeInMillis(Property.TSERV_WALOG_MAX_AGE);
final long minBlockSize =
CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0);
if (minBlockSize != 0 && minBlockSize > walogMaxSize)
throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize "
+ walogMaxSize + " but hdfs minimum block size is " + minBlockSize
+ ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE
+ " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml.");
final long toleratedWalCreationFailures =
aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES);
final long walFailureRetryIncrement =
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT);
final long walFailureRetryMax =
aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION);
final RetryFactory walCreationRetryFactory =
Retry.builder().maxRetries(toleratedWalCreationFailures)
.retryAfter(walFailureRetryIncrement, TimeUnit.MILLISECONDS)
.incrementBy(walFailureRetryIncrement, TimeUnit.MILLISECONDS)
.maxWait(walFailureRetryMax, TimeUnit.MILLISECONDS).logInterval(3, TimeUnit.MINUTES)
.createFactory();
// Tolerate infinite failures for the write, however backing off the same as for creation
// failures.
final RetryFactory walWritingRetryFactory = Retry.builder().infiniteRetries()
.retryAfter(walFailureRetryIncrement, TimeUnit.MILLISECONDS)
.incrementBy(walFailureRetryIncrement, TimeUnit.MILLISECONDS)
.maxWait(walFailureRetryMax, TimeUnit.MILLISECONDS).logInterval(3, TimeUnit.MINUTES)
.createFactory();
logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter,
walCreationRetryFactory, walWritingRetryFactory, walogMaxAge);
this.resourceManager = new TabletServerResourceManager(this, fs);
this.security = AuditedSecurityOperation.getInstance(this);
metricsFactory = new TabletServerMetricsFactory(aconf);
updateMetrics = metricsFactory.createUpdateMetrics();
scanMetrics = metricsFactory.createScanMetrics();
mincMetrics = metricsFactory.createMincMetrics();
SimpleTimer.getInstance(aconf).schedule(new Runnable() {
@Override
public void run() {
TabletLocator.clearLocators();
}
}, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
walMarker = new WalStateManager(instance, ZooReaderWriter.getInstance());
// Create the secret manager
setSecretManager(new AuthenticationTokenSecretManager(instance,
aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));
if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
log.info("SASL is enabled, creating ZooKeeper watcher for AuthenticationKeys");
// Watcher to notice new AuthenticationKeys which enable delegation tokens
authKeyWatcher =
new ZooAuthenticationKeyWatcher(getSecretManager(), ZooReaderWriter.getInstance(),
ZooUtil.getRoot(instance) + Constants.ZDELEGATION_TOKEN_KEYS);
} else {
authKeyWatcher = null;
}
}
private static long jitter(long ms) {
Random r = new Random();
// add a random 10% wait
return (long) ((1. + (r.nextDouble() / 10)) * ms);
}
private final SessionManager sessionManager;
private final WriteTracker writeTracker = new WriteTracker();
private final RowLocks rowLocks = new RowLocks();
private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
private final ReentrantLock recoveryLock = new ReentrantLock(true);
private ThriftClientHandler clientHandler;
private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
private class ThriftClientHandler extends ClientServiceHandler
implements TabletClientService.Iface {
ThriftClientHandler() {
super(TabletServer.this, watcher, fs);
log.debug(ThriftClientHandler.class.getName() + " created");
}
@Override
public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials, final long tid,
final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean setTime)
throws ThriftSecurityException {
if (!security.canPerformSystemActions(credentials))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
try {
return watcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, new Callable<List<TKeyExtent>>() {
@Override
public List<TKeyExtent> call() throws Exception {
List<TKeyExtent> failures = new ArrayList<>();
for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry : files.entrySet()) {
TKeyExtent tke = entry.getKey();
Map<String,MapFileInfo> fileMap = entry.getValue();
Map<FileRef,MapFileInfo> fileRefMap = new HashMap<>();
for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
Path path = new Path(mapping.getKey());
FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
path = ns.makeQualified(path);
fileRefMap.put(new FileRef(path.toString(), path), mapping.getValue());
}
Tablet importTablet = onlineTablets.get(new KeyExtent(tke));
if (importTablet == null) {
failures.add(tke);
} else {
try {
importTablet.importMapFiles(tid, fileRefMap, setTime);
} catch (IOException ioe) {
log.info("files {} not imported to {}: {}", fileMap.keySet(), new KeyExtent(tke),
ioe.getMessage());
failures.add(tke);
}
}
}
return failures;
}
});
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
boolean isolated, long readaheadThreshold, TSamplerConfiguration tSamplerConfig,
long batchTimeOut, String context) throws NotServingTabletException,
ThriftSecurityException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
TSampleNotPresentException {
String tableId = new String(textent.getTable(), UTF_8);
String namespaceId;
try {
namespaceId = Tables.getNamespaceId(getInstance(), tableId);
} catch (TableNotFoundException e1) {
throw new NotServingTabletException(textent);
}
if (!security.canScan(credentials, tableId, namespaceId, range, columns, ssiList, ssio,
authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
if (!security.authenticatedUserHasAuthorizations(credentials, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.BAD_AUTHORIZATIONS);
final KeyExtent extent = new KeyExtent(textent);
// wait for any writes that are in flight.. this done to ensure
// consistency across client restarts... assume a client writes
// to accumulo and dies while waiting for a confirmation from
// accumulo... the client process restarts and tries to read
// data from accumulo making the assumption that it will get
// any writes previously made... however if the server side thread
// processing the write from the dead client is still in progress,
// the restarted client may not see the write unless we wait here.
// this behavior is very important when the client is reading the
// metadata
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(extent));
Tablet tablet = onlineTablets.get(extent);
if (tablet == null)
throw new NotServingTabletException(textent);
Set<Column> columnSet = new HashSet<>();
for (TColumn tcolumn : columns) {
columnSet.add(new Column(tcolumn));
}
final ScanSession scanSession = new ScanSession(credentials, extent, columnSet, ssiList, ssio,
new Authorizations(authorizations), readaheadThreshold, batchTimeOut, context);
scanSession.scanner = tablet.createScanner(new Range(range), batchSize, scanSession.columnSet,
scanSession.auths, ssiList, ssio, isolated, scanSession.interruptFlag,
SamplerConfigurationImpl.fromThrift(tSamplerConfig), scanSession.batchTimeOut,
scanSession.context);
long sid = sessionManager.createSession(scanSession, true);
ScanResult scanResult;
try {
scanResult = continueScan(tinfo, sid, scanSession);
} catch (NoSuchScanIDException e) {
log.error("The impossible happened", e);
throw new RuntimeException();
} finally {
sessionManager.unreserveSession(sid);
}
return new InitialScan(sid, scanResult);
}
@Override
public ScanResult continueScan(TInfo tinfo, long scanID)
throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
TSampleNotPresentException {
ScanSession scanSession = (ScanSession) sessionManager.reserveSession(scanID);
if (scanSession == null) {
throw new NoSuchScanIDException();
}
try {
return continueScan(tinfo, scanID, scanSession);
} finally {
sessionManager.unreserveSession(scanSession);
}
}
private ScanResult continueScan(TInfo tinfo, long scanID, ScanSession scanSession)
throws NoSuchScanIDException, NotServingTabletException,
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
TSampleNotPresentException {
if (scanSession.nextBatchTask == null) {
scanSession.nextBatchTask =
new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
}
ScanBatch bresult;
try {
bresult = scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
TimeUnit.MILLISECONDS);
scanSession.nextBatchTask = null;
} catch (ExecutionException e) {
sessionManager.removeSession(scanID);
if (e.getCause() instanceof NotServingTabletException)
throw (NotServingTabletException) e.getCause();
else if (e.getCause() instanceof TooManyFilesException)
throw new org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(
scanSession.extent.toThrift());
else if (e.getCause() instanceof SampleNotPresentException)
throw new TSampleNotPresentException(scanSession.extent.toThrift());
else if (e.getCause() instanceof IOException) {
sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
List<KVEntry> empty = Collections.emptyList();
bresult = new ScanBatch(empty, true);
scanSession.nextBatchTask = null;
} else {
throw new RuntimeException(e);
}
} catch (CancellationException ce) {
sessionManager.removeSession(scanID);
Tablet tablet = onlineTablets.get(scanSession.extent);
if (tablet == null || tablet.isClosed())
throw new NotServingTabletException(scanSession.extent.toThrift());
else
throw new NoSuchScanIDException();
} catch (TimeoutException e) {
List<TKeyValue> param = Collections.emptyList();
long timeout =
TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
sessionManager.removeIfNotAccessed(scanID, timeout);
return new ScanResult(param, true);
} catch (Throwable t) {
sessionManager.removeSession(scanID);
log.warn("Failed to get next batch", t);
throw new RuntimeException(t);
}
ScanResult scanResult = new ScanResult(Key.compress(bresult.getResults()), bresult.isMore());
scanSession.entriesReturned += scanResult.results.size();
scanSession.batchCount++;
if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) {
// start reading next batch while current batch is transmitted
// to client
scanSession.nextBatchTask =
new NextBatchTask(TabletServer.this, scanID, scanSession.interruptFlag);
resourceManager.executeReadAhead(scanSession.extent, scanSession.nextBatchTask);
}
if (!scanResult.more)
closeScan(tinfo, scanID);
return scanResult;
}
@Override
public void closeScan(TInfo tinfo, long scanID) {
final ScanSession ss = (ScanSession) sessionManager.removeSession(scanID);
if (ss != null) {
long t2 = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug(String.format("ScanSess tid %s %s %,d entries in %.2f secs, nbTimes = [%s] ",
TServerUtils.clientAddress.get(), ss.extent.getTableId(), ss.entriesReturned,
(t2 - ss.startTime) / 1000.0, ss.nbTimes.toString()));
}
if (scanMetrics.isEnabled()) {
scanMetrics.add(TabletServerScanMetrics.SCAN, t2 - ss.startTime);
scanMetrics.add(TabletServerScanMetrics.RESULT_SIZE, ss.entriesReturned);
}
}
}
@Override
public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
TSamplerConfiguration tSamplerConfig, long batchTimeOut, String context)
throws ThriftSecurityException, TSampleNotPresentException {
// find all of the tables that need to be scanned
final HashSet<String> tables = new HashSet<>();
for (TKeyExtent keyExtent : tbatch.keySet()) {
tables.add(new String(keyExtent.getTable(), UTF_8));
}
if (tables.size() != 1)
throw new IllegalArgumentException("Cannot batch scan over multiple tables");
// check if user has permission to the tables
for (String tableId : tables) {
String namespaceId;
try {
namespaceId = Tables.getNamespaceId(getInstance(), tableId);
} catch (TableNotFoundException e1) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.TABLE_DOESNT_EXIST);
}
if (!security.canScan(credentials, tableId, namespaceId, tbatch, tcolumns, ssiList, ssio,
authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
}
try {
if (!security.authenticatedUserHasAuthorizations(credentials, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.BAD_AUTHORIZATIONS);
} catch (ThriftSecurityException tse) {
log.error("{} is not authorized", credentials.getPrincipal(), tse);
throw tse;
}
Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new TKeyExtentTranslator(),
new Translator.ListTranslator<>(new TRangeTranslator()));
// This is used to determine which thread pool to use
KeyExtent threadPoolExtent = batch.keySet().iterator().next();
if (waitForWrites)
writeTracker.waitForWrites(TabletType.type(batch.keySet()));
final MultiScanSession mss = new MultiScanSession(credentials, threadPoolExtent, batch,
ssiList, ssio, new Authorizations(authorizations),
SamplerConfigurationImpl.fromThrift(tSamplerConfig), batchTimeOut, context);
mss.numTablets = batch.size();
for (List<Range> ranges : batch.values()) {
mss.numRanges += ranges.size();
}
for (TColumn tcolumn : tcolumns)
mss.columnSet.add(new Column(tcolumn));
long sid = sessionManager.createSession(mss, true);
MultiScanResult result;
try {
result = continueMultiScan(tinfo, sid, mss);
} catch (NoSuchScanIDException e) {
log.error("the impossible happened", e);
throw new RuntimeException("the impossible happened", e);
} finally {
sessionManager.unreserveSession(sid);
}
return new InitialMultiScan(sid, result);
}
@Override
public MultiScanResult continueMultiScan(TInfo tinfo, long scanID)
throws NoSuchScanIDException, TSampleNotPresentException {
MultiScanSession session = (MultiScanSession) sessionManager.reserveSession(scanID);
if (session == null) {
throw new NoSuchScanIDException();
}
try {
return continueMultiScan(tinfo, scanID, session);
} finally {
sessionManager.unreserveSession(session);
}
}
private MultiScanResult continueMultiScan(TInfo tinfo, long scanID, MultiScanSession session)
throws NoSuchScanIDException, TSampleNotPresentException {
if (session.lookupTask == null) {
session.lookupTask = new LookupTask(TabletServer.this, scanID);
resourceManager.executeReadAhead(session.threadPoolExtent, session.lookupTask);
}
try {
MultiScanResult scanResult =
session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
session.lookupTask = null;
return scanResult;
} catch (ExecutionException e) {
sessionManager.removeSession(scanID);
if (e.getCause() instanceof SampleNotPresentException) {
throw new TSampleNotPresentException();
} else {
log.warn("Failed to get multiscan result", e);
throw new RuntimeException(e);
}
} catch (TimeoutException e1) {
long timeout =
TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
sessionManager.removeIfNotAccessed(scanID, timeout);
List<TKeyValue> results = Collections.emptyList();
Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
List<TKeyExtent> fullScans = Collections.emptyList();
return new MultiScanResult(results, failures, fullScans, null, null, false, true);
} catch (Throwable t) {
sessionManager.removeSession(scanID);
log.warn("Failed to get multiscan result", t);
throw new RuntimeException(t);
}
}
@Override
public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException {
MultiScanSession session = (MultiScanSession) sessionManager.removeSession(scanID);
if (session == null) {
throw new NoSuchScanIDException();
}
long t2 = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug(String.format(
"MultiScanSess %s %,d entries in %.2f secs"
+ " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
TServerUtils.clientAddress.get(), session.numEntries, (t2 - session.startTime) / 1000.0,
session.totalLookupTime / 1000.0, session.numTablets, session.numRanges));
}
}
@Override
public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability tdurabilty)
throws ThriftSecurityException {
// Make sure user is real
Durability durability = DurabilityImpl.fromThrift(tdurabilty);
security.authenticateUser(credentials, credentials);
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.PERMISSION_ERRORS, 0);
UpdateSession us =
new UpdateSession(new TservConstraintEnv(security, credentials), credentials, durability);
long sid = sessionManager.createSession(us, false);
return sid;
}
private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
long t1 = System.currentTimeMillis();
if (us.currentTablet != null && us.currentTablet.getExtent().equals(keyExtent))
return;
if (us.currentTablet == null
&& (us.failures.containsKey(keyExtent) || us.authFailures.containsKey(keyExtent))) {
// if there were previous failures, then do not accept additional writes
return;
}
String tableId = "";
try {
// if user has no permission to write to this table, add it to
// the failures list
boolean sameTable = us.currentTablet != null
&& (us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
tableId = keyExtent.getTableId().toString();
if (sameTable || security.canWrite(us.getCredentials(), tableId,
Tables.getNamespaceId(getInstance(), tableId))) {
long t2 = System.currentTimeMillis();
us.authTimes.addStat(t2 - t1);
us.currentTablet = onlineTablets.get(keyExtent);
if (us.currentTablet != null) {
us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
} else {
// not serving tablet, so report all mutations as
// failures
us.failures.put(keyExtent, 0l);
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.UNKNOWN_TABLET_ERRORS, 0);
}
} else {
log.warn(
"Denying access to table " + keyExtent.getTableId() + " for user " + us.getUser());
long t2 = System.currentTimeMillis();
us.authTimes.addStat(t2 - t1);
us.currentTablet = null;
us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.PERMISSION_ERRORS, 0);
return;
}
} catch (TableNotFoundException tnfe) {
log.error("Table " + tableId + " not found ", tnfe);
long t2 = System.currentTimeMillis();
us.authTimes.addStat(t2 - t1);
us.currentTablet = null;
us.authFailures.put(keyExtent, SecurityErrorCode.TABLE_DOESNT_EXIST);
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.UNKNOWN_TABLET_ERRORS, 0);
return;
} catch (ThriftSecurityException e) {
log.error("Denying permission to check user " + us.getUser() + " with user " + e.getUser(),
e);
long t2 = System.currentTimeMillis();
us.authTimes.addStat(t2 - t1);
us.currentTablet = null;
us.authFailures.put(keyExtent, e.getCode());
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.PERMISSION_ERRORS, 0);
return;
}
}
@Override
public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
List<TMutation> tmutations) {
UpdateSession us = (UpdateSession) sessionManager.reserveSession(updateID);
if (us == null) {
return;
}
boolean reserved = true;
try {
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
setUpdateTablet(us, keyExtent);
if (us.currentTablet != null) {
long additionalMutationSize = 0;
List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
for (TMutation tmutation : tmutations) {
Mutation mutation = new ServerMutation(tmutation);
mutations.add(mutation);
additionalMutationSize += mutation.numBytes();
}
us.queuedMutationSize += additionalMutationSize;
long totalQueued = updateTotalQueuedMutationSize(additionalMutationSize);
long total = TabletServer.this.getConfiguration()
.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
if (totalQueued > total) {
try {
flush(us);
} catch (HoldTimeoutException hte) {
// Assumption is that the client has timed out and is gone. If thats not the case,
// then removing the session should cause the client to fail
// in such a way that it retries.
log.debug("HoldTimeoutException during applyUpdates, removing session");
sessionManager.removeSession(updateID, true);
reserved = false;
}
}
}
} finally {
if (reserved) {
sessionManager.unreserveSession(us);
}
}
}
private void flush(UpdateSession us) {
int mutationCount = 0;
Map<CommitSession,Mutations> sendables = new HashMap<>();
Throwable error = null;
long pt1 = System.currentTimeMillis();
boolean containsMetadataTablet = false;
for (Tablet tablet : us.queuedMutations.keySet())
if (tablet.getExtent().isMeta())
containsMetadataTablet = true;
if (!containsMetadataTablet && us.queuedMutations.size() > 0)
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
Span prep = Trace.start("prep");
try {
for (Entry<Tablet,? extends List<Mutation>> entry : us.queuedMutations.entrySet()) {
Tablet tablet = entry.getKey();
Durability tabletDurability = tablet.getDurability();
List<Mutation> mutations = entry.getValue();
if (mutations.size() > 0) {
try {
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.MUTATION_ARRAY_SIZE, mutations.size());
CommitSession commitSession = tablet.prepareMutationsForCommit(us.cenv, mutations);
if (commitSession == null) {
if (us.currentTablet == tablet) {
us.currentTablet = null;
}
us.failures.put(tablet.getExtent(), us.successfulCommits.get(tablet));
} else {
sendables.put(commitSession, new Mutations(
DurabilityImpl.resolveDurabilty(us.durability, tabletDurability), mutations));
mutationCount += mutations.size();
}
} catch (TConstraintViolationException e) {
us.violations.add(e.getViolations());
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.CONSTRAINT_VIOLATIONS, 0);
if (e.getNonViolators().size() > 0) {
// only log and commit mutations if there were some
// that did not violate constraints... this is what
// prepareMutationsForCommit() expects
sendables.put(e.getCommitSession(),
new Mutations(DurabilityImpl.resolveDurabilty(us.durability, tabletDurability),
e.getNonViolators()));
}
mutationCount += mutations.size();
} catch (Throwable t) {
error = t;
log.error("Unexpected error preparing for commit", error);
break;
}
}
}
} finally {
prep.stop();
}
long pt2 = System.currentTimeMillis();
us.prepareTimes.addStat(pt2 - pt1);
updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
if (error != null) {
for (Entry<CommitSession,Mutations> e : sendables.entrySet()) {
e.getKey().abortCommit(e.getValue().getMutations());
}
throw new RuntimeException(error);
}
try {
Span wal = Trace.start("wal");
try {
while (true) {
try {
long t1 = System.currentTimeMillis();
logger.logManyTablets(sendables);
long t2 = System.currentTimeMillis();
us.walogTimes.addStat(t2 - t1);
updateWalogWriteTime((t2 - t1));
break;
} catch (IOException ex) {
log.warn("logging mutations failed, retrying");
} catch (FSError ex) { // happens when DFS is localFS
log.warn("logging mutations failed, retrying");
} catch (Throwable t) {
log.error("Unknown exception logging mutations, counts"
+ " for mutations in flight not decremented!", t);
throw new RuntimeException(t);
}
}
} finally {
wal.stop();
}
Span commit = Trace.start("commit");
try {
long t1 = System.currentTimeMillis();
for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
CommitSession commitSession = entry.getKey();
List<Mutation> mutations = entry.getValue().getMutations();
commitSession.commit(mutations);
KeyExtent extent = commitSession.getExtent();
if (us.currentTablet != null && extent == us.currentTablet.getExtent()) {
// because constraint violations may filter out some
// mutations, for proper accounting with the client code,
// need to increment the count based on the original
// number of mutations from the client NOT the filtered number
us.successfulCommits.increment(us.currentTablet,
us.queuedMutations.get(us.currentTablet).size());
}
}
long t2 = System.currentTimeMillis();
us.flushTime += (t2 - pt1);
us.commitTimes.addStat(t2 - t1);
updateAvgCommitTime(t2 - t1, sendables.size());
} finally {
commit.stop();
}
} finally {
us.queuedMutations.clear();
if (us.currentTablet != null) {
us.queuedMutations.put(us.currentTablet, new ArrayList<Mutation>());
}
updateTotalQueuedMutationSize(-us.queuedMutationSize);
us.queuedMutationSize = 0;
}
us.totalUpdates += mutationCount;
}
private void updateWalogWriteTime(long time) {
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.WALOG_WRITE_TIME, time);
}
private void updateAvgCommitTime(long time, int size) {
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.COMMIT_TIME, (long) ((time) / (double) size));
}
private void updateAvgPrepTime(long time, int size) {
if (updateMetrics.isEnabled())
updateMetrics.add(TabletServerUpdateMetrics.COMMIT_PREP, (long) ((time) / (double) size));
}
@Override
public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws NoSuchScanIDException {
final UpdateSession us = (UpdateSession) sessionManager.removeSession(updateID);
if (us == null) {
throw new NoSuchScanIDException();
}
// clients may or may not see data from an update session while
// it is in progress, however when the update session is closed
// want to ensure that reads wait for the write to finish
long opid = writeTracker.startWrite(us.queuedMutations.keySet());
try {
flush(us);
} catch (HoldTimeoutException e) {
// Assumption is that the client has timed out and is gone. If thats not the case throw an
// exception that will cause it to retry.
log.debug("HoldTimeoutException during closeUpdate, reporting no such session");
throw new NoSuchScanIDException();
} finally {
writeTracker.finishWrite(opid);
}
if (log.isDebugEnabled()) {
log.debug(
String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs lt=%.3fs ct=%.3fs)",
TServerUtils.clientAddress.get(), us.totalUpdates,
(System.currentTimeMillis() - us.startTime) / 1000.0, us.authTimes.toString(),
us.flushTime / 1000.0, us.prepareTimes.getSum() / 1000.0,
us.walogTimes.getSum() / 1000.0, us.commitTimes.getSum() / 1000.0));
}
if (us.failures.size() > 0) {
Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
log.debug(String.format("Failures: %d, first extent %s successful commits: %d",
us.failures.size(), first.getKey().toString(), first.getValue()));
}
List<ConstraintViolationSummary> violations = us.violations.asList();
if (violations.size() > 0) {
ConstraintViolationSummary first = us.violations.asList().iterator().next();
log.debug(String.format("Violations: %d, first %s occurs %d", violations.size(),
first.violationDescription, first.numberOfViolatingMutations));
}
if (us.authFailures.size() > 0) {
KeyExtent first = us.authFailures.keySet().iterator().next();
log.debug(String.format("Authentication Failures: %d, first %s", us.authFailures.size(),
first.toString()));
}
return new UpdateErrors(Translator.translate(us.failures, Translators.KET),
Translator.translate(violations, Translators.CVST),
Translator.translate(us.authFailures, Translators.KET));
}
@Override
public void update(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
TMutation tmutation, TDurability tdurability)
throws NotServingTabletException, ConstraintViolationException, ThriftSecurityException {
final String tableId = new String(tkeyExtent.getTable(), UTF_8);
String namespaceId;
try {
namespaceId = Tables.getNamespaceId(getInstance(), tableId);
} catch (TableNotFoundException e1) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.TABLE_DOESNT_EXIST);
}
if (!security.canWrite(credentials, tableId, namespaceId))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
final KeyExtent keyExtent = new KeyExtent(tkeyExtent);
final Tablet tablet = onlineTablets.get(new KeyExtent(keyExtent));
if (tablet == null) {
throw new NotServingTabletException(tkeyExtent);
}
Durability tabletDurability = tablet.getDurability();
if (!keyExtent.isMeta()) {
try {
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
} catch (HoldTimeoutException hte) {
// Major hack. Assumption is that the client has timed out and is gone. If thats not the
// case, then throwing the following will let client know there
// was a failure and it should retry.
throw new NotServingTabletException(tkeyExtent);
}
}
final long opid = writeTracker.startWrite(TabletType.type(keyExtent));
try {
final Mutation mutation = new ServerMutation(tmutation);
final List<Mutation> mutations = Collections.singletonList(mutation);
final Span prep = Trace.start("prep");
CommitSession cs;
try {
cs = tablet.prepareMutationsForCommit(new TservConstraintEnv(security, credentials),
mutations);
} finally {
prep.stop();
}
if (cs == null) {
throw new NotServingTabletException(tkeyExtent);
}
while (true) {
try {
final Span wal = Trace.start("wal");
try {
logger.log(cs, cs.getWALogSeq(), mutation, DurabilityImpl
.resolveDurabilty(DurabilityImpl.fromThrift(tdurability), tabletDurability));
} finally {
wal.stop();
}
break;
} catch (IOException ex) {
log.warn("Error writing mutations to log", ex);
}
}
final Span commit = Trace.start("commit");
try {
cs.commit(mutations);
} finally {
commit.stop();
}
} catch (TConstraintViolationException e) {
throw new ConstraintViolationException(
Translator.translate(e.getViolations().asList(), Translators.CVST));
} finally {
writeTracker.finishWrite(opid);
}
}
private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates,
ArrayList<TCMResult> results, ConditionalSession cs, List<String> symbols)
throws IOException {
Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter =
updates.entrySet().iterator();
final CompressedIterators compressedIters = new CompressedIterators(symbols);
ConditionCheckerContext checkerContext = new ConditionCheckerContext(compressedIters,
confFactory.getTableConfiguration(cs.tableId));
while (iter.hasNext()) {
final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
final Tablet tablet = onlineTablets.get(entry.getKey());
if (tablet == null || tablet.isClosed()) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
iter.remove();
} else {
final List<ServerConditionalMutation> okMutations =
new ArrayList<>(entry.getValue().size());
final List<TCMResult> resultsSubList = results.subList(results.size(), results.size());
ConditionChecker checker =
checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList);
try {
tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
if (okMutations.size() > 0) {
entry.setValue(okMutations);
} else {
iter.remove();
}
} catch (TabletClosedException | IterationInterruptedException
| TooManyFilesException e) {
// clear anything added while checking conditions.
resultsSubList.clear();
for (ServerConditionalMutation scm : entry.getValue()) {
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
}
iter.remove();
}
}
}
}
private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates,
ArrayList<TCMResult> results, ConditionalSession sess) {
Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es = updates.entrySet();
Map<CommitSession,Mutations> sendables = new HashMap<>();
boolean sessionCanceled = sess.interruptFlag.get();
Span prepSpan = Trace.start("prep");
try {
long t1 = System.currentTimeMillis();
for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
final Tablet tablet = onlineTablets.get(entry.getKey());
if (tablet == null || tablet.isClosed() || sessionCanceled) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
} else {
final Durability tabletDurability = tablet.getDurability();
try {
@SuppressWarnings("unchecked")
List<Mutation> mutations =
(List<Mutation>) (List<? extends Mutation>) entry.getValue();
if (mutations.size() > 0) {
CommitSession cs = tablet.prepareMutationsForCommit(
new TservConstraintEnv(security, sess.credentials), mutations);
if (cs == null) {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
} else {
for (ServerConditionalMutation scm : entry.getValue())
results.add(new TCMResult(scm.getID(), TCMStatus.ACCEPTED));
sendables.put(cs,
new Mutations(
DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability),
mutations));
}
}
} catch (TConstraintViolationException e) {
if (e.getNonViolators().size() > 0) {
sendables.put(e.getCommitSession(),
new Mutations(
DurabilityImpl.resolveDurabilty(sess.durability, tabletDurability),
e.getNonViolators()));
for (Mutation m : e.getNonViolators())
results.add(
new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.ACCEPTED));
}
for (Mutation m : e.getViolators())
results.add(
new TCMResult(((ServerConditionalMutation) m).getID(), TCMStatus.VIOLATED));
}
}
}
long t2 = System.currentTimeMillis();
updateAvgPrepTime(t2 - t1, es.size());
} finally {
prepSpan.stop();
}
Span walSpan = Trace.start("wal");
try {
while (true && sendables.size() > 0) {
try {
long t1 = System.currentTimeMillis();
logger.logManyTablets(sendables);
long t2 = System.currentTimeMillis();
updateWalogWriteTime(t2 - t1);
break;
} catch (IOException ex) {
log.warn("logging mutations failed, retrying");
} catch (FSError ex) { // happens when DFS is localFS
log.warn("logging mutations failed, retrying");
} catch (Throwable t) {
log.error("Unknown exception logging mutations, counts for"
+ " mutations in flight not decremented!", t);
throw new RuntimeException(t);
}
}
} finally {
walSpan.stop();
}
Span commitSpan = Trace.start("commit");
try {
long t1 = System.currentTimeMillis();
for (Entry<CommitSession,Mutations> entry : sendables.entrySet()) {
CommitSession commitSession = entry.getKey();
List<Mutation> mutations = entry.getValue().getMutations();
commitSession.commit(mutations);
}
long t2 = System.currentTimeMillis();
updateAvgCommitTime(t2 - t1, sendables.size());
} finally {
commitSpan.stop();
}
}
private Map<KeyExtent,List<ServerConditionalMutation>> conditionalUpdate(ConditionalSession cs,
Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results,
List<String> symbols) throws IOException {
// sort each list of mutations, this is done to avoid deadlock and doing seeks in order is
// more efficient and detect duplicate rows.
ConditionalMutationSet.sortConditionalMutations(updates);
Map<KeyExtent,List<ServerConditionalMutation>> deferred = new HashMap<>();
// can not process two mutations for the same row, because one will not see what the other
// writes
ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
// get as many locks as possible w/o blocking... defer any rows that are locked
List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
try {
Span checkSpan = Trace.start("Check conditions");
try {
checkConditions(updates, results, cs, symbols);
} finally {
checkSpan.stop();
}
Span updateSpan = Trace.start("apply conditional mutations");
try {
writeConditionalMutations(updates, results, cs);
} finally {
updateSpan.stop();
}
} finally {
rowLocks.releaseRowLocks(locks);
}
return deferred;
}
@Override
public TConditionalSession startConditionalUpdate(TInfo tinfo, TCredentials credentials,
List<ByteBuffer> authorizations, String tableId, TDurability tdurabilty,
String classLoaderContext) throws ThriftSecurityException, TException {
Authorizations userauths = null;
String namespaceId;
try {
namespaceId = Tables.getNamespaceId(getInstance(), tableId);
} catch (TableNotFoundException e) {
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.TABLE_DOESNT_EXIST);
}
if (!security.canConditionallyUpdate(credentials, tableId, namespaceId, authorizations))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
userauths = security.getUserAuthorizations(credentials);
for (ByteBuffer auth : authorizations)
if (!userauths.contains(ByteBufferUtil.toBytes(auth)))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.BAD_AUTHORIZATIONS);
ConditionalSession cs =
new ConditionalSession(credentials, new Authorizations(authorizations), tableId,
DurabilityImpl.fromThrift(tdurabilty), classLoaderContext);
long sid = sessionManager.createSession(cs, false);
return new TConditionalSession(sid, lockID, sessionManager.getMaxIdleTime());
}
@Override
public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID,
Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String> symbols)
throws NoSuchScanIDException, TException {
ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID);
if (cs == null || cs.interruptFlag.get())
throw new NoSuchScanIDException();
if (!cs.tableId.equals(MetadataTable.ID) && !cs.tableId.equals(RootTable.ID)) {
try {
TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
} catch (HoldTimeoutException hte) {
// Assumption is that the client has timed out and is gone. If thats not the case throw an
// exception that will cause it to retry.
log.debug("HoldTimeoutException during conditionalUpdate, reporting no such session");
throw new NoSuchScanIDException();
}
}
String tid = cs.tableId;
long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid, null, null)));
try {
Map<KeyExtent,List<ServerConditionalMutation>> updates = Translator.translate(mutations,
Translators.TKET, new Translator.ListTranslator<>(ServerConditionalMutation.TCMT));
for (KeyExtent ke : updates.keySet())
if (!ke.getTableId().equals(tid))
throw new IllegalArgumentException(
"Unexpected table id " + tid + " != " + ke.getTableId());
ArrayList<TCMResult> results = new ArrayList<>();
Map<KeyExtent,List<ServerConditionalMutation>> deferred =
conditionalUpdate(cs, updates, results, symbols);
while (deferred.size() > 0) {
deferred = conditionalUpdate(cs, deferred, results, symbols);
}
return results;
} catch (IOException ioe) {
throw new TException(ioe);
} finally {
writeTracker.finishWrite(opid);
sessionManager.unreserveSession(sessID);
}
}
@Override
public void invalidateConditionalUpdate(TInfo tinfo, long sessID) throws TException {
// this method should wait for any running conditional update to complete
// after this method returns a conditional update should not be able to start
ConditionalSession cs = (ConditionalSession) sessionManager.getSession(sessID);
if (cs != null)
cs.interruptFlag.set(true);
cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
if (cs != null)
sessionManager.removeSession(sessID, true);
}
@Override
public void closeConditionalUpdate(TInfo tinfo, long sessID) throws TException {
sessionManager.removeSession(sessID, false);
}
@Override
public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent tkeyExtent,
ByteBuffer splitPoint) throws NotServingTabletException, ThriftSecurityException {
String tableId = new String(ByteBufferUtil.toBytes(tkeyExtent.table));
String namespaceId;
try {
namespaceId = Tables.getNamespaceId(getInstance(), tableId);
} catch (TableNotFoundException ex) {
// tableOperationsImpl catches ThriftSeccurityException and checks for missing table
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.TABLE_DOESNT_EXIST);
}
if (!security.canSplitTablet(credentials, tableId, namespaceId))
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
KeyExtent keyExtent = new KeyExtent(tkeyExtent);
Tablet tablet = onlineTablets.get(keyExtent);
if (tablet == null) {
throw new NotServingTabletException(tkeyExtent);
}
if (keyExtent.getEndRow() == null
|| !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint))) {
try {
if (TabletServer.this.splitTablet(tablet, ByteBufferUtil.toBytes(splitPoint)) == null) {
throw new NotServingTabletException(tkeyExtent);
}
} catch (IOException e) {
log.warn("Failed to split " + keyExtent, e);
throw new RuntimeException(e);
}
}
}
@Override
public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials credentials)
throws ThriftSecurityException, TException {
return getStats(sessionManager.getActiveScansPerTable());
}
@Override
public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials credentials, String tableId)
throws ThriftSecurityException, TException {
TreeMap<KeyExtent,Tablet> onlineTabletsCopy;
synchronized (onlineTablets) {
onlineTabletsCopy = new TreeMap<>(onlineTablets);
}
List<TabletStats> result = new ArrayList<>();
String text = tableId;
KeyExtent start = new KeyExtent(text, new Text(), null);
for (Entry<KeyExtent,Tablet> entry : onlineTabletsCopy.tailMap(start).entrySet()) {
KeyExtent ke = entry.getKey();
if (ke.getTableId().compareTo(text) == 0) {
Tablet tablet = entry.getValue();
TabletStats stats = tablet.getTabletStats();
stats.extent = ke.toThrift();
stats.ingestRate = tablet.ingestRate();
stats.queryRate = tablet.queryRate();
stats.splitCreationTime = tablet.getSplitCreationTime();
stats.numEntries = tablet.getNumEntries();
result.add(stats);
}
}
return result;
}
private void checkPermission(TCredentials credentials, String lock, final String request)
throws ThriftSecurityException {
try {
log.trace("Got " + request + " message from user: " + credentials.getPrincipal());
if (!security.canPerformSystemActions(credentials)) {
log.warn("Got " + request + " message from user: " + credentials.getPrincipal());
throw new ThriftSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED);
}
} catch (ThriftSecurityException e) {
log.warn("Got " + request + " message from unauthenticatable user: " + e.getUser());
if (getCredentials().getToken().getClass().getName()
.equals(credentials.getTokenClassName())) {
log.error("Got message from a service with a mismatched configuration."
+ " Please ensure a compatible configuration.", e);
}
throw e;
}
if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
log.debug("Got " + request + " message before my lock was acquired, ignoring...");
throw new RuntimeException("Lock not acquired");
}
if (tabletServerLock != null && tabletServerLock.wasLockAcquired()
&& !tabletServerLock.isLocked()) {
Halt.halt(1, new Runnable() {
@Override
public void run() {
log.info("Tablet server no longer holds lock during checkPermission() : " + request
+ ", exiting");
gcLogger.logGCInfo(TabletServer.this.getConfiguration());
}
});
}
if (lock != null) {
ZooUtil.LockID lid =
new ZooUtil.LockID(ZooUtil.getRoot(getInstance()) + Constants.ZMASTER_LOCK, lock);
try {
if (!ZooLock.isLockHeld(masterLockCache, lid)) {
// maybe the cache is out of date and a new master holds the
// lock?
masterLockCache.clear();
if (!ZooLock.isLockHeld(masterLockCache, lid)) {
log.warn("Got " + request
+ " message from a master that does not hold the current lock " + lock);
throw new RuntimeException("bad master lock");
}
}
} catch (Exception e) {
throw new RuntimeException("bad master lock", e);
}
}
}
@Override
public void loadTablet(TInfo tinfo, TCredentials credentials, String lock,
final TKeyExtent textent) {
try {
checkPermission(credentials, lock, "loadTablet");
} catch (ThriftSecurityException e) {
log.error("Caller doesn't have permission to load a tablet", e);
throw new RuntimeException(e);
}
final KeyExtent extent = new KeyExtent(textent);
synchronized (unopenedTablets) {
synchronized (openingTablets) {
synchronized (onlineTablets) {
// checking if this exact tablet is in any of the sets
// below is not a strong enough check
// when splits and fix splits occurring
Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
Set<KeyExtent> all = new HashSet<>();
all.addAll(unopenedOverlapping);
all.addAll(openingOverlapping);
all.addAll(onlineOverlapping);
if (!all.isEmpty()) {
// ignore any tablets that have recently split, for error logging
for (KeyExtent e2 : onlineOverlapping) {
Tablet tablet = onlineTablets.get(e2);
if (System.currentTimeMillis() - tablet.getSplitCreationTime()
< RECENTLY_SPLIT_MILLIES) {
all.remove(e2);
}
}
// ignore self, for error logging
all.remove(extent);
if (all.size() > 0) {
log.error(
"Tablet " + extent + " overlaps previously assigned " + unopenedOverlapping
+ " " + openingOverlapping + " " + onlineOverlapping + " " + all);
}
return;
}
unopenedTablets.add(extent);
}
}
}
// add the assignment job to the appropriate queue
log.info("Loading tablet " + extent);
final AssignmentHandler ah = new AssignmentHandler(extent);
// final Runnable ah = new LoggingRunnable(log, );
// Root tablet assignment must take place immediately
if (extent.isRootTablet()) {
new Daemon("Root Tablet Assignment") {
@Override
public void run() {
ah.run();
if (onlineTablets.containsKey(extent)) {
log.info("Root tablet loaded: " + extent);
} else {
log.info("Root tablet failed to load");
}
}
}.start();
} else {
if (extent.isMeta()) {
resourceManager.addMetaDataAssignment(extent, log, ah);
} else {
resourceManager.addAssignment(extent, log, ah);
}
}
}
@Override
public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent,
TUnloadTabletGoal goal, long requestTime) {
try {
checkPermission(credentials, lock, "unloadTablet");
} catch (ThriftSecurityException e) {
log.error("Caller doesn't have permission to unload a tablet", e);
throw new RuntimeException(e);
}
KeyExtent extent = new KeyExtent(textent);
resourceManager.addMigration(extent,
new LoggingRunnable(log, new UnloadTabletHandler(extent, goal, requestTime)));
}
@Override
public void flush(TInfo tinfo, TCredentials credentials, String lock, String tableId,
ByteBuffer startRow, ByteBuffer endRow) {
try {
checkPermission(credentials, lock, "flush");
} catch (ThriftSecurityException e) {
log.error("Caller doesn't have permission to flush a table", e);
throw new RuntimeException(e);
}
ArrayList<Tablet> tabletsToFlush = new ArrayList<>();
KeyExtent ke =
new KeyExtent(tableId, ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
synchronized (onlineTablets) {
for (Tablet tablet : onlineTablets.values())
if (ke.overlaps(tablet.getExtent()))
tabletsToFlush.add(tablet);
}
Long flushID = null;
for (Tablet tablet : tabletsToFlush) {
if (flushID == null) {
// read the flush id once from zookeeper instead of reading
// it for each tablet
try {
flushID = tablet.getFlushID();
} catch (NoNodeException e) {
// table was probably deleted
log.info("Asked to flush table that has no flush id {} {}", ke, e.getMessage());
return;
}
}
tablet.flush(flushID);
}
}
@Override
public void flushTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent)
throws TException {
try {
checkPermission(credentials, lock, "flushTablet");
} catch (ThriftSecurityException e) {
log.error("Caller doesn't have permission to flush a tablet", e);
throw new RuntimeException(e);
}
Tablet tablet = onlineTablets.get(new KeyExtent(textent));
if (tablet != null) {
log.info("Flushing " + tablet.getExtent());
try {
tablet.flush(tablet.getFlushID());
} catch (NoNodeException nne) {
log.info("Asked to flush tablet that has no flush id {} {}", new KeyExtent(textent),
nne.getMessage());
}
}
}
@Override
public void halt(TInfo tinfo, TCredentials credentials, String lock)
throws ThriftSecurityException {
checkPermission(credentials, lock, "halt");
Halt.halt(0, new Runnable() {
@Override
public void run() {
log.info("Master requested tablet server halt");
gcLogger.logGCInfo(TabletServer.this.getConfiguration());
serverStopRequested = true;
try {
tabletServerLock.unlock();
} catch (Exception e) {
log.error("Caught exception unlocking TabletServer lock", e);
}
}
});
}
@Override
public void fastHalt(TInfo info, TCredentials credentials, String lock) {
try {
halt(info, credentials, lock);
} catch (Exception e) {
log.warn("Error halting", e);
}
}
@Override
public TabletStats getHistoricalStats(TInfo tinfo, TCredentials credentials)
throws ThriftSecurityException, TException {
return statsKeeper.getTabletStats();
}
@Override
public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials credentials)
throws ThriftSecurityException, TException {
try {
checkPermission(credentials, null, "getScans");
} catch (ThriftSecurityException e) {
log.error("Caller doesn't have permission to get active scans", e);
throw e;
}
return sessionManager.getActiveScans();
}
@Override
public void chop(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent)
throws TException {
try {
checkPermission(credentials, lock, "chop");
} catch (ThriftSecurityException e) {
log.error("Caller doesn't have permission to chop extent", e);
throw new RuntimeException(e);
}
KeyExtent ke = new KeyExtent(textent);
Tablet tablet = onlineTablets.get(ke);
if (tablet != null) {
tablet.chopFiles();
}
}
@Override
public void compact(TInfo tinfo, TCredentials credentials, String lock, String tableId,
ByteBuffer startRow, ByteBuffer endRow) throws TException {
try {
checkPermission(credentials, lock, "compact");
} catch (ThriftSecurityException e) {
log.error("Caller doesn't have permission to compact a table", e);
throw new RuntimeException(e);
}
KeyExtent ke =
new KeyExtent(tableId, ByteBufferUtil.toText(endRow), ByteBufferUtil.toText(startRow));
ArrayList<Tablet> tabletsToCompact = new ArrayList<>();
synchronized (onlineTablets) {
for (Tablet tablet : onlineTablets.values())
if (ke.overlaps(tablet.getExtent()))
tabletsToCompact.add(tablet);
}
Pair<Long,UserCompactionConfig> compactionInfo = null;
for (Tablet tablet : tabletsToCompact) {
// all for the same table id, so only need to read
// compaction id once
if (compactionInfo == null)
try {
compactionInfo = tablet.getCompactionID();
} catch (NoNodeException e) {
log.info("Asked to compact table with no compaction id {} {}", ke, e.getMessage());
return;
}
tablet.compactAll(compactionInfo.getFirst(), compactionInfo.getSecond());
}
}
@Override
public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials)
throws ThriftSecurityException, TException {
try {
checkPermission(credentials, null, "getActiveCompactions");
} catch (ThriftSecurityException e) {
log.error("Caller doesn't have permission to get active compactions", e);
throw e;
}
List<CompactionInfo> compactions = Compactor.getRunningCompactions();
List<ActiveCompaction> ret = new ArrayList<>(compactions.size());
for (CompactionInfo compactionInfo : compactions) {
ret.add(compactionInfo.toThrift());
}
return ret;
}
@Override
public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
String log = logger.getLogFile();
// Might be null if there no active logger
if (null == log) {
return Collections.emptyList();
}
return Collections.singletonList(log);
}
@Override
public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames)
throws TException {
log.warn("Garbage collector is attempting to remove logs through the tablet server");
log.warn("This is probably because your file"
+ " Garbage Collector is an older version than your tablet servers.\n"
+ "Restart your file Garbage Collector.");
}
}
private class SplitRunner implements Runnable {
private final Tablet tablet;
public SplitRunner(Tablet tablet) {
this.tablet = tablet;
}
@Override
public void run() {
splitTablet(tablet);
}
}
public long updateTotalQueuedMutationSize(long additionalMutationSize) {
return totalQueuedMutationSize.addAndGet(additionalMutationSize);
}
public Tablet getOnlineTablet(KeyExtent extent) {
return onlineTablets.get(extent);
}
public Session getSession(long sessionId) {
return sessionManager.getSession(sessionId);
}
public void executeSplit(Tablet tablet) {
resourceManager.executeSplit(tablet.getExtent(),
new LoggingRunnable(log, new SplitRunner(tablet)));
}
private class MajorCompactor implements Runnable {
public MajorCompactor(AccumuloConfiguration config) {
CompactionWatcher.startWatching(config);
}
@Override
public void run() {
while (true) {
try {
sleepUninterruptibly(getConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY),
TimeUnit.MILLISECONDS);
TreeMap<KeyExtent,Tablet> copyOnlineTablets = new TreeMap<>();
synchronized (onlineTablets) {
copyOnlineTablets.putAll(onlineTablets); // avoid
// concurrent
// modification
}
List<DfsLogger> closedCopy;
synchronized (closedLogs) {
closedCopy = copyClosedLogs(closedLogs);
}
int numMajorCompactionsInProgress = 0;
Iterator<Entry<KeyExtent,Tablet>> iter = copyOnlineTablets.entrySet().iterator();
// bail early now if we're shutting down
while (iter.hasNext()) {
Entry<KeyExtent,Tablet> entry = iter.next();
Tablet tablet = entry.getValue();
// if we need to split AND compact, we need a good way
// to decide what to do
if (tablet.needsSplit()) {
executeSplit(tablet);
continue;
}
tablet.checkIfMinorCompactionNeededForLogs(closedCopy);
synchronized (tablet) {
if (tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL)
|| tablet.isMajorCompactionQueued() || tablet.isMajorCompactionRunning()) {
numMajorCompactionsInProgress++;
continue;
}
}
}
int idleCompactionsToStart =
Math.max(1, getConfiguration().getCount(Property.TSERV_MAJC_MAXCONCURRENT) / 2);
if (numMajorCompactionsInProgress < idleCompactionsToStart) {
// system is not major compacting, can schedule some
// idle compactions
iter = copyOnlineTablets.entrySet().iterator();
while (iter.hasNext() && numMajorCompactionsInProgress < idleCompactionsToStart) {
Entry<KeyExtent,Tablet> entry = iter.next();
Tablet tablet = entry.getValue();
if (tablet.initiateMajorCompaction(MajorCompactionReason.IDLE)) {
numMajorCompactionsInProgress++;
}
}
}
} catch (Throwable t) {
log.error("Unexpected exception in " + Thread.currentThread().getName(), t);
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
}
}
private void splitTablet(Tablet tablet) {
try {
TreeMap<KeyExtent,TabletData> tabletInfo = splitTablet(tablet, null);
if (tabletInfo == null) {
// either split or compact not both
// were not able to split... so see if a major compaction is
// needed
tablet.initiateMajorCompaction(MajorCompactionReason.NORMAL);
}
} catch (IOException e) {
statsKeeper.updateTime(Operation.SPLIT, 0, 0, true);
log.error("split failed: {} for tablet {}", e.getMessage(), tablet.getExtent(), e);
} catch (Exception e) {
statsKeeper.updateTime(Operation.SPLIT, 0, 0, true);
log.error("Unknown error on split: {}", e, e);
}
}
private TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint)
throws IOException {
long t1 = System.currentTimeMillis();
TreeMap<KeyExtent,TabletData> tabletInfo = tablet.split(splitPoint);
if (tabletInfo == null) {
return null;
}
log.info("Starting split: " + tablet.getExtent());
statsKeeper.incrementStatusSplit();
long start = System.currentTimeMillis();
Tablet[] newTablets = new Tablet[2];
Entry<KeyExtent,TabletData> first = tabletInfo.firstEntry();
TabletResourceManager newTrm0 = resourceManager.createTabletResourceManager(first.getKey(),
getTableConfiguration(first.getKey()));
newTablets[0] = new Tablet(TabletServer.this, first.getKey(), newTrm0, first.getValue());
Entry<KeyExtent,TabletData> last = tabletInfo.lastEntry();
TabletResourceManager newTrm1 = resourceManager.createTabletResourceManager(last.getKey(),
getTableConfiguration(last.getKey()));
newTablets[1] = new Tablet(TabletServer.this, last.getKey(), newTrm1, last.getValue());
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
statsKeeper.saveMajorMinorTimes(tablet.getTabletStats());
// lose the reference to the old tablet and open two new ones
synchronized (onlineTablets) {
onlineTablets.remove(tablet.getExtent());
onlineTablets.put(newTablets[0].getExtent(), newTablets[0]);
onlineTablets.put(newTablets[1].getExtent(), newTablets[1]);
}
// tell the master
enqueueMasterMessage(new SplitReportMessage(tablet.getExtent(), newTablets[0].getExtent(),
new Text("/" + newTablets[0].getLocation().getName()), newTablets[1].getExtent(),
new Text("/" + newTablets[1].getLocation().getName())));
statsKeeper.updateTime(Operation.SPLIT, start, 0, false);
long t2 = System.currentTimeMillis();
log.info("Tablet split: " + tablet.getExtent() + " size0 " + newTablets[0].estimateTabletSize()
+ " size1 " + newTablets[1].estimateTabletSize() + " time " + (t2 - t1) + "ms");
return tabletInfo;
}
// add a message for the main thread to send back to the master
public void enqueueMasterMessage(MasterMessage m) {
masterMessages.addLast(m);
}
private class UnloadTabletHandler implements Runnable {
private final KeyExtent extent;
private final TUnloadTabletGoal goalState;
private final long requestTimeSkew;
public UnloadTabletHandler(KeyExtent extent, TUnloadTabletGoal goalState, long requestTime) {
this.extent = extent;
this.goalState = goalState;
this.requestTimeSkew = requestTime - MILLISECONDS.convert(System.nanoTime(), NANOSECONDS);
}
@Override
public void run() {
Tablet t = null;
synchronized (unopenedTablets) {
if (unopenedTablets.contains(extent)) {
unopenedTablets.remove(extent);
// enqueueMasterMessage(new TabletUnloadedMessage(extent));
return;
}
}
synchronized (openingTablets) {
while (openingTablets.contains(extent)) {
try {
log.info("Waiting for tablet {} to finish opening before unloading.", extent);
openingTablets.wait();
} catch (InterruptedException e) {}
}
}
synchronized (onlineTablets) {
if (onlineTablets.containsKey(extent)) {
t = onlineTablets.get(extent);
}
}
if (t == null) {
// Tablet has probably been recently unloaded: repeated master
// unload request is crossing the successful unloaded message
if (!recentlyUnloadedCache.containsKey(extent)) {
log.info("told to unload tablet that was not being served " + extent);
enqueueMasterMessage(
new TabletStatusMessage(TabletLoadState.UNLOAD_FAILURE_NOT_SERVING, extent));
}
return;
}
try {
t.close(!goalState.equals(TUnloadTabletGoal.DELETED));
} catch (Throwable e) {
if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) {
log.debug("Failed to unload tablet {} ... it was already closing or closed : {}", extent,
e.getMessage());
} else {
log.error("Failed to close tablet {}... Aborting migration", extent, e);
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOAD_ERROR, extent));
}
return;
}
// stop serving tablet - client will get not serving tablet
// exceptions
recentlyUnloadedCache.put(extent, System.currentTimeMillis());
onlineTablets.remove(extent);
try {
TServerInstance instance = new TServerInstance(clientAddress, getLock().getSessionId());
TabletLocationState tls = null;
try {
tls = new TabletLocationState(extent, null, instance, null, null, null, false);
} catch (BadLocationStateException e) {
log.error("Unexpected error ", e);
}
if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || extent.isRootTablet()
|| (extent.isMeta()
&& !getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) {
log.debug("Unassigning " + tls);
TabletStateStore.unassign(TabletServer.this, tls, null);
} else {
log.debug("Suspending " + tls);
TabletStateStore.suspend(TabletServer.this, tls, null,
requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS));
}
} catch (DistributedStoreException ex) {
log.warn("Unable to update storage", ex);
} catch (KeeperException e) {
log.warn("Unable determine our zookeeper session information", e);
} catch (InterruptedException e) {
log.warn("Interrupted while getting our zookeeper session information", e);
}
// tell the master how it went
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.UNLOADED, extent));
// roll tablet stats over into tablet server's statsKeeper object as
// historical data
statsKeeper.saveMajorMinorTimes(t.getTabletStats());
log.info("unloaded " + extent);
}
}
protected class AssignmentHandler implements Runnable {
private final KeyExtent extent;
private final int retryAttempt;
public AssignmentHandler(KeyExtent extent) {
this(extent, 0);
}
public AssignmentHandler(KeyExtent extent, int retryAttempt) {
this.extent = extent;
this.retryAttempt = retryAttempt;
}
@Override
public void run() {
log.info(clientAddress + ": got assignment from master: " + extent);
synchronized (unopenedTablets) {
synchronized (openingTablets) {
synchronized (onlineTablets) {
// nothing should be moving between sets, do a sanity
// check
Set<KeyExtent> unopenedOverlapping = KeyExtent.findOverlapping(extent, unopenedTablets);
Set<KeyExtent> openingOverlapping = KeyExtent.findOverlapping(extent, openingTablets);
Set<KeyExtent> onlineOverlapping = KeyExtent.findOverlapping(extent, onlineTablets);
if (openingOverlapping.contains(extent) || onlineOverlapping.contains(extent))
return;
if (!unopenedOverlapping.contains(extent)) {
log.info("assignment " + extent + " no longer in the unopened set");
return;
}
if (unopenedOverlapping.size() != 1 || openingOverlapping.size() > 0
|| onlineOverlapping.size() > 0) {
throw new IllegalStateException(
"overlaps assigned " + extent + " " + !unopenedTablets.contains(extent) + " "
+ unopenedOverlapping + " " + openingOverlapping + " " + onlineOverlapping);
}
}
unopenedTablets.remove(extent);
openingTablets.add(extent);
}
}
log.debug("Loading extent: " + extent);
// check Metadata table before accepting assignment
Text locationToOpen = null;
SortedMap<Key,Value> tabletsKeyValues = new TreeMap<>();
try {
Pair<Text,
KeyExtent> pair = verifyTabletInformation(TabletServer.this, extent,
TabletServer.this.getTabletSession(), tabletsKeyValues, getClientAddressString(),
getLock());
if (pair != null) {
locationToOpen = pair.getFirst();
if (pair.getSecond() != null) {
synchronized (openingTablets) {
openingTablets.remove(extent);
openingTablets.notifyAll();
// it expected that the new extent will overlap the old one... if it does not, it
// should not be added to unopenedTablets
if (!KeyExtent.findOverlapping(extent, new TreeSet<>(Arrays.asList(pair.getSecond())))
.contains(pair.getSecond())) {
throw new IllegalStateException(
"Fixed split does not overlap " + extent + " " + pair.getSecond());
}
unopenedTablets.add(pair.getSecond());
}
// split was rolled back... try again
new AssignmentHandler(pair.getSecond()).run();
return;
}
}
} catch (Exception e) {
synchronized (openingTablets) {
openingTablets.remove(extent);
openingTablets.notifyAll();
}
log.warn("Failed to verify tablet " + extent, e);
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
throw new RuntimeException(e);
}
if (locationToOpen == null) {
log.debug("Reporting tablet " + extent
+ " assignment failure: unable to verify Tablet Information");
synchronized (openingTablets) {
openingTablets.remove(extent);
openingTablets.notifyAll();
}
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
return;
}
Tablet tablet = null;
boolean successful = false;
try {
acquireRecoveryMemory(extent);
TabletResourceManager trm =
resourceManager.createTabletResourceManager(extent, getTableConfiguration(extent));
TabletData data;
if (extent.isRootTablet()) {
data = new TabletData(fs, ZooReaderWriter.getInstance(), getTableConfiguration(extent));
} else {
data = new TabletData(extent, fs, tabletsKeyValues.entrySet().iterator());
}
tablet = new Tablet(TabletServer.this, extent, trm, data);
// If a minor compaction starts after a tablet opens, this indicates a log recovery
// occurred. This recovered data must be minor compacted.
// There are three reasons to wait for this minor compaction to finish before placing the
// tablet in online tablets.
//
// 1) The log recovery code does not handle data written to the tablet on multiple tablet
// servers.
// 2) The log recovery code does not block if memory is full. Therefore recovering lots of
// tablets that use a lot of memory could run out of memory.
// 3) The minor compaction finish event did not make it to the logs (the file will be in
// metadata, preventing replay of compacted data)... but do not
// want a majc to wipe the file out from metadata and then have another process failure...
// this could cause duplicate data to replay.
if (tablet.getNumEntriesInMemory() > 0
&& !tablet.minorCompactNow(MinorCompactionReason.RECOVERY)) {
throw new RuntimeException("Minor compaction after recovery fails for " + extent);
}
Assignment assignment = new Assignment(extent, getTabletSession());
TabletStateStore.setLocation(TabletServer.this, assignment);
synchronized (openingTablets) {
synchronized (onlineTablets) {
openingTablets.remove(extent);
onlineTablets.put(extent, tablet);
openingTablets.notifyAll();
recentlyUnloadedCache.remove(tablet.getExtent());
}
}
tablet = null; // release this reference
successful = true;
} catch (Throwable e) {
log.warn("exception trying to assign tablet {} {}", extent, locationToOpen, e);
if (e.getMessage() != null) {
log.warn("{}", e.getMessage());
}
String tableId = extent.getTableId();
ProblemReports.getInstance(TabletServer.this).report(new ProblemReport(tableId, TABLET_LOAD,
extent.getUUID().toString(), getClientAddressString(), e));
} finally {
releaseRecoveryMemory(extent);
}
if (!successful) {
synchronized (unopenedTablets) {
synchronized (openingTablets) {
openingTablets.remove(extent);
unopenedTablets.add(extent);
openingTablets.notifyAll();
}
}
log.warn("failed to open tablet " + extent + " reporting failure to master");
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
long reschedule = Math.min((1l << Math.min(32, retryAttempt)) * 1000, 10 * 60 * 1000l);
log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
SimpleTimer.getInstance(getConfiguration()).schedule(new TimerTask() {
@Override
public void run() {
log.info("adding tablet " + extent + " back to the assignment pool (retry "
+ retryAttempt + ")");
AssignmentHandler handler = new AssignmentHandler(extent, retryAttempt + 1);
if (extent.isMeta()) {
if (extent.isRootTablet()) {
new Daemon(new LoggingRunnable(log, handler), "Root tablet assignment retry")
.start();
} else {
resourceManager.addMetaDataAssignment(extent, log, handler);
}
} else {
resourceManager.addAssignment(extent, log, handler);
}
}
}, reschedule);
} else {
enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOADED, extent));
}
}
}
private void acquireRecoveryMemory(KeyExtent extent) throws InterruptedException {
if (!extent.isMeta()) {
recoveryLock.lock();
}
}
private void releaseRecoveryMemory(KeyExtent extent) {
if (!extent.isMeta()) {
recoveryLock.unlock();
}
}
private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint,
TProcessor processor, String threadName) throws UnknownHostException {
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null
? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(this, address, portHint, processor,
this.getClass().getSimpleName(), threadName, Property.TSERV_PORTSEARCH,
Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, maxMessageSizeProperty);
this.server = sp.server;
return sp.address;
}
private HostAndPort getMasterAddress() {
try {
List<String> locations = getInstance().getMasterLocations();
if (locations.size() == 0)
return null;
return HostAndPort.fromString(locations.get(0));
} catch (Exception e) {
log.warn("Failed to obtain master host " + e);
}
return null;
}
// Connect to the master for posting asynchronous results
private MasterClientService.Client masterConnection(HostAndPort address) {
try {
if (address == null) {
return null;
}
MasterClientService.Client client =
ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, this);
// log.info("Listener API to master has been opened");
return client;
} catch (Exception e) {
log.warn("Issue with masterConnection (" + address + ") " + e, e);
}
return null;
}
private void returnMasterConnection(MasterClientService.Client client) {
ThriftUtil.returnClient(client);
}
private HostAndPort startTabletClientService() throws UnknownHostException {
// start listening for client connection last
clientHandler = new ThriftClientHandler();
Iface rpcProxy = RpcWrapper.service(clientHandler, new Processor<Iface>(clientHandler));
final Processor<Iface> processor;
if (ThriftServerType.SASL == getThriftServerType()) {
Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class,
getConfiguration());
processor = new Processor<>(tcredProxy);
} else {
processor = new Processor<>(rpcProxy);
}
HostAndPort address = startServer(getServerConfigurationFactory().getConfiguration(),
clientAddress.getHost(), Property.TSERV_CLIENTPORT, processor, "Thrift Client Server");
log.info("address = " + address);
return address;
}
private HostAndPort startReplicationService() throws UnknownHostException {
final ReplicationServicerHandler handler = new ReplicationServicerHandler(this);
ReplicationServicer.Iface rpcProxy = RpcWrapper.service(handler,
new ReplicationServicer.Processor<ReplicationServicer.Iface>(handler));
ReplicationServicer.Iface repl =
TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(), getConfiguration());
ReplicationServicer.Processor<ReplicationServicer.Iface> processor =
new ReplicationServicer.Processor<>(repl);
AccumuloConfiguration conf = getServerConfigurationFactory().getConfiguration();
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null
? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
ServerAddress sp = TServerUtils.startServer(this, clientAddress.getHost(),
Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler",
"Replication Servicer", null, Property.REPLICATION_MIN_THREADS,
Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
this.replServer = sp.server;
log.info("Started replication service on " + sp.address);
try {
// The replication service is unique to the thrift service for a tserver, not just a host.
// Advertise the host and port for replication service given the host and port for the
// tserver.
ZooReaderWriter.getInstance().putPersistentData(
ZooUtil.getRoot(getInstance()) + ReplicationConstants.ZOO_TSERVERS + "/"
+ clientAddress.toString(),
sp.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
} catch (Exception e) {
log.error("Could not advertise replication service port", e);
throw new RuntimeException(e);
}
return sp.address;
}
public ZooLock getLock() {
return tabletServerLock;
}
private void announceExistence() {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
try {
String zPath =
ZooUtil.getRoot(getInstance()) + Constants.ZTSERVERS + "/" + getClientAddressString();
try {
zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP);
} catch (KeeperException e) {
if (KeeperException.Code.NOAUTH == e.code()) {
log.error("Failed to write to ZooKeeper. Ensure that"
+ " accumulo-site.xml, specifically instance.secret, is consistent.");
}
throw e;
}
tabletServerLock = new ZooLock(zPath);
LockWatcher lw = new LockWatcher() {
@Override
public void lostLock(final LockLossReason reason) {
Halt.halt(serverStopRequested ? 0 : 1, new Runnable() {
@Override
public void run() {
if (!serverStopRequested)
log.error("Lost tablet server lock (reason = " + reason + "), exiting.");
gcLogger.logGCInfo(getConfiguration());
}
});
}
@Override
public void unableToMonitorLockNode(final Throwable e) {
Halt.halt(1, new Runnable() {
@Override
public void run() {
log.error("Lost ability to monitor tablet server lock, exiting.", e);
}
});
}
};
byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT)
.toString().getBytes(UTF_8);
for (int i = 0; i < 120 / 5; i++) {
zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP);
if (tabletServerLock.tryLock(lw, lockContent)) {
log.debug("Obtained tablet server lock " + tabletServerLock.getLockPath());
lockID = tabletServerLock.getLockID()
.serialize(ZooUtil.getRoot(getInstance()) + Constants.ZTSERVERS + "/");
return;
}
log.info("Waiting for tablet server lock");
sleepUninterruptibly(5, TimeUnit.SECONDS);
}
String msg = "Too many retries, exiting.";
log.info(msg);
throw new RuntimeException(msg);
} catch (Exception e) {
log.info("Could not obtain tablet server lock, exiting.", e);
throw new RuntimeException(e);
}
}
// main loop listens for client requests
@Override
public void run() {
SecurityUtil.serverLogin(SiteConfiguration.getInstance());
// To make things easier on users/devs, and to avoid creating an upgrade path to 1.7
// We can just make the zookeeper paths before we try to use.
try {
ZooKeeperInitialization.ensureZooKeeperInitialized(ZooReaderWriter.getInstance(),
ZooUtil.getRoot(getInstance()));
} catch (KeeperException | InterruptedException e) {
log.error("Could not ensure that ZooKeeper is properly initialized", e);
throw new RuntimeException(e);
}
Metrics tserverMetrics = metricsFactory.createTabletServerMetrics(this);
// Register MBeans
try {
tserverMetrics.register();
mincMetrics.register();
scanMetrics.register();
updateMetrics.register();
} catch (Exception e) {
log.error("Error registering with JMX", e);
}
if (null != authKeyWatcher) {
log.info("Seeding ZooKeeper watcher for authentication keys");
try {
authKeyWatcher.updateAuthKeys();
} catch (KeeperException | InterruptedException e) {
// TODO Does there need to be a better check? What are the error conditions that we'd fall
// out here? AUTH_FAILURE?
// If we get the error, do we just put it on a timer and retry the exists(String, Watcher)
// call?
log.error("Failed to perform initial check for authentication tokens in"
+ " ZooKeeper. Delegation token authentication will be unavailable.", e);
}
}
try {
clientAddress = startTabletClientService();
} catch (UnknownHostException e1) {
throw new RuntimeException("Failed to start the tablet client service", e1);
}
announceExistence();
try {
walMarker.initWalMarker(getTabletSession());
} catch (Exception e) {
log.error("Unable to create WAL marker node in zookeeper", e);
throw new RuntimeException(e);
}
ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(
getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
bulkFailedCopyQ = new DistributedWorkQueue(
ZooUtil.getRoot(getInstance()) + Constants.ZBULK_FAILED_COPYQ, getConfiguration());
try {
bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
} catch (Exception e1) {
throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
}
try {
logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
} catch (Exception ex) {
log.error("Error setting watches for recoveries");
throw new RuntimeException(ex);
}
// Start the thrift service listening for incoming replication requests
try {
replicationAddress = startReplicationService();
} catch (UnknownHostException e) {
throw new RuntimeException("Failed to start replication service", e);
}
// Start the pool to handle outgoing replications
final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(
getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
replWorker.setExecutor(replicationThreadPool);
replWorker.run();
// Check the configuration value for the size of the pool and, if changed, resize the pool,
// every 5 seconds);
final AccumuloConfiguration aconf = getConfiguration();
Runnable replicationWorkThreadPoolResizer = new Runnable() {
@Override
public void run() {
int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS);
if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) {
log.info("Resizing thread pool for sending replication work from "
+ replicationThreadPool.getMaximumPoolSize() + " to " + maxPoolSize);
replicationThreadPool.setMaximumPoolSize(maxPoolSize);
}
}
};
SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000);
final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
SimpleTimer.getInstance(aconf).schedule(new BulkImportCacheCleaner(this),
CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS);
HostAndPort masterHost;
while (!serverStopRequested) {
// send all of the pending messages
try {
MasterMessage mm = null;
MasterClientService.Client iface = null;
try {
// wait until a message is ready to send, or a sever stop
// was requested
while (mm == null && !serverStopRequested) {
mm = masterMessages.poll(1000, TimeUnit.MILLISECONDS);
}
// have a message to send to the master, so grab a
// connection
masterHost = getMasterAddress();
iface = masterConnection(masterHost);
TServiceClient client = iface;
// if while loop does not execute at all and mm != null,
// then finally block should place mm back on queue
while (!serverStopRequested && mm != null && client != null
&& client.getOutputProtocol() != null
&& client.getOutputProtocol().getTransport() != null
&& client.getOutputProtocol().getTransport().isOpen()) {
try {
mm.send(rpcCreds(), getClientAddressString(), iface);
mm = null;
} catch (TException ex) {
log.warn("Error sending message: queuing message again");
masterMessages.putFirst(mm);
mm = null;
throw ex;
}
// if any messages are immediately available grab em and
// send them
mm = masterMessages.poll();
}
} finally {
if (mm != null) {
masterMessages.putFirst(mm);
}
returnMasterConnection(iface);
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
log.info("Interrupt Exception received, shutting down");
serverStopRequested = true;
} catch (Exception e) {
// may have lost connection with master
// loop back to the beginning and wait for a new one
// this way we survive master failures
log.error(getClientAddressString() + ": TServerInfo: Exception. Master down?", e);
}
}
// wait for shutdown
// if the main thread exits oldServer the master listener, the JVM will
// kill the other threads and finalize objects. We want the shutdown that is
// running in the master listener thread to complete oldServer this happens.
// consider making other threads daemon threads so that objects don't
// get prematurely finalized
synchronized (this) {
while (shutdownComplete == false) {
try {
this.wait(1000);
} catch (InterruptedException e) {
log.error(e.toString());
}
}
}
log.debug("Stopping Replication Server");
TServerUtils.stopTServer(this.replServer);
log.debug("Stopping Thrift Servers");
TServerUtils.stopTServer(server);
try {
log.debug("Closing filesystem");
fs.close();
} catch (IOException e) {
log.warn("Failed to close filesystem : {}", e.getMessage(), e);
}
gcLogger.logGCInfo(getConfiguration());
log.info("TServerInfo: stop requested. exiting ... ");
try {
tabletServerLock.unlock();
} catch (Exception e) {
log.warn("Failed to release tablet server lock", e);
}
}
private static Pair<Text,KeyExtent> verifyRootTablet(KeyExtent extent, TServerInstance instance)
throws DistributedStoreException, AccumuloException {
ZooTabletStateStore store = new ZooTabletStateStore();
if (!store.iterator().hasNext()) {
throw new AccumuloException("Illegal state: location is not set in zookeeper");
}
TabletLocationState next = store.iterator().next();
if (!instance.equals(next.future)) {
throw new AccumuloException("Future location is not to this server for the root tablet");
}
if (next.current != null) {
throw new AccumuloException("Root tablet already has a location set");
}
try {
return new Pair<>(new Text(MetadataTableUtil.getRootTabletDir()), null);
} catch (IOException e) {
throw new AccumuloException(e);
}
}
public static Pair<Text,KeyExtent> verifyTabletInformation(AccumuloServerContext context,
KeyExtent extent, TServerInstance instance, SortedMap<Key,Value> tabletsKeyValues,
String clientAddress, ZooLock lock)
throws AccumuloSecurityException, DistributedStoreException, AccumuloException {
log.debug("verifying extent " + extent);
if (extent.isRootTablet()) {
return verifyRootTablet(extent, instance);
}
String tableToVerify = MetadataTable.ID;
if (extent.isMeta())
tableToVerify = RootTable.ID;
List<ColumnFQ> columnsToFetch =
Arrays.asList(new ColumnFQ[] {TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN,
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN,
TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN,
TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN,
TabletsSection.ServerColumnFamily.TIME_COLUMN});
TreeMap<Key,Value> tkv = new TreeMap<>();
try (ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY)) {
scanner.setRange(extent.toMetadataRange());
for (Entry<Key,Value> entry : scanner)
tkv.put(entry.getKey(), entry.getValue());
}
// only populate map after success
if (tabletsKeyValues == null) {
tabletsKeyValues = tkv;
} else {
tabletsKeyValues.clear();
tabletsKeyValues.putAll(tkv);
}
Text metadataEntry = extent.getMetadataEntry();
Value dir = checkTabletMetadata(extent, instance, tabletsKeyValues, metadataEntry);
if (dir == null)
return null;
Value oldPrevEndRow = null;
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
if (TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.hasColumns(entry.getKey())) {
oldPrevEndRow = entry.getValue();
}
}
if (oldPrevEndRow != null) {
SortedMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries;
tabletEntries = MetadataTableUtil.getTabletEntries(tabletsKeyValues, columnsToFetch);
KeyExtent fke;
try {
fke = MasterMetadataUtil.fixSplit(context, metadataEntry, tabletEntries.get(metadataEntry),
instance, lock);
} catch (IOException e) {
log.error("Error fixing split " + metadataEntry);
throw new AccumuloException(e.toString());
}
if (!fke.equals(extent)) {
return new Pair<>(null, fke);
}
// reread and reverify metadata entries now that metadata entries were fixed
tabletsKeyValues.clear();
return verifyTabletInformation(context, fke, instance, tabletsKeyValues, clientAddress, lock);
}
return new Pair<>(new Text(dir.get()), null);
}
static Value checkTabletMetadata(KeyExtent extent, TServerInstance instance,
SortedMap<Key,Value> tabletsKeyValues, Text metadataEntry) throws AccumuloException {
TServerInstance future = null;
Value prevEndRow = null;
Value dir = null;
Value time = null;
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
Key key = entry.getKey();
if (!metadataEntry.equals(key.getRow())) {
log.info("Unexpected row in tablet metadata " + metadataEntry + " " + key.getRow());
return null;
}
Text cf = key.getColumnFamily();
if (cf.equals(TabletsSection.FutureLocationColumnFamily.NAME)) {
if (future != null) {
throw new AccumuloException("Tablet has multiple future locations " + extent);
}
future = new TServerInstance(entry.getValue(), key.getColumnQualifier());
} else if (cf.equals(TabletsSection.CurrentLocationColumnFamily.NAME)) {
log.info("Tablet seems to be already assigned to "
+ new TServerInstance(entry.getValue(), key.getColumnQualifier()));
return null;
} else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
prevEndRow = entry.getValue();
} else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
dir = entry.getValue();
} else if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) {
time = entry.getValue();
}
}
if (prevEndRow == null) {
throw new AccumuloException("Metadata entry does not have prev row (" + metadataEntry + ")");
} else {
KeyExtent ke2 = new KeyExtent(metadataEntry, prevEndRow);
if (!extent.equals(ke2)) {
log.info("Tablet prev end row mismatch " + extent + " " + ke2.getPrevEndRow());
return null;
}
}
if (dir == null) {
throw new AccumuloException("Metadata entry does not have directory (" + metadataEntry + ")");
}
if (time == null && !extent.equals(RootTable.OLD_EXTENT)) {
throw new AccumuloException("Metadata entry does not have time (" + metadataEntry + ")");
}
if (future == null) {
log.info("The master has not assigned " + extent + " to " + instance);
return null;
}
if (!instance.equals(future)) {
log.info("Table " + extent + " has been assigned to " + future + " which is not " + instance);
return null;
}
return dir;
}
public String getClientAddressString() {
if (clientAddress == null)
return null;
return clientAddress.getHost() + ":" + clientAddress.getPort();
}
public String getReplicationAddressSTring() {
if (null == replicationAddress) {
return null;
}
return replicationAddress.getHost() + ":" + replicationAddress.getPort();
}
public TServerInstance getTabletSession() {
String address = getClientAddressString();
if (address == null)
return null;
try {
return new TServerInstance(address, tabletServerLock.getSessionId());
} catch (Exception ex) {
log.warn("Unable to read session from tablet server lock" + ex);
return null;
}
}
public void config(String hostname) {
log.info("Tablet server starting on " + hostname);
majorCompactorThread =
new Daemon(new LoggingRunnable(log, new MajorCompactor(getConfiguration())));
majorCompactorThread.setName("Split/MajC initiator");
majorCompactorThread.start();
clientAddress = HostAndPort.fromParts(hostname, 0);
try {
AccumuloVFSClassLoader.getContextManager()
.setContextConfig(new ContextManager.DefaultContextsConfig() {
@Override
public Map<String,String> getVfsContextClasspathProperties() {
return getConfiguration()
.getAllPropertiesWithPrefix(Property.VFS_CONTEXT_CLASSPATH_PROPERTY);
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
// A task that cleans up unused classloader contexts
Runnable contextCleaner = new Runnable() {
@Override
public void run() {
Set<String> contextProperties = getServerConfigurationFactory().getConfiguration()
.getAllPropertiesWithPrefix(Property.VFS_CONTEXT_CLASSPATH_PROPERTY).keySet();
Set<String> configuredContexts = new HashSet<>();
for (String prop : contextProperties) {
configuredContexts
.add(prop.substring(Property.VFS_CONTEXT_CLASSPATH_PROPERTY.name().length()));
}
try {
AccumuloVFSClassLoader.getContextManager().removeUnusedContexts(configuredContexts);
} catch (IOException e) {
log.warn("{}", e.getMessage(), e);
}
}
};
AccumuloConfiguration aconf = getConfiguration();
SimpleTimer.getInstance(aconf).schedule(contextCleaner, 60000, 60000);
FileSystemMonitor.start(aconf, Property.TSERV_MONITOR_FS);
Runnable gcDebugTask = new Runnable() {
@Override
public void run() {
gcLogger.logGCInfo(getConfiguration());
}
};
SimpleTimer.getInstance(aconf).schedule(gcDebugTask, 0, TIME_BETWEEN_GC_CHECKS);
Runnable constraintTask = new Runnable() {
@Override
public void run() {
ArrayList<Tablet> tablets;
synchronized (onlineTablets) {
tablets = new ArrayList<>(onlineTablets.values());
}
for (Tablet tablet : tablets) {
tablet.checkConstraints();
}
}
};
SimpleTimer.getInstance(aconf).schedule(constraintTask, 0, 1000);
}
public TabletServerStatus getStats(Map<String,MapCounter<ScanRunState>> scanCounts) {
long start = System.currentTimeMillis();
TabletServerStatus result = new TabletServerStatus();
Map<KeyExtent,Tablet> onlineTabletsCopy;
synchronized (this.onlineTablets) {
onlineTabletsCopy = new HashMap<>(this.onlineTablets);
}
Map<String,TableInfo> tables = new HashMap<>();
for (Entry<KeyExtent,Tablet> entry : onlineTabletsCopy.entrySet()) {
String tableId = entry.getKey().getTableId();
TableInfo table = tables.get(tableId);
if (table == null) {
table = new TableInfo();
table.minors = new Compacting();
table.majors = new Compacting();
tables.put(tableId, table);
}
Tablet tablet = entry.getValue();
long recs = tablet.getNumEntries();
table.tablets++;
table.onlineTablets++;
table.recs += recs;
table.queryRate += tablet.queryRate();
table.queryByteRate += tablet.queryByteRate();
table.ingestRate += tablet.ingestRate();
table.ingestByteRate += tablet.ingestByteRate();
table.scanRate += tablet.scanRate();
long recsInMemory = tablet.getNumEntriesInMemory();
table.recsInMemory += recsInMemory;
if (tablet.isMinorCompactionRunning())
table.minors.running++;
if (tablet.isMinorCompactionQueued())
table.minors.queued++;
if (tablet.isMajorCompactionRunning())
table.majors.running++;
if (tablet.isMajorCompactionQueued())
table.majors.queued++;
}
for (Entry<String,MapCounter<ScanRunState>> entry : scanCounts.entrySet()) {
TableInfo table = tables.get(entry.getKey());
if (table == null) {
table = new TableInfo();
tables.put(entry.getKey(), table);
}
if (table.scans == null)
table.scans = new Compacting();
table.scans.queued += entry.getValue().get(ScanRunState.QUEUED);
table.scans.running += entry.getValue().get(ScanRunState.RUNNING);
}
ArrayList<KeyExtent> offlineTabletsCopy = new ArrayList<>();
synchronized (this.unopenedTablets) {
synchronized (this.openingTablets) {
offlineTabletsCopy.addAll(this.unopenedTablets);
offlineTabletsCopy.addAll(this.openingTablets);
}
}
for (KeyExtent extent : offlineTabletsCopy) {
String tableId = extent.getTableId();
TableInfo table = tables.get(tableId);
if (table == null) {
table = new TableInfo();
tables.put(tableId, table);
}
table.tablets++;
}
result.lastContact = RelativeTime.currentTimeMillis();
result.tableMap = tables;
result.osLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
result.name = getClientAddressString();
result.holdTime = resourceManager.holdTime();
result.lookups = seekCount.get();
result.indexCacheHits = resourceManager.getIndexCache().getStats().getHitCount();
result.indexCacheRequest = resourceManager.getIndexCache().getStats().getRequestCount();
result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount();
result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount();
result.logSorts = logSorter.getLogSorts();
result.flushs = flushCounter.get();
result.syncs = syncCounter.get();
result.bulkImports = new ArrayList<>();
result.bulkImports.addAll(clientHandler.getBulkLoadStatus());
result.bulkImports.addAll(bulkImportStatus.getBulkLoadStatus());
result.responseTime = System.currentTimeMillis() - start;
return result;
}
public static void main(String[] args) throws IOException {
try {
final String app = "tserver";
Accumulo.setupLogging(app);
SecurityUtil.serverLogin(SiteConfiguration.getInstance());
ServerOpts opts = new ServerOpts();
opts.parseArgs(app, args);
String hostname = opts.getAddress();
ServerConfigurationFactory conf =
new ServerConfigurationFactory(HdfsZooInstance.getInstance());
VolumeManager fs = VolumeManagerImpl.get();
MetricsSystemHelper.configure(TabletServer.class.getSimpleName());
Accumulo.init(fs, conf, app);
final TabletServer server = new TabletServer(conf, fs);
server.config(hostname);
DistributedTrace.enable(hostname, app, conf.getConfiguration());
if (UserGroupInformation.isSecurityEnabled()) {
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
loginUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() {
server.run();
return null;
}
});
} else {
server.run();
}
} catch (Exception ex) {
log.error("Uncaught exception in TabletServer.main, exiting", ex);
System.exit(1);
} finally {
DistributedTrace.disable();
}
}
private Durability getMincEventDurability(KeyExtent extent) {
TableConfiguration conf;
if (extent.isMeta()) {
conf = confFactory.getTableConfiguration(RootTable.ID);
} else {
conf = confFactory.getTableConfiguration(MetadataTable.ID);
}
Durability durability = DurabilityImpl.fromString(conf.get(Property.TABLE_DURABILITY));
return durability;
}
public void minorCompactionFinished(CommitSession tablet, String newDatafile, long walogSeq)
throws IOException {
Durability durability = getMincEventDurability(tablet.getExtent());
totalMinorCompactions.incrementAndGet();
logger.minorCompactionFinished(tablet, newDatafile, walogSeq, durability);
markUnusedWALs();
}
public void minorCompactionStarted(CommitSession tablet, long lastUpdateSequence,
String newMapfileLocation) throws IOException {
Durability durability = getMincEventDurability(tablet.getExtent());
logger.minorCompactionStarted(tablet, lastUpdateSequence, newMapfileLocation, durability);
}
public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf,
List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver)
throws IOException {
List<Path> recoveryLogs = new ArrayList<>();
List<LogEntry> sorted = new ArrayList<>(logEntries);
Collections.sort(sorted, new Comparator<LogEntry>() {
@Override
public int compare(LogEntry e1, LogEntry e2) {
return (int) (e1.timestamp - e2.timestamp);
}
});
for (LogEntry entry : sorted) {
Path recovery = null;
Path finished =
RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, entry.filename));
finished = SortedLogState.getFinishedMarkerPath(finished);
TabletServer.log.debug("Looking for " + finished);
if (fs.exists(finished)) {
recovery = finished.getParent();
}
if (recovery == null)
throw new IOException(
"Unable to find recovery files for extent " + extent + " logEntry: " + entry);
recoveryLogs.add(recovery);
}
logger.recover(fs, extent, tconf, recoveryLogs, tabletFiles, mutationReceiver);
}
public int createLogId() {
int logId = logIdGenerator.incrementAndGet();
if (logId < 0) {
throw new IllegalStateException("Log Id rolled");
}
return logId;
}
public TableConfiguration getTableConfiguration(KeyExtent extent) {
return confFactory.getTableConfiguration(extent.getTableId());
}
public DfsLogger.ServerResources getServerConfig() {
return new DfsLogger.ServerResources() {
@Override
public VolumeManager getFileSystem() {
return fs;
}
@Override
public AccumuloConfiguration getConfiguration() {
return TabletServer.this.getConfiguration();
}
};
}
public Collection<Tablet> getOnlineTablets() {
synchronized (onlineTablets) {
return new ArrayList<>(onlineTablets.values());
}
}
public VolumeManager getFileSystem() {
return fs;
}
public int getOpeningCount() {
return openingTablets.size();
}
public int getUnopenedCount() {
return unopenedTablets.size();
}
public long getTotalMinorCompactions() {
return totalMinorCompactions.get();
}
public double getHoldTimeMillis() {
return resourceManager.holdTime();
}
public SecurityOperation getSecurityOperation() {
return security;
}
// avoid unnecessary redundant markings to meta
final ConcurrentHashMap<DfsLogger,EnumSet<TabletLevel>> metadataTableLogs =
new ConcurrentHashMap<>();
final Object levelLocks[] = new Object[TabletLevel.values().length];
{
for (int i = 0; i < levelLocks.length; i++) {
levelLocks[i] = new Object();
}
}
// This is a set of WALs that are closed but may still be referenced by tablets. A LinkedHashSet
// is used because its very import to know the order in which WALs were closed when deciding if a
// WAL is eligible for removal. Maintaining the order that logs were used in is currently a simple
// task because there is only one active log at a time.
LinkedHashSet<DfsLogger> closedLogs = new LinkedHashSet<>();
@VisibleForTesting
interface ReferencedRemover {
void removeInUse(Set<DfsLogger> candidates);
}
/**
* For a closed WAL to be eligible for removal it must be unreferenced AND all closed WALs older
* than it must be unreferenced. This method finds WALs that meet those conditions. See Github
* issue #537.
*/
@VisibleForTesting
static Set<DfsLogger> findOldestUnreferencedWals(List<DfsLogger> closedLogs,
ReferencedRemover referencedRemover) {
LinkedHashSet<DfsLogger> unreferenced = new LinkedHashSet<>(closedLogs);
referencedRemover.removeInUse(unreferenced);
Iterator<DfsLogger> closedIter = closedLogs.iterator();
Iterator<DfsLogger> unrefIter = unreferenced.iterator();
Set<DfsLogger> eligible = new HashSet<>();
while (closedIter.hasNext() && unrefIter.hasNext()) {
DfsLogger closed = closedIter.next();
DfsLogger unref = unrefIter.next();
if (closed.equals(unref)) {
eligible.add(unref);
} else {
break;
}
}
return eligible;
}
@VisibleForTesting
static List<DfsLogger> copyClosedLogs(LinkedHashSet<DfsLogger> closedLogs) {
List<DfsLogger> closedCopy = new ArrayList<>(closedLogs.size());
for (DfsLogger dfsLogger : closedLogs) {
// very important this copy maintains same order ..
closedCopy.add(dfsLogger);
}
return Collections.unmodifiableList(closedCopy);
}
private void markUnusedWALs() {
List<DfsLogger> closedCopy;
synchronized (closedLogs) {
closedCopy = copyClosedLogs(closedLogs);
}
ReferencedRemover refRemover = new ReferencedRemover() {
@Override
public void removeInUse(Set<DfsLogger> candidates) {
for (Tablet tablet : getOnlineTablets()) {
tablet.removeInUseLogs(candidates);
if (candidates.isEmpty()) {
break;
}
}
}
};
Set<DfsLogger> eligible = findOldestUnreferencedWals(closedCopy, refRemover);
try {
TServerInstance session = this.getTabletSession();
for (DfsLogger candidate : eligible) {
log.info("Marking " + candidate.getPath() + " as unreferenced");
walMarker.walUnreferenced(session, candidate.getPath());
}
synchronized (closedLogs) {
closedLogs.removeAll(eligible);
}
} catch (WalMarkerException ex) {
log.info(ex.toString(), ex);
}
}
public void addNewLogMarker(DfsLogger copy) throws WalMarkerException {
log.info("Writing log marker for " + copy.getPath());
walMarker.addNewWalMarker(getTabletSession(), copy.getPath());
}
public void walogClosed(DfsLogger currentLog) throws WalMarkerException {
metadataTableLogs.remove(currentLog);
if (currentLog.getWrites() > 0) {
int clSize;
synchronized (closedLogs) {
closedLogs.add(currentLog);
clSize = closedLogs.size();
}
log.info("Marking " + currentLog.getPath() + " as closed. Total closed logs " + clSize);
walMarker.closeWal(getTabletSession(), currentLog.getPath());
} else {
log.info(
"Marking " + currentLog.getPath() + " as unreferenced (skipping closed writes == 0)");
walMarker.walUnreferenced(getTabletSession(), currentLog.getPath());
}
}
public void updateBulkImportState(List<String> files, BulkImportState state) {
bulkImportStatus.updateBulkImportStatus(files, state);
}
public void removeBulkImportState(List<String> files) {
bulkImportStatus.removeBulkImportStatus(files);
}
private static final String MAJC_READ_LIMITER_KEY = "tserv_majc_read";
private static final String MAJC_WRITE_LIMITER_KEY = "tserv_majc_write";
private final RateProvider rateProvider = new RateProvider() {
@Override
public long getDesiredRate() {
return getConfiguration().getMemoryInBytes(Property.TSERV_MAJC_THROUGHPUT);
}
};
/**
* Get the {@link RateLimiter} for reads during major compactions on this tserver. All writes
* performed during major compactions are throttled to conform to this RateLimiter.
*/
public final RateLimiter getMajorCompactionReadLimiter() {
return SharedRateLimiterFactory.getInstance().create(MAJC_READ_LIMITER_KEY, rateProvider);
}
/**
* Get the RateLimiter for writes during major compations on this tserver. All reads performed
* during major compactions are throttled to conform to this RateLimiter.
*/
public final RateLimiter getMajorCompactionWriteLimiter() {
return SharedRateLimiterFactory.getInstance().create(MAJC_WRITE_LIMITER_KEY, rateProvider);
}
}