| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server.coordinator; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Ordering; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.inject.Inject; |
| import it.unimi.dsi.fastutil.objects.Object2IntMap; |
| import it.unimi.dsi.fastutil.objects.Object2IntMaps; |
| import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; |
| import it.unimi.dsi.fastutil.objects.Object2LongMap; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.utils.ZKPaths; |
| import org.apache.druid.client.DataSourcesSnapshot; |
| import org.apache.druid.client.DruidDataSource; |
| import org.apache.druid.client.DruidServer; |
| import org.apache.druid.client.ImmutableDruidDataSource; |
| import org.apache.druid.client.ImmutableDruidServer; |
| import org.apache.druid.client.ServerInventoryView; |
| import org.apache.druid.client.coordinator.Coordinator; |
| import org.apache.druid.client.indexing.IndexingServiceClient; |
| import org.apache.druid.common.config.JacksonConfigManager; |
| import org.apache.druid.curator.discovery.ServiceAnnouncer; |
| import org.apache.druid.discovery.DruidLeaderSelector; |
| import org.apache.druid.guice.ManageLifecycle; |
| import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty; |
| import org.apache.druid.guice.annotations.Self; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.Pair; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; |
| import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; |
| import org.apache.druid.java.util.common.guava.Comparators; |
| import org.apache.druid.java.util.common.lifecycle.LifecycleStart; |
| import org.apache.druid.java.util.common.lifecycle.LifecycleStop; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.metadata.MetadataRuleManager; |
| import org.apache.druid.metadata.SegmentsMetadataManager; |
| import org.apache.druid.server.DruidNode; |
| import org.apache.druid.server.coordinator.duty.BalanceSegments; |
| import org.apache.druid.server.coordinator.duty.CompactSegments; |
| import org.apache.druid.server.coordinator.duty.CoordinatorDuty; |
| import org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics; |
| import org.apache.druid.server.coordinator.duty.LogUsedSegments; |
| import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments; |
| import org.apache.druid.server.coordinator.duty.RunRules; |
| import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments; |
| import org.apache.druid.server.coordinator.rules.LoadRule; |
| import org.apache.druid.server.coordinator.rules.Rule; |
| import org.apache.druid.server.initialization.ZkPathsConfig; |
| import org.apache.druid.server.lookup.cache.LookupCoordinatorManager; |
| import org.apache.druid.timeline.DataSegment; |
| import org.apache.druid.timeline.SegmentId; |
| import org.joda.time.DateTime; |
| import org.joda.time.Duration; |
| |
| import javax.annotation.Nullable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.stream.Collectors; |
| |
| /** |
| * The Druid Coordinator. While this instance is leader, it periodically runs {@link CoordinatorDuty}s that |
| * manage segment assignment, replication, balancing, and unloading across the cluster, plus indexing-service |
| * duties such as {@link CompactSegments} when an {@link IndexingServiceClient} is available. |
| */ |
| @ManageLifecycle |
| public class DruidCoordinator |
| { |
| /** |
| * This comparator orders "freshest" segments first, i.e. segments with the most recent intervals. |
| * |
| * It is used in historical nodes' {@link LoadQueuePeon}s to make historicals load more recent segments first. |
| * |
| * It is also used in {@link DruidCoordinatorRuntimeParams} for {@link |
| * DruidCoordinatorRuntimeParams#getUsedSegments()} - a collection of segments to be considered during some |
| * coordinator run for different {@link CoordinatorDuty}s. The order matters only for {@link |
| * RunRules}, which tries to apply the rules while iterating the segments in the order imposed by |
| * this comparator. In {@link LoadRule} the throttling limit may be hit (via {@link ReplicationThrottler}; see |
| * {@link CoordinatorDynamicConfig#getReplicationThrottleLimit()}). So before we potentially hit this limit, we |
| * want to schedule loading the more recent segments (among all of those that need to be loaded). |
| * |
| * In both {@link LoadQueuePeon}s and {@link RunRules}, we want to load more recent segments first because |
| * presumably they are queried more often and contain more important data for users, so if the Druid cluster |
| * has availability problems and is struggling to make all segments available immediately, we at least try to |
| * make the more "important" (more recent) segments available as soon as possible. |
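| * |
| * Illustrative example (not from the original source): given two segments that differ only in interval, say |
| * seg2023 over 2023-01-01/2023-01-02 and seg2020 over 2020-01-01/2020-01-02, then |
| * {@code SEGMENT_COMPARATOR_RECENT_FIRST.compare(seg2023, seg2020) < 0}, so a list sorted with this |
| * comparator iterates seg2023 before seg2020. |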
| */ |
| static final Comparator<DataSegment> SEGMENT_COMPARATOR_RECENT_FIRST = Ordering |
| .from(Comparators.intervalsByEndThenStart()) |
| .onResultOf(DataSegment::getInterval) |
| .compound(Ordering.<DataSegment>natural()) |
| .reverse(); |
| |
| private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class); |
| |
| private final Object lock = new Object(); |
| private final DruidCoordinatorConfig config; |
| private final ZkPathsConfig zkPaths; |
| private final JacksonConfigManager configManager; |
| private final SegmentsMetadataManager segmentsMetadataManager; |
| private final ServerInventoryView serverInventoryView; |
| private final MetadataRuleManager metadataRuleManager; |
| private final CuratorFramework curator; |
| private final ServiceEmitter emitter; |
| private final IndexingServiceClient indexingServiceClient; |
| private final ScheduledExecutorService exec; |
| private final LoadQueueTaskMaster taskMaster; |
| private final Map<String, LoadQueuePeon> loadManagementPeons; |
| private final ServiceAnnouncer serviceAnnouncer; |
| private final DruidNode self; |
| private final Set<CoordinatorDuty> indexingServiceDuties; |
| private final BalancerStrategyFactory factory; |
| private final LookupCoordinatorManager lookupCoordinatorManager; |
| private final DruidLeaderSelector coordLeaderSelector; |
| |
| private final CompactSegments compactSegments; |
| |
| private volatile boolean started = false; |
| private volatile SegmentReplicantLookup segmentReplicantLookup = null; |
| |
| private int cachedBalancerThreadNumber; |
| private ListeningExecutorService balancerExec; |
| |
| @Inject |
| public DruidCoordinator( |
| DruidCoordinatorConfig config, |
| ZkPathsConfig zkPaths, |
| JacksonConfigManager configManager, |
| SegmentsMetadataManager segmentsMetadataManager, |
| ServerInventoryView serverInventoryView, |
| MetadataRuleManager metadataRuleManager, |
| CuratorFramework curator, |
| ServiceEmitter emitter, |
| ScheduledExecutorFactory scheduledExecutorFactory, |
| IndexingServiceClient indexingServiceClient, |
| LoadQueueTaskMaster taskMaster, |
| ServiceAnnouncer serviceAnnouncer, |
| @Self DruidNode self, |
| @CoordinatorIndexingServiceDuty Set<CoordinatorDuty> indexingServiceDuties, |
| BalancerStrategyFactory factory, |
| LookupCoordinatorManager lookupCoordinatorManager, |
| @Coordinator DruidLeaderSelector coordLeaderSelector, |
| CompactSegments compactSegments |
| ) |
| { |
| this( |
| config, |
| zkPaths, |
| configManager, |
| segmentsMetadataManager, |
| serverInventoryView, |
| metadataRuleManager, |
| curator, |
| emitter, |
| scheduledExecutorFactory, |
| indexingServiceClient, |
| taskMaster, |
| serviceAnnouncer, |
| self, |
| new ConcurrentHashMap<>(), |
| indexingServiceDuties, |
| factory, |
| lookupCoordinatorManager, |
| coordLeaderSelector, |
| compactSegments |
| ); |
| } |
| |
| DruidCoordinator( |
| DruidCoordinatorConfig config, |
| ZkPathsConfig zkPaths, |
| JacksonConfigManager configManager, |
| SegmentsMetadataManager segmentsMetadataManager, |
| ServerInventoryView serverInventoryView, |
| MetadataRuleManager metadataRuleManager, |
| CuratorFramework curator, |
| ServiceEmitter emitter, |
| ScheduledExecutorFactory scheduledExecutorFactory, |
| IndexingServiceClient indexingServiceClient, |
| LoadQueueTaskMaster taskMaster, |
| ServiceAnnouncer serviceAnnouncer, |
| DruidNode self, |
| ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap, |
| Set<CoordinatorDuty> indexingServiceDuties, |
| BalancerStrategyFactory factory, |
| LookupCoordinatorManager lookupCoordinatorManager, |
| DruidLeaderSelector coordLeaderSelector, |
| CompactSegments compactSegments |
| ) |
| { |
| this.config = config; |
| this.zkPaths = zkPaths; |
| this.configManager = configManager; |
| |
| this.segmentsMetadataManager = segmentsMetadataManager; |
| this.serverInventoryView = serverInventoryView; |
| this.metadataRuleManager = metadataRuleManager; |
| this.curator = curator; |
| this.emitter = emitter; |
| this.indexingServiceClient = indexingServiceClient; |
| this.taskMaster = taskMaster; |
| this.serviceAnnouncer = serviceAnnouncer; |
| this.self = self; |
| this.indexingServiceDuties = indexingServiceDuties; |
| |
| this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); |
| |
| this.loadManagementPeons = loadQueuePeonMap; |
| this.factory = factory; |
| this.lookupCoordinatorManager = lookupCoordinatorManager; |
| this.coordLeaderSelector = coordLeaderSelector; |
| |
| this.compactSegments = compactSegments; |
| } |
| |
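| /** |
| * @return whether this Coordinator instance currently holds leadership, per {@link DruidLeaderSelector} |
| */ |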
| public boolean isLeader() |
| { |
| return coordLeaderSelector.isLeader(); |
| } |
| |
| public Map<String, LoadQueuePeon> getLoadManagementPeons() |
| { |
| return loadManagementPeons; |
| } |
| |
| /** |
| * @return tier -> { dataSource -> underReplicationCount } map |
| */ |
| public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier() |
| { |
| final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); |
| return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments); |
| } |
| |
| /** |
| * The segmentReplicantLookup used in this method could potentially be stale, since it is only updated on |
| * coordinator runs. However, this is ok as long as {@code dataSegments} is refreshed/latest, as this ensures |
| * that the stale data in segmentReplicantLookup at worst undercounts replication levels, |
| * rather than potentially falsely reporting that everything is available. |
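| * |
| * Illustrative shape of the result (values are made up): {"_default_tier": {"wikipedia": 2, "koalas": 0}} |
| * means the wikipedia dataSource is missing two replicas in _default_tier. |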
| * |
| * @return tier -> { dataSource -> underReplicationCount } map |
| */ |
| public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments( |
| Iterable<DataSegment> dataSegments |
| ) |
| { |
| final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>(); |
| |
| if (segmentReplicantLookup == null) { |
| return underReplicationCountsPerDataSourcePerTier; |
| } |
| |
| final DateTime now = DateTimes.nowUtc(); |
| |
| for (final DataSegment segment : dataSegments) { |
| final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource()); |
| |
| for (final Rule rule : rules) { |
| if (!rule.appliesTo(segment, now)) { |
| // Rule did not match. Continue to the next Rule. |
| continue; |
| } |
| if (!rule.canLoadSegments()) { |
| // Rule matched, but it cannot load segments. |
| // Hence, there is no need to update the underReplicationCountsPerDataSourcePerTier map. |
| break; |
| } |
| |
| rule.updateUnderReplicated(underReplicationCountsPerDataSourcePerTier, segmentReplicantLookup, segment); |
| |
| // Only the first matching rule applies. This is because the Coordinator cycles through all used segments |
| // and matches each segment with the first rule that applies. Each segment may only match a single rule. |
| break; |
| } |
| } |
| |
| return underReplicationCountsPerDataSourcePerTier; |
| } |
| |
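| /** |
| * @return dataSource -> number of used segments with zero loaded replicants (i.e. unavailable segments). |
| * Entries with a count of zero are included, so every dataSource with used segments appears in the map. |
| */ |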
| public Object2IntMap<String> computeNumsUnavailableUsedSegmentsPerDataSource() |
| { |
| if (segmentReplicantLookup == null) { |
| return Object2IntMaps.emptyMap(); |
| } |
| |
| final Object2IntOpenHashMap<String> numsUnavailableUsedSegmentsPerDataSource = new Object2IntOpenHashMap<>(); |
| |
| final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments(); |
| |
| for (DataSegment segment : dataSegments) { |
| if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) { |
| numsUnavailableUsedSegmentsPerDataSource.addTo(segment.getDataSource(), 1); |
| } else { |
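| // adding 0 keeps an entry for the dataSource even when all of its segments are loaded |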
| numsUnavailableUsedSegmentsPerDataSource.addTo(segment.getDataSource(), 0); |
| } |
| } |
| |
| return numsUnavailableUsedSegmentsPerDataSource; |
| } |
| |
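| /** |
| * @return dataSource -> percentage (0 to 100) of its used segments that are loaded on at least one server |
| * in the inventory view |
| */ |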
| public Map<String, Double> getLoadStatus() |
| { |
| final Map<String, Double> loadStatus = new HashMap<>(); |
| final Collection<ImmutableDruidDataSource> dataSources = |
| segmentsMetadataManager.getImmutableDataSourcesWithAllUsedSegments(); |
| |
| for (ImmutableDruidDataSource dataSource : dataSources) { |
| final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments()); |
| final int numPublishedSegments = segments.size(); |
| |
| // remove loaded segments |
| for (DruidServer druidServer : serverInventoryView.getInventory()) { |
| final DruidDataSource loadedView = druidServer.getDataSource(dataSource.getName()); |
| if (loadedView != null) { |
| // This does not use segments.removeAll(loadedView.getSegments()) for performance reasons. |
| // Please see https://github.com/apache/druid/pull/5632 and LoadStatusBenchmark for more info. |
| for (DataSegment serverSegment : loadedView.getSegments()) { |
| segments.remove(serverSegment); |
| } |
| } |
| } |
| final int numUnavailableSegments = segments.size(); |
| loadStatus.put( |
| dataSource.getName(), |
| 100 * ((double) (numPublishedSegments - numUnavailableSegments) / (double) numPublishedSegments) |
| ); |
| } |
| |
| return loadStatus; |
| } |
| |
| @Nullable |
| public Long getTotalSizeOfSegmentsAwaitingCompaction(String dataSource) |
| { |
| return compactSegments.getTotalSizeOfSegmentsAwaitingCompaction(dataSource); |
| } |
| |
| @Nullable |
| public AutoCompactionSnapshot getAutoCompactionSnapshotForDataSource(String dataSource) |
| { |
| return compactSegments.getAutoCompactionSnapshot(dataSource); |
| } |
| |
| public Map<String, AutoCompactionSnapshot> getAutoCompactionSnapshot() |
| { |
| return compactSegments.getAutoCompactionSnapshot(); |
| } |
| |
| public CoordinatorDynamicConfig getDynamicConfigs() |
| { |
| return CoordinatorDynamicConfig.current(configManager); |
| } |
| |
| public CoordinatorCompactionConfig getCompactionConfig() |
| { |
| return CoordinatorCompactionConfig.current(configManager); |
| } |
| |
| public void markSegmentAsUnused(DataSegment segment) |
| { |
| log.debug("Marking segment[%s] as unused", segment.getId()); |
| segmentsMetadataManager.markSegmentAsUnused(segment.getId().toString()); |
| } |
| |
| public String getCurrentLeader() |
| { |
| return coordLeaderSelector.getCurrentLeader(); |
| } |
| |
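| /** |
| * Moves a segment from one server to another: the segment is queued for load on {@code toServer}, and a drop |
| * from {@code fromServer} is only issued once the inventory view confirms the load and the segment's ZK load |
| * queue entry is gone. The segment is marked "to drop" up front so the balancer strategy can account for the |
| * pending move immediately. |
| */ |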
| public void moveSegment( |
| DruidCoordinatorRuntimeParams params, |
| ImmutableDruidServer fromServer, |
| ImmutableDruidServer toServer, |
| DataSegment segment, |
| final LoadPeonCallback callback |
| ) |
| { |
| if (segment == null) { |
| log.makeAlert(new IAE("Can not move null DataSegment"), "Exception moving null segment").emit(); |
| if (callback != null) { |
| callback.execute(); |
| } |
| throw new ISE("Cannot move null DataSegment"); |
| } |
| SegmentId segmentId = segment.getId(); |
| try { |
| if (fromServer.getMetadata().equals(toServer.getMetadata())) { |
| throw new IAE("Cannot move [%s] to and from the same server [%s]", segmentId, fromServer.getName()); |
| } |
| |
| ImmutableDruidDataSource dataSource = params.getDataSourcesSnapshot().getDataSource(segment.getDataSource()); |
| if (dataSource == null) { |
| throw new IAE("Unable to find dataSource for segment [%s] in metadata", segmentId); |
| } |
| |
| // Get segment information from SegmentsMetadataManager instead of from fromServer. |
| // This is useful when the SegmentsMetadataManager and fromServer views of a DataSegment differ for the same |
| // identifier (say, the loadSpec differs because of deep storage migration). |
| final DataSegment segmentToLoad = dataSource.getSegment(segment.getId()); |
| if (segmentToLoad == null) { |
| throw new IAE("No segment metadata found for segment Id [%s]", segment.getId()); |
| } |
| final LoadQueuePeon loadPeon = loadManagementPeons.get(toServer.getName()); |
| if (loadPeon == null) { |
| throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", toServer.getName()); |
| } |
| |
| final LoadQueuePeon dropPeon = loadManagementPeons.get(fromServer.getName()); |
| if (dropPeon == null) { |
| throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", fromServer.getName()); |
| } |
| |
| final ServerHolder toHolder = new ServerHolder(toServer, loadPeon); |
| if (toHolder.getAvailableSize() < segmentToLoad.getSize()) { |
| throw new IAE( |
| "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.", |
| toServer.getName(), |
| segmentToLoad, |
| segmentToLoad.getSize(), |
| toHolder.getAvailableSize() |
| ); |
| } |
| |
| final String toLoadQueueSegPath = |
| ZKPaths.makePath(zkPaths.getLoadQueuePath(), toServer.getName(), segmentId.toString()); |
| |
| final LoadPeonCallback loadPeonCallback = () -> { |
| dropPeon.unmarkSegmentToDrop(segmentToLoad); |
| if (callback != null) { |
| callback.execute(); |
| } |
| }; |
| |
| // mark segment to drop before it is actually loaded on the server, |
| // to be able to account for this information in DruidBalancerStrategy immediately |
| dropPeon.markSegmentToDrop(segmentToLoad); |
| try { |
| loadPeon.loadSegment( |
| segmentToLoad, |
| () -> { |
| try { |
| if (serverInventoryView.isSegmentLoadedByServer(toServer.getName(), segment) && |
| curator.checkExists().forPath(toLoadQueueSegPath) == null && |
| !dropPeon.getSegmentsToDrop().contains(segment)) { |
| dropPeon.dropSegment(segment, loadPeonCallback); |
| } else { |
| loadPeonCallback.execute(); |
| } |
| } |
| catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| ); |
| } |
| catch (Exception e) { |
| dropPeon.unmarkSegmentToDrop(segmentToLoad); |
| throw new RuntimeException(e); |
| } |
| } |
| catch (Exception e) { |
| log.makeAlert(e, "Exception moving segment %s", segmentId).emit(); |
| if (callback != null) { |
| callback.execute(); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| public int getCachedBalancerThreadNumber() |
| { |
| return cachedBalancerThreadNumber; |
| } |
| |
| @VisibleForTesting |
| public ListeningExecutorService getBalancerExec() |
| { |
| return balancerExec; |
| } |
| |
| @LifecycleStart |
| public void start() |
| { |
| synchronized (lock) { |
| if (started) { |
| return; |
| } |
| started = true; |
| |
| coordLeaderSelector.registerListener( |
| new DruidLeaderSelector.Listener() |
| { |
| @Override |
| public void becomeLeader() |
| { |
| DruidCoordinator.this.becomeLeader(); |
| } |
| |
| @Override |
| public void stopBeingLeader() |
| { |
| DruidCoordinator.this.stopBeingLeader(); |
| } |
| } |
| ); |
| } |
| } |
| |
| @LifecycleStop |
| public void stop() |
| { |
| synchronized (lock) { |
| if (!started) { |
| return; |
| } |
| |
| coordLeaderSelector.unregisterListener(); |
| |
| started = false; |
| |
| exec.shutdownNow(); |
| |
| if (balancerExec != null) { |
| balancerExec.shutdownNow(); |
| } |
| } |
| } |
| |
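| /** |
| * Runs the compact segments duty once, outside the regularly scheduled indexing-service duties (e.g. to |
| * trigger compaction on demand). |
| */ |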
| public void runCompactSegmentsDuty() |
| { |
| final int startingLeaderCounter = coordLeaderSelector.localTerm(); |
| DutiesRunnable compactSegmentsDuty = new DutiesRunnable(makeCompactSegmentsDuty(), startingLeaderCounter); |
| compactSegmentsDuty.run(); |
| } |
| |
| private void becomeLeader() |
| { |
| synchronized (lock) { |
| if (!started) { |
| return; |
| } |
| |
| log.info( |
| "I am the leader of the coordinators, all must bow! Starting coordination in [%s].", |
| config.getCoordinatorStartDelay() |
| ); |
| |
| segmentsMetadataManager.startPollingDatabasePeriodically(); |
| metadataRuleManager.start(); |
| lookupCoordinatorManager.start(); |
| serviceAnnouncer.announce(self); |
| final int startingLeaderCounter = coordLeaderSelector.localTerm(); |
| |
| final List<Pair<? extends DutiesRunnable, Duration>> dutiesRunnables = new ArrayList<>(); |
| dutiesRunnables.add( |
| Pair.of( |
| new DutiesRunnable(makeHistoricalManagementDuties(), startingLeaderCounter), |
| config.getCoordinatorPeriod() |
| ) |
| ); |
| if (indexingServiceClient != null) { |
| dutiesRunnables.add( |
| Pair.of( |
| new DutiesRunnable(makeIndexingServiceDuties(), startingLeaderCounter), |
| config.getCoordinatorIndexingPeriod() |
| ) |
| ); |
| } |
| |
| for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) { |
| // CompactSegmentsDuty can take a non-trivial amount of time to complete. |
| // Hence, we schedule at a fixed rate to make sure the other tasks still run at approximately every |
| // config.getCoordinatorIndexingPeriod() period. Note that caution should be taken |
| // if setting config.getCoordinatorIndexingPeriod() lower than the default value. |
| ScheduledExecutors.scheduleAtFixedRate( |
| exec, |
| config.getCoordinatorStartDelay(), |
| dutiesRunnable.rhs, |
| new Callable<ScheduledExecutors.Signal>() |
| { |
| private final DutiesRunnable theRunnable = dutiesRunnable.lhs; |
| |
| @Override |
| public ScheduledExecutors.Signal call() |
| { |
| if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { |
| theRunnable.run(); |
| } |
| if (coordLeaderSelector.isLeader() |
| && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader) |
| return ScheduledExecutors.Signal.REPEAT; |
| } else { |
| return ScheduledExecutors.Signal.STOP; |
| } |
| } |
| } |
| ); |
| } |
| } |
| } |
| |
| private void stopBeingLeader() |
| { |
| synchronized (lock) { |
| |
| log.info("I am no longer the leader..."); |
| |
| for (String server : loadManagementPeons.keySet()) { |
| LoadQueuePeon peon = loadManagementPeons.remove(server); |
| peon.stop(); |
| } |
| loadManagementPeons.clear(); |
| |
| serviceAnnouncer.unannounce(self); |
| lookupCoordinatorManager.stop(); |
| metadataRuleManager.stop(); |
| segmentsMetadataManager.stopPollingDatabasePeriodically(); |
| |
| if (balancerExec != null) { |
| balancerExec.shutdownNow(); |
| balancerExec = null; |
| } |
| } |
| } |
| |
| private List<CoordinatorDuty> makeHistoricalManagementDuties() |
| { |
| return ImmutableList.of( |
| new LogUsedSegments(), |
| new UpdateCoordinatorStateAndPrepareCluster(), |
| new RunRules(DruidCoordinator.this), |
| new UnloadUnusedSegments(), |
| new MarkAsUnusedOvershadowedSegments(DruidCoordinator.this), |
| new BalanceSegments(DruidCoordinator.this), |
| new EmitClusterStatsAndMetrics(DruidCoordinator.this) |
| ); |
| } |
| |
| private List<CoordinatorDuty> makeIndexingServiceDuties() |
| { |
| List<CoordinatorDuty> duties = new ArrayList<>(); |
| duties.add(new LogUsedSegments()); |
| duties.addAll(indexingServiceDuties); |
| // CompactSegmentsDuty should be the last duty as it can take a long time to complete |
| duties.addAll(makeCompactSegmentsDuty()); |
| |
| log.debug( |
| "Done making indexing service duties %s", |
| duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList()) |
| ); |
| return ImmutableList.copyOf(duties); |
| } |
| |
| private List<CoordinatorDuty> makeCompactSegmentsDuty() |
| { |
| return ImmutableList.of(compactSegments); |
| } |
| |
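| /** |
| * Runs a fixed list of {@link CoordinatorDuty}s in order, threading a single |
| * {@link DruidCoordinatorRuntimeParams} through them. The run is aborted if this instance loses leadership |
| * or a duty returns null params. |
| */ |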
| @VisibleForTesting |
| protected class DutiesRunnable implements Runnable |
| { |
| private final long startTimeNanos = System.nanoTime(); |
| private final List<CoordinatorDuty> duties; |
| private final int startingLeaderCounter; |
| |
| protected DutiesRunnable(List<CoordinatorDuty> duties, final int startingLeaderCounter) |
| { |
| this.duties = duties; |
| this.startingLeaderCounter = startingLeaderCounter; |
| } |
| |
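| /** |
| * Creates the balancer executor on first use, and recreates it whenever the dynamic config's |
| * balancerComputeThreads has changed, so that the pool size tracks the current config. |
| */ |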
| @VisibleForTesting |
| protected void initBalancerExecutor() |
| { |
| final int currentNumber = getDynamicConfigs().getBalancerComputeThreads(); |
| final String threadNameFormat = "coordinator-cost-balancer-%s"; |
| // first-time initialization |
| if (balancerExec == null) { |
| balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded( |
| currentNumber, |
| threadNameFormat |
| )); |
| cachedBalancerThreadNumber = currentNumber; |
| return; |
| } |
| |
| if (cachedBalancerThreadNumber != currentNumber) { |
| log.info( |
| "balancerComputeThreads has been changed from [%s] to [%s], recreating the thread pool.", |
| cachedBalancerThreadNumber, |
| currentNumber |
| ); |
| balancerExec.shutdownNow(); |
| balancerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded( |
| currentNumber, |
| threadNameFormat |
| )); |
| cachedBalancerThreadNumber = currentNumber; |
| } |
| } |
| |
| @Override |
| public void run() |
| { |
| try { |
| synchronized (lock) { |
| if (!coordLeaderSelector.isLeader()) { |
| log.info("LEGGO MY EGGO. [%s] is leader.", coordLeaderSelector.getCurrentLeader()); |
| stopBeingLeader(); |
| return; |
| } |
| } |
| |
| List<Boolean> allStarted = Arrays.asList( |
| segmentsMetadataManager.isPollingDatabasePeriodically(), |
| serverInventoryView.isStarted() |
| ); |
| for (Boolean aBoolean : allStarted) { |
| if (!aBoolean) { |
| log.error("InventoryManagers not started[%s]", allStarted); |
| stopBeingLeader(); |
| return; |
| } |
| } |
| |
| initBalancerExecutor(); |
| BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec); |
| |
| // Do coordinator stuff. |
| DataSourcesSnapshot dataSourcesSnapshot = segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments(); |
| |
| DruidCoordinatorRuntimeParams params = |
| DruidCoordinatorRuntimeParams |
| .newBuilder() |
| .withDatabaseRuleManager(metadataRuleManager) |
| .withStartTimeNanos(startTimeNanos) |
| .withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot) |
| .withDynamicConfigs(getDynamicConfigs()) |
| .withCompactionConfig(getCompactionConfig()) |
| .withEmitter(emitter) |
| .withBalancerStrategy(balancerStrategy) |
| .build(); |
| |
| boolean coordinationPaused = getDynamicConfigs().getPauseCoordination(); |
| if (coordinationPaused |
| && coordLeaderSelector.isLeader() |
| && startingLeaderCounter == coordLeaderSelector.localTerm()) { |
| |
| log.debug( |
| "Coordination is paused via dynamic configs! I will not be running Coordination Duties at this time" |
| ); |
| } |
| |
| for (CoordinatorDuty duty : duties) { |
| // Don't read state and run state in the same duty, otherwise race conditions may exist |
| if (!coordinationPaused |
| && coordLeaderSelector.isLeader() |
| && startingLeaderCounter == coordLeaderSelector.localTerm()) { |
| |
| params = duty.run(params); |
| |
| if (params == null) { |
| // This duty wanted to cancel the run. No log message, since the duty should have logged a reason. |
| return; |
| } |
| } |
| } |
| } |
| catch (Exception e) { |
| log.makeAlert(e, "Caught exception, ignoring so that schedule keeps going.").emit(); |
| } |
| } |
| } |
| |
| /** |
| * Updates the enclosing {@link DruidCoordinator}'s state and prepares an immutable view of the cluster state (which |
| * consists of {@link DruidCluster} and {@link SegmentReplicantLookup}) and feeds it into {@link |
| * DruidCoordinatorRuntimeParams} for use in subsequent {@link CoordinatorDuty}s (see the order in {@link |
| * #makeHistoricalManagementDuties()}). |
| */ |
| private class UpdateCoordinatorStateAndPrepareCluster implements CoordinatorDuty |
| { |
| @Nullable |
| @Override |
| public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) |
| { |
| List<ImmutableDruidServer> currentServers = prepareCurrentServers(); |
| |
| startPeonsForNewServers(currentServers); |
| |
| final DruidCluster cluster = prepareCluster(params, currentServers); |
| segmentReplicantLookup = SegmentReplicantLookup.make(cluster); |
| |
| stopPeonsForDisappearedServers(currentServers); |
| |
| return params.buildFromExisting() |
| .withDruidCluster(cluster) |
| .withLoadManagementPeons(loadManagementPeons) |
| .withSegmentReplicantLookup(segmentReplicantLookup) |
| .build(); |
| } |
| |
| List<ImmutableDruidServer> prepareCurrentServers() |
| { |
| List<ImmutableDruidServer> currentServers = serverInventoryView |
| .getInventory() |
| .stream() |
| .filter(DruidServer::isSegmentReplicationOrBroadcastTarget) |
| .map(DruidServer::toImmutableDruidServer) |
| .collect(Collectors.toList()); |
| |
| if (log.isDebugEnabled()) { |
| // Display info about all segment-replicatable (historical and bridge) servers |
| log.debug("Servers"); |
| for (ImmutableDruidServer druidServer : currentServers) { |
| log.debug(" %s", druidServer); |
| log.debug(" -- DataSources"); |
| for (ImmutableDruidDataSource druidDataSource : druidServer.getDataSources()) { |
| log.debug(" %s", druidDataSource); |
| } |
| } |
| } |
| return currentServers; |
| } |
| |
| void startPeonsForNewServers(List<ImmutableDruidServer> currentServers) |
| { |
| for (ImmutableDruidServer server : currentServers) { |
| loadManagementPeons.computeIfAbsent(server.getName(), serverName -> { |
| LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server); |
| loadQueuePeon.start(); |
| log.debug("Created LoadQueuePeon for server[%s].", server.getName()); |
| return loadQueuePeon; |
| }); |
| } |
| } |
| |
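| /** |
| * Builds the {@link DruidCluster} view of the given servers, marking those listed in the dynamic config's |
| * decommissioning nodes so that subsequent duties can treat them accordingly. |
| */ |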
| DruidCluster prepareCluster(DruidCoordinatorRuntimeParams params, List<ImmutableDruidServer> currentServers) |
| { |
| Set<String> decommissioningServers = params.getCoordinatorDynamicConfig().getDecommissioningNodes(); |
| final DruidCluster cluster = new DruidCluster(); |
| for (ImmutableDruidServer server : currentServers) { |
| cluster.add( |
| new ServerHolder( |
| server, |
| loadManagementPeons.get(server.getName()), |
| decommissioningServers.contains(server.getHost()) |
| ) |
| ); |
| } |
| return cluster; |
| } |
| |
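| /** |
| * Stops and removes the load queue peons of servers that are no longer present in the inventory view. |
| */ |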
| void stopPeonsForDisappearedServers(List<ImmutableDruidServer> servers) |
| { |
| final Set<String> disappeared = Sets.newHashSet(loadManagementPeons.keySet()); |
| for (ImmutableDruidServer server : servers) { |
| disappeared.remove(server.getName()); |
| } |
| for (String name : disappeared) { |
| log.debug("Removing listener for server[%s] which is no longer there.", name); |
| LoadQueuePeon peon = loadManagementPeons.remove(name); |
| peon.stop(); |
| } |
| } |
| } |
| } |
| |