| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.tserver; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; |
| |
| import java.io.IOException; |
| import java.lang.management.ManagementFactory; |
| import java.net.UnknownHostException; |
| import java.security.SecureRandom; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.SortedSet; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| import java.util.concurrent.BlockingDeque; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.LinkedBlockingDeque; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.accumulo.core.Constants; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.Durability; |
| import org.apache.accumulo.core.clientImpl.DurabilityImpl; |
| import org.apache.accumulo.core.clientImpl.TabletLocator; |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.core.data.TableId; |
| import org.apache.accumulo.core.dataImpl.KeyExtent; |
| import org.apache.accumulo.core.master.thrift.BulkImportState; |
| import org.apache.accumulo.core.master.thrift.Compacting; |
| import org.apache.accumulo.core.master.thrift.MasterClientService; |
| import org.apache.accumulo.core.master.thrift.TableInfo; |
| import org.apache.accumulo.core.master.thrift.TabletServerStatus; |
| import org.apache.accumulo.core.metadata.MetadataTable; |
| import org.apache.accumulo.core.metadata.RootTable; |
| import org.apache.accumulo.core.metadata.TServerInstance; |
| import org.apache.accumulo.core.metadata.schema.TabletMetadata; |
| import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; |
| import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; |
| import org.apache.accumulo.core.replication.ReplicationConstants; |
| import org.apache.accumulo.core.replication.thrift.ReplicationServicer; |
| import org.apache.accumulo.core.rpc.ThriftUtil; |
| import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; |
| import org.apache.accumulo.core.tabletserver.log.LogEntry; |
| import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface; |
| import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor; |
| import org.apache.accumulo.core.trace.TraceUtil; |
| import org.apache.accumulo.core.util.ComparablePair; |
| import org.apache.accumulo.core.util.Halt; |
| import org.apache.accumulo.core.util.HostAndPort; |
| import org.apache.accumulo.core.util.MapCounter; |
| import org.apache.accumulo.core.util.Pair; |
| import org.apache.accumulo.core.util.ServerServices; |
| import org.apache.accumulo.core.util.ServerServices.Service; |
| import org.apache.accumulo.core.util.threads.ThreadPools; |
| import org.apache.accumulo.core.util.threads.Threads; |
| import org.apache.accumulo.fate.util.Retry; |
| import org.apache.accumulo.fate.util.Retry.RetryFactory; |
| import org.apache.accumulo.fate.util.UtilWaitThread; |
| import org.apache.accumulo.fate.zookeeper.ZooCache; |
| import org.apache.accumulo.fate.zookeeper.ZooLock; |
| import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason; |
| import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher; |
| import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; |
| import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; |
| import org.apache.accumulo.server.AbstractServer; |
| import org.apache.accumulo.server.GarbageCollectionLogger; |
| import org.apache.accumulo.server.ServerConstants; |
| import org.apache.accumulo.server.ServerContext; |
| import org.apache.accumulo.server.ServerOpts; |
| import org.apache.accumulo.server.TabletLevel; |
| import org.apache.accumulo.server.conf.TableConfiguration; |
| import org.apache.accumulo.server.fs.VolumeChooserEnvironmentImpl; |
| import org.apache.accumulo.server.fs.VolumeManager; |
| import org.apache.accumulo.server.log.SortedLogState; |
| import org.apache.accumulo.server.log.WalStateManager; |
| import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; |
| import org.apache.accumulo.server.manager.recovery.RecoveryPath; |
| import org.apache.accumulo.server.replication.ZooKeeperInitialization; |
| import org.apache.accumulo.server.rpc.ServerAddress; |
| import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper; |
| import org.apache.accumulo.server.rpc.TServerUtils; |
| import org.apache.accumulo.server.rpc.ThriftServerType; |
| import org.apache.accumulo.server.security.AuditedSecurityOperation; |
| import org.apache.accumulo.server.security.SecurityOperation; |
| import org.apache.accumulo.server.security.SecurityUtil; |
| import org.apache.accumulo.server.security.delegation.AuthenticationTokenSecretManager; |
| import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyWatcher; |
| import org.apache.accumulo.server.util.FileSystemMonitor; |
| import org.apache.accumulo.server.util.ServerBulkImportStatus; |
| import org.apache.accumulo.server.util.time.RelativeTime; |
| import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; |
| import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; |
| import org.apache.accumulo.tserver.TabletStatsKeeper.Operation; |
| import org.apache.accumulo.tserver.compactions.Compactable; |
| import org.apache.accumulo.tserver.compactions.CompactionManager; |
| import org.apache.accumulo.tserver.log.DfsLogger; |
| import org.apache.accumulo.tserver.log.LogSorter; |
| import org.apache.accumulo.tserver.log.MutationReceiver; |
| import org.apache.accumulo.tserver.log.TabletServerLogger; |
| import org.apache.accumulo.tserver.mastermessage.MasterMessage; |
| import org.apache.accumulo.tserver.mastermessage.SplitReportMessage; |
| import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics; |
| import org.apache.accumulo.tserver.metrics.TabletServerMetrics; |
| import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; |
| import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; |
| import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics; |
| import org.apache.accumulo.tserver.replication.ReplicationServicerHandler; |
| import org.apache.accumulo.tserver.replication.ReplicationWorker; |
| import org.apache.accumulo.tserver.scan.ScanRunState; |
| import org.apache.accumulo.tserver.session.Session; |
| import org.apache.accumulo.tserver.session.SessionManager; |
| import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner; |
| import org.apache.accumulo.tserver.tablet.CommitSession; |
| import org.apache.accumulo.tserver.tablet.CompactionWatcher; |
| import org.apache.accumulo.tserver.tablet.Tablet; |
| import org.apache.accumulo.tserver.tablet.TabletData; |
| import org.apache.commons.collections4.map.LRUMap; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.metrics2.MetricsSystem; |
| import org.apache.thrift.TException; |
| import org.apache.thrift.TProcessor; |
| import org.apache.thrift.TServiceClient; |
| import org.apache.thrift.server.TServer; |
| import org.apache.zookeeper.KeeperException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Iterators; |
| |
| public class TabletServer extends AbstractServer { |
| |
// SLF4J logger shared by everything in this class.
| private static final Logger log = LoggerFactory.getLogger(TabletServer.class); |
// NOTE(review): not referenced in the visible part of this file; presumably milliseconds -- confirm.
| private static final long TIME_BETWEEN_GC_CHECKS = 5000; |
// One hour in milliseconds; jitter() scales this by a random factor in [1.0, 1.1) to schedule
// the TabletLocator cache clearing task.
| private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000; |
| |
// Used by the lock watcher in announceExistence() to log GC info just before halting.
| final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); |
// ZooCache built in the constructor from the context's ZooReaderWriter.
| final ZooCache masterLockCache; |
| |
// Write-ahead-log facade; constructed in the constructor with the WAL size/age limits, the
// sync/flush counters and the WAL retry factories.
| final TabletServerLogger logger; |
| |
// Metrics objects; created in the constructor and registered with the metrics system in run().
| final TabletServerUpdateMetrics updateMetrics; |
| final TabletServerScanMetrics scanMetrics; |
| final TabletServerMinCMetrics mincMetrics; |
| final CompactionExecutorsMetrics ceMetrics; |
| |
| public TabletServerScanMetrics getScanMetrics() { |
| return scanMetrics; |
| } |
| |
| public TabletServerMinCMetrics getMinCMetrics() { |
| return mincMetrics; |
| } |
| |
// Created in the constructor; run() asks it to start watching for recovery logs.
| private final LogSorter logSorter; |
// Assigned a ReplicationWorker in the constructor.
| private ReplicationWorker replWorker = null; |
// Receives split timings and rolled-over tablet stats (see the splitTablet methods).
| final TabletStatsKeeper statsKeeper; |
// NOTE(review): not used in the visible part of this file; presumably hands out WAL ids -- confirm.
| private final AtomicInteger logIdGenerator = new AtomicInteger(); |
| |
// Both counters are handed to the TabletServerLogger constructor.
| private final AtomicLong flushCounter = new AtomicLong(0); |
| private final AtomicLong syncCounter = new AtomicLong(0); |
| |
// Tablets currently online; split() swaps a parent tablet for its two children.
| final OnlineTablets onlineTablets = new OnlineTablets(); |
// Synchronized sorted sets of extents.
// NOTE(review): names suggest "assigned but not yet opened" / "being opened" -- confirm.
| final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<>()); |
| final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<>()); |
// Synchronized LRU map capped at 1000 entries.
| final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap<>(1000)); |
| |
| final TabletServerResourceManager resourceManager; |
// Obtained from AuditedSecurityOperation.getInstance(context) in the constructor.
| private final SecurityOperation security; |
| |
// Messages queued by enqueueMasterMessage() and drained by the loop in run().
| private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<>(); |
| |
// Set in run() to the address returned by startTabletClientService().
| HostAndPort clientAddress; |
| |
// Set to true by requestStop(); read by the run() loop and by the lock watcher.
| private volatile boolean serverStopRequested = false; |
| private volatile boolean shutdownComplete = false; |
| |
// Created and acquired in announceExistence().
| private ZooLock tabletServerLock; |
| |
// Thrift server for client requests; assigned in startServer().
| private TServer server; |
// Thrift server for replication; assigned in startReplicationService(), null until then.
| private volatile TServer replServer; |
| |
// Created in run() on the ZBULK_FAILED_COPYQ ZooKeeper path.
| private DistributedWorkQueue bulkFailedCopyQ; |
| |
// Serialized lock id; assigned in announceExistence() once the lock is obtained.
| private String lockID; |
| |
| public static final AtomicLong seekCount = new AtomicLong(0); |
| |
| private final AtomicLong totalMinorCompactions = new AtomicLong(0); |
| |
// Null unless INSTANCE_RPC_SASL_ENABLED is true (see constructor).
| private final ZooAuthenticationKeyWatcher authKeyWatcher; |
// WAL state manager; run() calls initWalMarker on it after the server lock is obtained.
| private final WalStateManager walMarker; |
| |
| public static void main(String[] args) throws Exception { |
| try (TabletServer tserver = new TabletServer(new ServerOpts(), args)) { |
| tserver.runServer(); |
| } |
| } |
| |
/**
 * Builds the tablet server: initialises the final fields (session manager, log sorter, metrics,
 * WAL logger, resource manager, security, WAL state manager, ...), schedules the periodic
 * background tasks (busy-tablet logging, tablet rate updates, locator cache clearing) and
 * validates the WAL size against the HDFS minimum block size. No Thrift services are started
 * and no ZooKeeper lock is taken here; that happens in run().
 *
 * @param opts server options, forwarded to the superclass together with the name "tserver"
 * @param args command-line arguments, forwarded to the superclass
 * @throws RuntimeException if the HDFS minimum block size is non-zero and larger than the
 *         configured TSERV_WALOG_MAX_SIZE
 */
| TabletServer(ServerOpts opts, String[] args) { |
| super("tserver", opts, args); |
| ServerContext context = super.getContext(); |
| context.setupCrypto(); |
| this.masterLockCache = new ZooCache(context.getZooReaderWriter(), null); |
| final AccumuloConfiguration aconf = getConfiguration(); |
| log.info("Version " + Constants.VERSION); |
| log.info("Instance " + getInstanceID()); |
| this.sessionManager = new SessionManager(aconf); |
| this.logSorter = new LogSorter(context, aconf); |
| this.replWorker = new ReplicationWorker(context); |
| this.statsKeeper = new TabletStatsKeeper(); |
// Settings for the optional busy-tablet logging task scheduled below.
| final int numBusyTabletsToLog = aconf.getCount(Property.TSERV_LOG_BUSY_TABLETS_COUNT); |
| final long logBusyTabletsDelay = |
| aconf.getTimeInMillis(Property.TSERV_LOG_BUSY_TABLETS_INTERVAL); |
| |
| // check early whether the WAL directory supports sync. issue warning if |
| // it doesn't |
| checkWalCanSync(context); |
| |
| // This thread will calculate and log out the busiest tablets based on ingest count and |
| // query count every #{logBusiestTabletsDelay} |
| if (numBusyTabletsToLog > 0) { |
| ThreadPools.createGeneralScheduledExecutorService(aconf) |
| .scheduleWithFixedDelay(Threads.createNamedRunnable("BusyTabletLogger", new Runnable() { |
// The trackers are fields of the task, so the same tracker instances are reused on every run.
| private BusiestTracker ingestTracker = |
| BusiestTracker.newBusiestIngestTracker(numBusyTabletsToLog); |
| private BusiestTracker queryTracker = |
| BusiestTracker.newBusiestQueryTracker(numBusyTabletsToLog); |
| |
| @Override |
| public void run() { |
// Both trackers are fed the same snapshot of online tablets.
| Collection<Tablet> tablets = onlineTablets.snapshot().values(); |
| logBusyTablets(ingestTracker.computeBusiest(tablets), "ingest count"); |
| logBusyTablets(queryTracker.computeBusiest(tablets), "query count"); |
| } |
| |
// Logs one debug line per entry, numbered from 1 in list order.
| private void logBusyTablets(List<ComparablePair<Long,KeyExtent>> busyTablets, |
| String label) { |
| |
| int i = 1; |
| for (Pair<Long,KeyExtent> pair : busyTablets) { |
| log.debug("{} busiest tablet by {}: {} -- extent: {} ", i, label.toLowerCase(), |
| pair.getFirst(), pair.getSecond()); |
| i++; |
| } |
| } |
| }), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS); |
| } |
| |
// Every 5000 ms call updateRates(now) on each online tablet; a failure for one tablet is
// logged and does not prevent the remaining tablets from being updated.
| ThreadPools.createGeneralScheduledExecutorService(aconf) |
| .scheduleWithFixedDelay(Threads.createNamedRunnable("TabletRateUpdater", new Runnable() { |
| @Override |
| public void run() { |
| long now = System.currentTimeMillis(); |
| for (Tablet tablet : getOnlineTablets().values()) { |
| try { |
| tablet.updateRates(now); |
| } catch (Exception ex) { |
| log.error("Error updating rates for {}", tablet.getExtent(), ex); |
| } |
| } |
| } |
| }), 5000, 5000, TimeUnit.MILLISECONDS); |
| |
// Fail fast when the configured WAL maximum size is below the HDFS minimum block size
// (a min-block-size of 0, the default used here when unset, disables the check).
| final long walogMaxSize = aconf.getAsBytes(Property.TSERV_WALOG_MAX_SIZE); |
| final long walogMaxAge = aconf.getTimeInMillis(Property.TSERV_WALOG_MAX_AGE); |
| final long minBlockSize = |
| context.getHadoopConf().getLong("dfs.namenode.fs-limits.min-block-size", 0); |
| if (minBlockSize != 0 && minBlockSize > walogMaxSize) { |
| throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " |
| + walogMaxSize + " but hdfs minimum block size is " + minBlockSize |
| + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE |
| + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml."); |
| } |
| |
// Retry policy for WAL creation: bounded by the tolerated-failure count, with a growing wait
// capped at the configured maximum.
| final long toleratedWalCreationFailures = |
| aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES); |
| final long walFailureRetryIncrement = |
| aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT); |
| final long walFailureRetryMax = |
| aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION); |
| final RetryFactory walCreationRetryFactory = |
| Retry.builder().maxRetries(toleratedWalCreationFailures) |
| .retryAfter(walFailureRetryIncrement, TimeUnit.MILLISECONDS) |
| .incrementBy(walFailureRetryIncrement, TimeUnit.MILLISECONDS) |
| .maxWait(walFailureRetryMax, TimeUnit.MILLISECONDS).backOffFactor(1.5) |
| .logInterval(3, TimeUnit.MINUTES).createFactory(); |
| // Tolerate infinite failures for the write, however backing off the same as for creation |
| // failures. |
| final RetryFactory walWritingRetryFactory = Retry.builder().infiniteRetries() |
| .retryAfter(walFailureRetryIncrement, TimeUnit.MILLISECONDS) |
| .incrementBy(walFailureRetryIncrement, TimeUnit.MILLISECONDS) |
| .maxWait(walFailureRetryMax, TimeUnit.MILLISECONDS).backOffFactor(1.5) |
| .logInterval(3, TimeUnit.MINUTES).createFactory(); |
| |
| logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, |
| walCreationRetryFactory, walWritingRetryFactory, walogMaxAge); |
| this.resourceManager = new TabletServerResourceManager(context); |
| this.security = AuditedSecurityOperation.getInstance(context); |
| |
| updateMetrics = new TabletServerUpdateMetrics(); |
| scanMetrics = new TabletServerScanMetrics(); |
| mincMetrics = new TabletServerMinCMetrics(); |
| ceMetrics = new CompactionExecutorsMetrics(); |
// Periodically clear the TabletLocator caches. jitter() is called twice, so the initial delay
// and the period are two independent random values.
| ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay( |
| TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS); |
| walMarker = new WalStateManager(context); |
| |
| // Create the secret manager |
| context.setSecretManager(new AuthenticationTokenSecretManager(context.getInstanceID(), |
| aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME))); |
| if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { |
| log.info("SASL is enabled, creating ZooKeeper watcher for AuthenticationKeys"); |
| // Watcher to notice new AuthenticationKeys which enable delegation tokens |
| authKeyWatcher = |
| new ZooAuthenticationKeyWatcher(context.getSecretManager(), context.getZooReaderWriter(), |
| context.getZooKeeperRoot() + Constants.ZDELEGATION_TOKEN_KEYS); |
| } else { |
| authKeyWatcher = null; |
| } |
// NOTE(review): config() is defined outside the visible part of this file; it runs last, after
// all fields above are assigned -- confirm what it depends on before reordering.
| config(); |
| } |
| |
| public String getInstanceID() { |
| return getContext().getInstanceID(); |
| } |
| |
| public String getVersion() { |
| return Constants.VERSION; |
| } |
| |
| private static long jitter() { |
| Random r = new SecureRandom(); |
| // add a random 10% wait |
| return (long) ((1. + (r.nextDouble() / 10)) * TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS); |
| } |
| |
// Created in the constructor from the server configuration; getSession() delegates to it.
| final SessionManager sessionManager; |
| |
// Running total adjusted by updateTotalQueuedMutationSize().
| private final AtomicLong totalQueuedMutationSize = new AtomicLong(0); |
// Fair lock (constructed with fairness = true) taken by acquireRecoveryMemory() for
// non-metadata extents.
| private final ReentrantLock recoveryLock = new ReentrantLock(true); |
// Thrift handler for client requests; created in startTabletClientService().
| private ThriftClientHandler clientHandler; |
| private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus(); |
// Created and started in run().
| private CompactionManager compactionManager; |
| |
| String getLockID() { |
| return lockID; |
| } |
| |
| void requestStop() { |
| serverStopRequested = true; |
| } |
| |
| private class SplitRunner implements Runnable { |
| private final Tablet tablet; |
| |
| public SplitRunner(Tablet tablet) { |
| this.tablet = tablet; |
| } |
| |
| @Override |
| public void run() { |
| splitTablet(tablet); |
| } |
| } |
| |
| public long updateTotalQueuedMutationSize(long additionalMutationSize) { |
| return totalQueuedMutationSize.addAndGet(additionalMutationSize); |
| } |
| |
| public Session getSession(long sessionId) { |
| return sessionManager.getSession(sessionId); |
| } |
| |
| public void executeSplit(Tablet tablet) { |
| resourceManager.executeSplit(tablet.getExtent(), new SplitRunner(tablet)); |
| } |
| |
| private class MajorCompactor implements Runnable { |
| |
| public MajorCompactor(AccumuloConfiguration config) { |
| CompactionWatcher.startWatching(config); |
| } |
| |
| @Override |
| public void run() { |
| while (true) { |
| try { |
| sleepUninterruptibly(getConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY), |
| TimeUnit.MILLISECONDS); |
| |
| List<DfsLogger> closedCopy; |
| |
| synchronized (closedLogs) { |
| closedCopy = copyClosedLogs(closedLogs); |
| } |
| |
| // bail early now if we're shutting down |
| for (Entry<KeyExtent,Tablet> entry : getOnlineTablets().entrySet()) { |
| |
| Tablet tablet = entry.getValue(); |
| |
| // if we need to split AND compact, we need a good way |
| // to decide what to do |
| if (tablet.needsSplit()) { |
| executeSplit(tablet); |
| continue; |
| } |
| |
| tablet.checkIfMinorCompactionNeededForLogs(closedCopy); |
| } |
| } catch (Exception t) { |
| log.error("Unexpected exception in {}", Thread.currentThread().getName(), t); |
| sleepUninterruptibly(1, TimeUnit.SECONDS); |
| } |
| } |
| } |
| } |
| |
| private void splitTablet(Tablet tablet) { |
| try { |
| splitTablet(tablet, null); |
| } catch (IOException e) { |
| statsKeeper.updateTime(Operation.SPLIT, 0, true); |
| log.error("split failed: {} for tablet {}", e.getMessage(), tablet.getExtent(), e); |
| } catch (Exception e) { |
| statsKeeper.updateTime(Operation.SPLIT, 0, true); |
| log.error("Unknown error on split:", e); |
| } |
| } |
| |
| TreeMap<KeyExtent,TabletData> splitTablet(Tablet tablet, byte[] splitPoint) throws IOException { |
| long t1 = System.currentTimeMillis(); |
| |
| TreeMap<KeyExtent,TabletData> tabletInfo = tablet.split(splitPoint); |
| if (tabletInfo == null) { |
| return null; |
| } |
| |
| log.info("Starting split: {}", tablet.getExtent()); |
| statsKeeper.incrementStatusSplit(); |
| long start = System.currentTimeMillis(); |
| |
| Tablet[] newTablets = new Tablet[2]; |
| |
| Entry<KeyExtent,TabletData> first = tabletInfo.firstEntry(); |
| TabletResourceManager newTrm0 = resourceManager.createTabletResourceManager(first.getKey(), |
| getTableConfiguration(first.getKey())); |
| newTablets[0] = new Tablet(TabletServer.this, first.getKey(), newTrm0, first.getValue()); |
| |
| Entry<KeyExtent,TabletData> last = tabletInfo.lastEntry(); |
| TabletResourceManager newTrm1 = resourceManager.createTabletResourceManager(last.getKey(), |
| getTableConfiguration(last.getKey())); |
| newTablets[1] = new Tablet(TabletServer.this, last.getKey(), newTrm1, last.getValue()); |
| |
| // roll tablet stats over into tablet server's statsKeeper object as |
| // historical data |
| statsKeeper.saveMajorMinorTimes(tablet.getTabletStats()); |
| |
| // lose the reference to the old tablet and open two new ones |
| onlineTablets.split(tablet.getExtent(), newTablets[0], newTablets[1]); |
| |
| // tell the master |
| enqueueMasterMessage(new SplitReportMessage(tablet.getExtent(), newTablets[0].getExtent(), |
| new Text("/" + newTablets[0].getDirName()), newTablets[1].getExtent(), |
| new Text("/" + newTablets[1].getDirName()))); |
| |
| statsKeeper.updateTime(Operation.SPLIT, start, false); |
| long t2 = System.currentTimeMillis(); |
| log.info("Tablet split: {} size0 {} size1 {} time {}ms", tablet.getExtent(), |
| newTablets[0].estimateTabletSize(), newTablets[1].estimateTabletSize(), (t2 - t1)); |
| |
| return tabletInfo; |
| } |
| |
| // add a message for the main thread to send back to the master |
| public void enqueueMasterMessage(MasterMessage m) { |
| masterMessages.addLast(m); |
| } |
| |
| void acquireRecoveryMemory(KeyExtent extent) { |
| if (!extent.isMeta()) { |
| recoveryLock.lock(); |
| } |
| } |
| |
| void releaseRecoveryMemory(KeyExtent extent) { |
| if (!extent.isMeta()) { |
| recoveryLock.unlock(); |
| } |
| } |
| |
| private HostAndPort startServer(AccumuloConfiguration conf, String address, TProcessor processor) |
| throws UnknownHostException { |
| Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null |
| ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); |
| ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), address, |
| Property.TSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), |
| "Thrift Client Server", Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, |
| Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK, maxMessageSizeProperty); |
| this.server = sp.server; |
| return sp.address; |
| } |
| |
| private HostAndPort getMasterAddress() { |
| try { |
| List<String> locations = getContext().getMasterLocations(); |
| if (locations.isEmpty()) { |
| return null; |
| } |
| return HostAndPort.fromString(locations.get(0)); |
| } catch (Exception e) { |
| log.warn("Failed to obtain master host " + e); |
| } |
| |
| return null; |
| } |
| |
| // Connect to the master for posting asynchronous results |
| private MasterClientService.Client masterConnection(HostAndPort address) { |
| try { |
| if (address == null) { |
| return null; |
| } |
| // log.info("Listener API to master has been opened"); |
| return ThriftUtil.getClient(new MasterClientService.Client.Factory(), address, getContext()); |
| } catch (Exception e) { |
| log.warn("Issue with masterConnection (" + address + ") " + e, e); |
| } |
| return null; |
| } |
| |
| private void returnMasterConnection(MasterClientService.Client client) { |
| ThriftUtil.returnClient(client); |
| } |
| |
| private HostAndPort startTabletClientService() throws UnknownHostException { |
| // start listening for client connection last |
| clientHandler = new ThriftClientHandler(this); |
| Iface rpcProxy = TraceUtil.wrapService(clientHandler); |
| final Processor<Iface> processor; |
| if (getContext().getThriftServerType() == ThriftServerType.SASL) { |
| Iface tcredProxy = TCredentialsUpdatingWrapper.service(rpcProxy, ThriftClientHandler.class, |
| getConfiguration()); |
| processor = new Processor<>(tcredProxy); |
| } else { |
| processor = new Processor<>(rpcProxy); |
| } |
| HostAndPort address = startServer(getConfiguration(), clientAddress.getHost(), processor); |
| log.info("address = {}", address); |
| return address; |
| } |
| |
| private void startReplicationService() throws UnknownHostException { |
| final ReplicationServicerHandler handler = new ReplicationServicerHandler(this); |
| ReplicationServicer.Iface rpcProxy = TraceUtil.wrapService(handler); |
| ReplicationServicer.Iface repl = |
| TCredentialsUpdatingWrapper.service(rpcProxy, handler.getClass(), getConfiguration()); |
| ReplicationServicer.Processor<ReplicationServicer.Iface> processor = |
| new ReplicationServicer.Processor<>(repl); |
| Property maxMessageSizeProperty = |
| getConfiguration().get(Property.TSERV_MAX_MESSAGE_SIZE) != null |
| ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE; |
| ServerAddress sp = |
| TServerUtils.startServer(getMetricsSystem(), getContext(), clientAddress.getHost(), |
| Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler", |
| "Replication Servicer", Property.TSERV_PORTSEARCH, Property.REPLICATION_MIN_THREADS, |
| null, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); |
| this.replServer = sp.server; |
| log.info("Started replication service on {}", sp.address); |
| |
| try { |
| // The replication service is unique to the thrift service for a tserver, not just a host. |
| // Advertise the host and port for replication service given the host and port for the |
| // tserver. |
| getContext().getZooReaderWriter().putPersistentData( |
| getContext().getZooKeeperRoot() + ReplicationConstants.ZOO_TSERVERS + "/" + clientAddress, |
| sp.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); |
| } catch (Exception e) { |
| log.error("Could not advertise replication service port", e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public ZooLock getLock() { |
| return tabletServerLock; |
| } |
| |
| private void announceExistence() { |
| ZooReaderWriter zoo = getContext().getZooReaderWriter(); |
| try { |
| String zPath = |
| getContext().getZooKeeperRoot() + Constants.ZTSERVERS + "/" + getClientAddressString(); |
| |
| try { |
| zoo.putPersistentData(zPath, new byte[] {}, NodeExistsPolicy.SKIP); |
| } catch (KeeperException e) { |
| if (e.code() == KeeperException.Code.NOAUTH) { |
| log.error("Failed to write to ZooKeeper. Ensure that" |
| + " accumulo.properties, specifically instance.secret, is consistent."); |
| } |
| throw e; |
| } |
| |
| tabletServerLock = new ZooLock(getContext().getSiteConfiguration(), zPath, UUID.randomUUID()); |
| |
| LockWatcher lw = new LockWatcher() { |
| |
| @Override |
| public void lostLock(final LockLossReason reason) { |
| Halt.halt(serverStopRequested ? 0 : 1, () -> { |
| if (!serverStopRequested) { |
| log.error("Lost tablet server lock (reason = {}), exiting.", reason); |
| } |
| gcLogger.logGCInfo(getConfiguration()); |
| }); |
| } |
| |
| @Override |
| public void unableToMonitorLockNode(final Exception e) { |
| Halt.halt(1, () -> log.error("Lost ability to monitor tablet server lock, exiting.", e)); |
| |
| } |
| }; |
| |
| byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT) |
| .toString().getBytes(UTF_8); |
| for (int i = 0; i < 120 / 5; i++) { |
| zoo.putPersistentData(zPath, new byte[0], NodeExistsPolicy.SKIP); |
| |
| if (tabletServerLock.tryLock(lw, lockContent)) { |
| log.debug("Obtained tablet server lock {}", tabletServerLock.getLockPath()); |
| lockID = tabletServerLock.getLockID() |
| .serialize(getContext().getZooKeeperRoot() + Constants.ZTSERVERS + "/"); |
| return; |
| } |
| log.info("Waiting for tablet server lock"); |
| sleepUninterruptibly(5, TimeUnit.SECONDS); |
| } |
| String msg = "Too many retries, exiting."; |
| log.info(msg); |
| throw new RuntimeException(msg); |
| } catch (Exception e) { |
| log.info("Could not obtain tablet server lock, exiting.", e); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| // main loop listens for client requests |
| @Override |
| public void run() { |
| SecurityUtil.serverLogin(getConfiguration()); |
| |
| // To make things easier on users/devs, and to avoid creating an upgrade path to 1.7 |
| // We can just make the zookeeper paths before we try to use. |
| try { |
| ZooKeeperInitialization.ensureZooKeeperInitialized(getContext().getZooReaderWriter(), |
| getContext().getZooKeeperRoot()); |
| } catch (KeeperException | InterruptedException e) { |
| log.error("Could not ensure that ZooKeeper is properly initialized", e); |
| throw new RuntimeException(e); |
| } |
| |
| try { |
| MetricsSystem metricsSystem = getMetricsSystem(); |
| new TabletServerMetrics(this).register(metricsSystem); |
| mincMetrics.register(metricsSystem); |
| scanMetrics.register(metricsSystem); |
| updateMetrics.register(metricsSystem); |
| ceMetrics.register(metricsSystem); |
| } catch (Exception e) { |
| log.error("Error registering metrics", e); |
| } |
| |
| if (authKeyWatcher != null) { |
| log.info("Seeding ZooKeeper watcher for authentication keys"); |
| try { |
| authKeyWatcher.updateAuthKeys(); |
| } catch (KeeperException | InterruptedException e) { |
| // TODO Does there need to be a better check? What are the error conditions that we'd fall |
| // out here? AUTH_FAILURE? |
| // If we get the error, do we just put it on a timer and retry the exists(String, Watcher) |
| // call? |
| log.error("Failed to perform initial check for authentication tokens in" |
| + " ZooKeeper. Delegation token authentication will be unavailable.", e); |
| } |
| } |
| |
| this.compactionManager = new CompactionManager(new Iterable<Compactable>() { |
| @Override |
| public Iterator<Compactable> iterator() { |
| return Iterators.transform(onlineTablets.snapshot().values().iterator(), |
| Tablet::asCompactable); |
| } |
| }, getContext(), ceMetrics); |
| compactionManager.start(); |
| |
| try { |
| clientAddress = startTabletClientService(); |
| } catch (UnknownHostException e1) { |
| throw new RuntimeException("Failed to start the tablet client service", e1); |
| } |
| announceExistence(); |
| try { |
| walMarker.initWalMarker(getTabletSession()); |
| } catch (Exception e) { |
| log.error("Unable to create WAL marker node in zookeeper", e); |
| throw new RuntimeException(e); |
| } |
| |
| ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) ThreadPools |
| .createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS); |
| |
| bulkFailedCopyQ = new DistributedWorkQueue( |
| getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration()); |
| try { |
| bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(getContext()), |
| distWorkQThreadPool); |
| } catch (Exception e1) { |
| throw new RuntimeException("Failed to start distributed work queue for copying ", e1); |
| } |
| |
| try { |
| logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool); |
| } catch (Exception ex) { |
| log.error("Error setting watches for recoveries"); |
| throw new RuntimeException(ex); |
| } |
| final AccumuloConfiguration aconf = getConfiguration(); |
| // if the replication name is ever set, then start replication services |
| ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> { |
| if (this.replServer == null) { |
| if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) { |
| log.info(Property.REPLICATION_NAME.getKey() + " was set, starting repl services."); |
| setupReplication(aconf); |
| } |
| } |
| }, 0, 5000, TimeUnit.MILLISECONDS); |
| |
| final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000; |
| ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay( |
| new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS, |
| CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS); |
| |
| HostAndPort masterHost; |
| while (!serverStopRequested) { |
| // send all of the pending messages |
| try { |
| MasterMessage mm = null; |
| MasterClientService.Client iface = null; |
| |
| try { |
| // wait until a message is ready to send, or a server stop |
| // was requested |
| while (mm == null && !serverStopRequested) { |
| mm = masterMessages.poll(1000, TimeUnit.MILLISECONDS); |
| } |
| |
| // have a message to send to the master, so grab a |
| // connection |
| masterHost = getMasterAddress(); |
| iface = masterConnection(masterHost); |
| TServiceClient client = iface; |
| |
| // if while loop does not execute at all and mm != null, |
| // then finally block should place mm back on queue |
| while (!serverStopRequested && mm != null && client != null |
| && client.getOutputProtocol() != null |
| && client.getOutputProtocol().getTransport() != null |
| && client.getOutputProtocol().getTransport().isOpen()) { |
| try { |
| mm.send(getContext().rpcCreds(), getClientAddressString(), iface); |
| mm = null; |
| } catch (TException ex) { |
| log.warn("Error sending message: queuing message again"); |
| masterMessages.putFirst(mm); |
| mm = null; |
| throw ex; |
| } |
| |
| // if any messages are immediately available grab em and |
| // send them |
| mm = masterMessages.poll(); |
| } |
| |
| } finally { |
| |
| if (mm != null) { |
| masterMessages.putFirst(mm); |
| } |
| returnMasterConnection(iface); |
| |
| sleepUninterruptibly(1, TimeUnit.SECONDS); |
| } |
| } catch (InterruptedException e) { |
| log.info("Interrupt Exception received, shutting down"); |
| serverStopRequested = true; |
| |
| } catch (Exception e) { |
| // may have lost connection with master |
| // loop back to the beginning and wait for a new one |
| // this way we survive master failures |
| log.error(getClientAddressString() + ": TServerInfo: Exception. Master down?", e); |
| } |
| } |
| |
| // wait for shutdown |
| // if the main thread exits before the master listener, the JVM will |
| // kill the other threads and finalize objects. We want the shutdown that is |
| // running in the master listener thread to complete before this happens. |
| // consider making other threads daemon threads so that objects don't |
| // get prematurely finalized |
| synchronized (this) { |
| while (!shutdownComplete) { |
| try { |
| this.wait(1000); |
| } catch (InterruptedException e) { |
| log.error(e.toString()); |
| } |
| } |
| } |
| log.debug("Stopping Replication Server"); |
| TServerUtils.stopTServer(this.replServer); |
| |
| log.debug("Stopping Thrift Servers"); |
| TServerUtils.stopTServer(server); |
| |
| try { |
| log.debug("Closing filesystems"); |
| getVolumeManager().close(); |
| } catch (IOException e) { |
| log.warn("Failed to close filesystem : {}", e.getMessage(), e); |
| } |
| |
| gcLogger.logGCInfo(getConfiguration()); |
| |
| log.info("TServerInfo: stop requested. exiting ... "); |
| |
| try { |
| tabletServerLock.unlock(); |
| } catch (Exception e) { |
| log.warn("Failed to release tablet server lock", e); |
| } |
| } |
| |
| private void setupReplication(AccumuloConfiguration aconf) { |
| // Start the thrift service listening for incoming replication requests |
| try { |
| startReplicationService(); |
| } catch (UnknownHostException e) { |
| throw new RuntimeException("Failed to start replication service", e); |
| } |
| |
| // Start the pool to handle outgoing replications |
| final ThreadPoolExecutor replicationThreadPool = (ThreadPoolExecutor) ThreadPools |
| .createExecutorService(getConfiguration(), Property.REPLICATION_WORKER_THREADS); |
| replWorker.setExecutor(replicationThreadPool); |
| replWorker.run(); |
| |
| // Check the configuration value for the size of the pool and, if changed, resize the pool |
| Runnable replicationWorkThreadPoolResizer = () -> { |
| int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS); |
| if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) { |
| log.info("Resizing thread pool for sending replication work from {} to {}", |
| replicationThreadPool.getMaximumPoolSize(), maxPoolSize); |
| replicationThreadPool.setMaximumPoolSize(maxPoolSize); |
| } |
| }; |
| ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay( |
| replicationWorkThreadPoolResizer, 10000, 30000, TimeUnit.MILLISECONDS); |
| } |
| |
| static boolean checkTabletMetadata(KeyExtent extent, TServerInstance instance, |
| TabletMetadata meta) throws AccumuloException { |
| |
| if (!meta.sawPrevEndRow()) { |
| throw new AccumuloException("Metadata entry does not have prev row (" + meta.getTableId() |
| + " " + meta.getEndRow() + ")"); |
| } |
| |
| if (!extent.equals(meta.getExtent())) { |
| log.info("Tablet extent mismatch {} {}", extent, meta.getExtent()); |
| return false; |
| } |
| |
| if (meta.getDirName() == null) { |
| throw new AccumuloException( |
| "Metadata entry does not have directory (" + meta.getExtent() + ")"); |
| } |
| |
| if (meta.getTime() == null && !extent.equals(RootTable.EXTENT)) { |
| throw new AccumuloException("Metadata entry does not have time (" + meta.getExtent() + ")"); |
| } |
| |
| Location loc = meta.getLocation(); |
| |
| if (loc == null || loc.getType() != LocationType.FUTURE || !instance.equals(loc)) { |
| log.info("Unexpected location {} {}", extent, loc); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| public String getClientAddressString() { |
| if (clientAddress == null) { |
| return null; |
| } |
| return clientAddress.getHost() + ":" + clientAddress.getPort(); |
| } |
| |
| public TServerInstance getTabletSession() { |
| String address = getClientAddressString(); |
| if (address == null) { |
| return null; |
| } |
| |
| try { |
| return new TServerInstance(address, tabletServerLock.getSessionId()); |
| } catch (Exception ex) { |
| log.warn("Unable to read session from tablet server lock" + ex); |
| return null; |
| } |
| } |
| |
| private static void checkWalCanSync(ServerContext context) { |
| VolumeChooserEnvironment chooserEnv = |
| new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.LOGGER, context); |
| Set<String> prefixes; |
| var options = ServerConstants.getBaseUris(context); |
| try { |
| prefixes = context.getVolumeManager().choosable(chooserEnv, options); |
| } catch (RuntimeException e) { |
| log.warn("Unable to determine if WAL directories ({}) support sync or flush. " |
| + "Data loss may occur.", Arrays.asList(options), e); |
| return; |
| } |
| |
| boolean warned = false; |
| for (String prefix : prefixes) { |
| String logPath = prefix + Path.SEPARATOR + ServerConstants.WAL_DIR; |
| if (!context.getVolumeManager().canSyncAndFlush(new Path(logPath))) { |
| // sleep a few seconds in case this is at cluster start...give monitor |
| // time to start so the warning will be more visible |
| if (!warned) { |
| UtilWaitThread.sleep(5000); |
| warned = true; |
| } |
| log.warn("WAL directory ({}) implementation does not support sync or flush." |
| + " Data loss may occur.", logPath); |
| } |
| } |
| } |
| |
| private void config() { |
| log.info("Tablet server starting on {}", getHostname()); |
| Threads.createThread("Split/MajC initiator", new MajorCompactor(getConfiguration())).start(); |
| |
| clientAddress = HostAndPort.fromParts(getHostname(), 0); |
| |
| final AccumuloConfiguration aconf = getConfiguration(); |
| |
| FileSystemMonitor.start(aconf, Property.TSERV_MONITOR_FS); |
| |
| Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration()); |
| |
| ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(gcDebugTask, 0, |
| TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS); |
| } |
| |
| public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) { |
| long start = System.currentTimeMillis(); |
| TabletServerStatus result = new TabletServerStatus(); |
| |
| final Map<String,TableInfo> tables = new HashMap<>(); |
| |
| getOnlineTablets().forEach((ke, tablet) -> { |
| String tableId = ke.tableId().canonical(); |
| TableInfo table = tables.get(tableId); |
| if (table == null) { |
| table = new TableInfo(); |
| table.minors = new Compacting(); |
| table.majors = new Compacting(); |
| tables.put(tableId, table); |
| } |
| long recs = tablet.getNumEntries(); |
| table.tablets++; |
| table.onlineTablets++; |
| table.recs += recs; |
| table.queryRate += tablet.queryRate(); |
| table.queryByteRate += tablet.queryByteRate(); |
| table.ingestRate += tablet.ingestRate(); |
| table.ingestByteRate += tablet.ingestByteRate(); |
| table.scanRate += tablet.scanRate(); |
| long recsInMemory = tablet.getNumEntriesInMemory(); |
| table.recsInMemory += recsInMemory; |
| if (tablet.isMinorCompactionRunning()) { |
| table.minors.running++; |
| } |
| if (tablet.isMinorCompactionQueued()) { |
| table.minors.queued++; |
| } |
| |
| if (tablet.isMajorCompactionRunning()) { |
| table.majors.running++; |
| } |
| |
| if (tablet.isMajorCompactionQueued()) { |
| table.majors.queued++; |
| } |
| |
| }); |
| |
| scanCounts.forEach((tableId, mapCounter) -> { |
| TableInfo table = tables.get(tableId.canonical()); |
| if (table == null) { |
| table = new TableInfo(); |
| tables.put(tableId.canonical(), table); |
| } |
| |
| if (table.scans == null) { |
| table.scans = new Compacting(); |
| } |
| |
| table.scans.queued += mapCounter.getInt(ScanRunState.QUEUED); |
| table.scans.running += mapCounter.getInt(ScanRunState.RUNNING); |
| }); |
| |
| ArrayList<KeyExtent> offlineTabletsCopy = new ArrayList<>(); |
| synchronized (this.unopenedTablets) { |
| synchronized (this.openingTablets) { |
| offlineTabletsCopy.addAll(this.unopenedTablets); |
| offlineTabletsCopy.addAll(this.openingTablets); |
| } |
| } |
| |
| for (KeyExtent extent : offlineTabletsCopy) { |
| String tableId = extent.tableId().canonical(); |
| TableInfo table = tables.get(tableId); |
| if (table == null) { |
| table = new TableInfo(); |
| tables.put(tableId, table); |
| } |
| table.tablets++; |
| } |
| |
| result.lastContact = RelativeTime.currentTimeMillis(); |
| result.tableMap = tables; |
| result.osLoad = ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(); |
| result.name = getClientAddressString(); |
| result.holdTime = resourceManager.holdTime(); |
| result.lookups = seekCount.get(); |
| result.indexCacheHits = resourceManager.getIndexCache().getStats().hitCount(); |
| result.indexCacheRequest = resourceManager.getIndexCache().getStats().requestCount(); |
| result.dataCacheHits = resourceManager.getDataCache().getStats().hitCount(); |
| result.dataCacheRequest = resourceManager.getDataCache().getStats().requestCount(); |
| result.logSorts = logSorter.getLogSorts(); |
| result.flushs = flushCounter.get(); |
| result.syncs = syncCounter.get(); |
| result.bulkImports = new ArrayList<>(); |
| result.bulkImports.addAll(clientHandler.getBulkLoadStatus()); |
| result.bulkImports.addAll(bulkImportStatus.getBulkLoadStatus()); |
| result.version = getVersion(); |
| result.responseTime = System.currentTimeMillis() - start; |
| return result; |
| } |
| |
| private Durability getMincEventDurability(KeyExtent extent) { |
| TableConfiguration conf; |
| if (extent.isMeta()) { |
| conf = getContext().getTableConfiguration(RootTable.ID); |
| } else { |
| conf = getContext().getTableConfiguration(MetadataTable.ID); |
| } |
| return DurabilityImpl.fromString(conf.get(Property.TABLE_DURABILITY)); |
| } |
| |
| public void minorCompactionFinished(CommitSession tablet, long walogSeq) throws IOException { |
| Durability durability = getMincEventDurability(tablet.getExtent()); |
| totalMinorCompactions.incrementAndGet(); |
| logger.minorCompactionFinished(tablet, walogSeq, durability); |
| markUnusedWALs(); |
| } |
| |
| public void minorCompactionStarted(CommitSession tablet, long lastUpdateSequence, |
| String newMapfileLocation) throws IOException { |
| Durability durability = getMincEventDurability(tablet.getExtent()); |
| logger.minorCompactionStarted(tablet, lastUpdateSequence, newMapfileLocation, durability); |
| } |
| |
| public void recover(VolumeManager fs, KeyExtent extent, List<LogEntry> logEntries, |
| Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException { |
| List<Path> recoveryLogs = new ArrayList<>(); |
| List<LogEntry> sorted = new ArrayList<>(logEntries); |
| sorted.sort((e1, e2) -> (int) (e1.timestamp - e2.timestamp)); |
| for (LogEntry entry : sorted) { |
| Path recovery = null; |
| Path finished = RecoveryPath.getRecoveryPath(new Path(entry.filename)); |
| finished = SortedLogState.getFinishedMarkerPath(finished); |
| TabletServer.log.debug("Looking for " + finished); |
| if (fs.exists(finished)) { |
| recovery = finished.getParent(); |
| } |
| if (recovery == null) { |
| throw new IOException( |
| "Unable to find recovery files for extent " + extent + " logEntry: " + entry); |
| } |
| recoveryLogs.add(recovery); |
| } |
| logger.recover(fs, extent, recoveryLogs, tabletFiles, mutationReceiver); |
| } |
| |
| public int createLogId() { |
| int logId = logIdGenerator.incrementAndGet(); |
| if (logId < 0) { |
| throw new IllegalStateException("Log Id rolled"); |
| } |
| return logId; |
| } |
| |
| public TableConfiguration getTableConfiguration(KeyExtent extent) { |
| return getContext().getTableConfiguration(extent.tableId()); |
| } |
| |
| public DfsLogger.ServerResources getServerConfig() { |
| return new DfsLogger.ServerResources() { |
| |
| @Override |
| public VolumeManager getVolumeManager() { |
| return TabletServer.this.getVolumeManager(); |
| } |
| |
| @Override |
| public AccumuloConfiguration getConfiguration() { |
| return TabletServer.this.getConfiguration(); |
| } |
| }; |
| } |
| |
| public SortedMap<KeyExtent,Tablet> getOnlineTablets() { |
| return onlineTablets.snapshot(); |
| } |
| |
| public Tablet getOnlineTablet(KeyExtent extent) { |
| return onlineTablets.snapshot().get(extent); |
| } |
| |
| public VolumeManager getVolumeManager() { |
| return getContext().getVolumeManager(); |
| } |
| |
| public int getOpeningCount() { |
| return openingTablets.size(); |
| } |
| |
| public int getUnopenedCount() { |
| return unopenedTablets.size(); |
| } |
| |
| public long getTotalMinorCompactions() { |
| return totalMinorCompactions.get(); |
| } |
| |
| public double getHoldTimeMillis() { |
| return resourceManager.holdTime(); |
| } |
| |
| public SecurityOperation getSecurityOperation() { |
| return security; |
| } |
| |
| // avoid unnecessary redundant markings to meta |
| final ConcurrentHashMap<DfsLogger,EnumSet<TabletLevel>> metadataTableLogs = |
| new ConcurrentHashMap<>(); |
| |
| // This is a set of WALs that are closed but may still be referenced by tablets. A LinkedHashSet |
| // is used because it's very important to know the order in which WALs were closed when deciding |
| // if a WAL is eligible for removal. Maintaining the order that logs were used in is currently a |
| // simple task because there is only one active log at a time. |
| LinkedHashSet<DfsLogger> closedLogs = new LinkedHashSet<>(); |
| |
| @VisibleForTesting |
| interface ReferencedRemover { |
| void removeInUse(Set<DfsLogger> candidates); |
| } |
| |
| /** |
| * For a closed WAL to be eligible for removal it must be unreferenced AND all closed WALs older |
| * than it must be unreferenced. This method finds WALs that meet those conditions. See Github |
| * issue #537. |
| */ |
| @VisibleForTesting |
| static Set<DfsLogger> findOldestUnreferencedWals(List<DfsLogger> closedLogs, |
| ReferencedRemover referencedRemover) { |
| LinkedHashSet<DfsLogger> unreferenced = new LinkedHashSet<>(closedLogs); |
| |
| referencedRemover.removeInUse(unreferenced); |
| |
| Iterator<DfsLogger> closedIter = closedLogs.iterator(); |
| Iterator<DfsLogger> unrefIter = unreferenced.iterator(); |
| |
| Set<DfsLogger> eligible = new HashSet<>(); |
| |
| while (closedIter.hasNext() && unrefIter.hasNext()) { |
| DfsLogger closed = closedIter.next(); |
| DfsLogger unref = unrefIter.next(); |
| |
| if (closed.equals(unref)) { |
| eligible.add(unref); |
| } else { |
| break; |
| } |
| } |
| |
| return eligible; |
| } |
| |
| @VisibleForTesting |
| static List<DfsLogger> copyClosedLogs(LinkedHashSet<DfsLogger> closedLogs) { |
| List<DfsLogger> closedCopy = new ArrayList<>(closedLogs.size()); |
| for (DfsLogger dfsLogger : closedLogs) { |
| // very important this copy maintains same order .. |
| closedCopy.add(dfsLogger); |
| } |
| return Collections.unmodifiableList(closedCopy); |
| } |
| |
| private void markUnusedWALs() { |
| |
| List<DfsLogger> closedCopy; |
| |
| synchronized (closedLogs) { |
| closedCopy = copyClosedLogs(closedLogs); |
| } |
| |
| ReferencedRemover refRemover = candidates -> { |
| for (Tablet tablet : getOnlineTablets().values()) { |
| tablet.removeInUseLogs(candidates); |
| if (candidates.isEmpty()) { |
| break; |
| } |
| } |
| }; |
| |
| Set<DfsLogger> eligible = findOldestUnreferencedWals(closedCopy, refRemover); |
| |
| try { |
| TServerInstance session = this.getTabletSession(); |
| for (DfsLogger candidate : eligible) { |
| log.info("Marking " + candidate.getPath() + " as unreferenced"); |
| walMarker.walUnreferenced(session, candidate.getPath()); |
| } |
| synchronized (closedLogs) { |
| closedLogs.removeAll(eligible); |
| } |
| } catch (WalMarkerException ex) { |
| log.info(ex.toString(), ex); |
| } |
| } |
| |
| public void addNewLogMarker(DfsLogger copy) throws WalMarkerException { |
| log.info("Writing log marker for " + copy.getPath()); |
| walMarker.addNewWalMarker(getTabletSession(), copy.getPath()); |
| } |
| |
| public void walogClosed(DfsLogger currentLog) throws WalMarkerException { |
| metadataTableLogs.remove(currentLog); |
| |
| if (currentLog.getWrites() > 0) { |
| int clSize; |
| synchronized (closedLogs) { |
| closedLogs.add(currentLog); |
| clSize = closedLogs.size(); |
| } |
| log.info("Marking " + currentLog.getPath() + " as closed. Total closed logs " + clSize); |
| walMarker.closeWal(getTabletSession(), currentLog.getPath()); |
| } else { |
| log.info( |
| "Marking " + currentLog.getPath() + " as unreferenced (skipping closed writes == 0)"); |
| walMarker.walUnreferenced(getTabletSession(), currentLog.getPath()); |
| } |
| } |
| |
| public void updateBulkImportState(List<String> files, BulkImportState state) { |
| bulkImportStatus.updateBulkImportStatus(files, state); |
| } |
| |
| public void removeBulkImportState(List<String> files) { |
| bulkImportStatus.removeBulkImportStatus(files); |
| } |
| |
| public CompactionManager getCompactionManager() { |
| return compactionManager; |
| } |
| } |