| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.server.lookup.cache; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Function; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Collections2; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Sets; |
| import com.google.common.net.HostAndPort; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListenableScheduledFuture; |
| import com.google.common.util.concurrent.ListeningScheduledExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.inject.Inject; |
| import org.apache.druid.audit.AuditInfo; |
| import org.apache.druid.common.config.JacksonConfigManager; |
| import org.apache.druid.concurrent.LifecycleLock; |
| import org.apache.druid.discovery.DruidNodeDiscoveryProvider; |
| import org.apache.druid.guice.annotations.EscalatedGlobal; |
| import org.apache.druid.guice.annotations.Smile; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.IOE; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.StreamUtils; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.http.client.HttpClient; |
| import org.apache.druid.java.util.http.client.Request; |
| import org.apache.druid.java.util.http.client.response.ClientResponse; |
| import org.apache.druid.java.util.http.client.response.HttpResponseHandler; |
| import org.apache.druid.java.util.http.client.response.SequenceInputStreamResponseHandler; |
| import org.apache.druid.query.lookup.LookupsState; |
| import org.apache.druid.server.http.HostAndPortWithScheme; |
| import org.apache.druid.server.listener.resource.ListenerResource; |
| import org.jboss.netty.handler.codec.http.HttpHeaders; |
| import org.jboss.netty.handler.codec.http.HttpMethod; |
| import org.jboss.netty.handler.codec.http.HttpResponse; |
| |
| import javax.annotation.Nullable; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.HttpURLConnection; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.util.AbstractMap; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * Manages {@link org.apache.druid.query.lookup.LookupExtractorFactoryContainer} specifications, distributing them |
| * to {@link org.apache.druid.query.lookup.LookupReferencesManager} around the cluster by monitoring the lookup |
| * announce path for servers and utilizing their {@link org.apache.druid.query.lookup.LookupListeningResource} API |
| * to load, drop, and update lookups around the cluster. |
| */ |
| public class LookupCoordinatorManager |
| { |
| //key used in druid-0.10.0 with config manager |
| public static final String OLD_LOOKUP_CONFIG_KEY = "lookups"; |
| |
| public static final String LOOKUP_CONFIG_KEY = "lookupsConfig"; |
| public static final String LOOKUP_LISTEN_ANNOUNCE_KEY = "lookups"; |
| |
| private static final String LOOKUP_BASE_REQUEST_PATH = ListenerResource.BASE_PATH |
| + "/" |
| + LOOKUP_LISTEN_ANNOUNCE_KEY; |
| private static final String LOOKUP_UPDATE_REQUEST_PATH = LOOKUP_BASE_REQUEST_PATH + "/" + "updates"; |
| |
| private static final TypeReference<LookupsState<LookupExtractorFactoryMapContainer>> LOOKUPS_STATE_TYPE_REFERENCE = |
| new TypeReference<LookupsState<LookupExtractorFactoryMapContainer>>() |
| { |
| }; |
| |
| private static final EmittingLogger LOG = new EmittingLogger(LookupCoordinatorManager.class); |
| |
| private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; |
| private LookupNodeDiscovery lookupNodeDiscovery; |
| |
| private final JacksonConfigManager configManager; |
| private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig; |
| private final LookupsCommunicator lookupsCommunicator; |
| |
| // Known lookup state across various cluster nodes is managed in the reference here. On each lookup management loop |
| // state is rediscovered and updated in the ref here. If some lookup nodes have disappeared since last lookup |
| // management loop, then they get discarded automatically. |
| @VisibleForTesting |
| final AtomicReference<Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> knownOldState = |
| new AtomicReference<>(ImmutableMap.of()); |
| |
| // Updated by config watching service |
| private AtomicReference<Map<String, Map<String, LookupExtractorFactoryMapContainer>>> lookupMapConfigRef; |
| |
| private final LifecycleLock lifecycleLock = new LifecycleLock(); |
| |
| private ListeningScheduledExecutorService executorService; |
| private ListenableScheduledFuture<?> backgroundManagerFuture; |
| private CountDownLatch backgroundManagerExitedLatch; |
| |
/**
 * Injection constructor. Wraps the HTTP client, config and Smile mapper in a {@link LookupsCommunicator} and passes
 * {@code null} for node discovery, which {@link #start()} then creates from the discovery provider.
 */
@Inject
public LookupCoordinatorManager(
    final @EscalatedGlobal HttpClient httpClient,
    final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
    final @Smile ObjectMapper smileMapper,
    final JacksonConfigManager configManager,
    final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig
)
{
  this(
      druidNodeDiscoveryProvider,
      configManager,
      lookupCoordinatorManagerConfig,
      new LookupsCommunicator(httpClient, lookupCoordinatorManagerConfig, smileMapper),
      null
  );
}
| |
/**
 * Constructor that takes every collaborator explicitly, so tests can supply their own communicator and node
 * discovery.
 *
 * @param lookupNodeDiscovery may be null, in which case {@link #start()} creates one from the discovery provider
 */
@VisibleForTesting
LookupCoordinatorManager(
    final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
    final JacksonConfigManager configManager,
    final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig,
    final LookupsCommunicator lookupsCommunicator,
    final LookupNodeDiscovery lookupNodeDiscovery
)
{
  // Discovery related collaborators
  this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
  this.lookupNodeDiscovery = lookupNodeDiscovery;

  // Configuration and communication collaborators
  this.lookupCoordinatorManagerConfig = lookupCoordinatorManagerConfig;
  this.configManager = configManager;
  this.lookupsCommunicator = lookupsCommunicator;
}
| |
| public boolean updateLookup( |
| final String tier, |
| final String lookupName, |
| LookupExtractorFactoryMapContainer spec, |
| final AuditInfo auditInfo |
| ) |
| { |
| return updateLookups( |
| ImmutableMap.of(tier, ImmutableMap.of(lookupName, spec)), |
| auditInfo |
| ); |
| } |
| |
| public boolean updateLookups(final Map<String, Map<String, LookupExtractorFactoryMapContainer>> updateSpec, AuditInfo auditInfo) |
| { |
| Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); |
| |
| if (updateSpec.isEmpty() && lookupMapConfigRef.get() != null) { |
| return true; |
| } |
| |
| //ensure all the lookups specs have version specified. ideally this should be done in the LookupExtractorFactoryMapContainer |
| //constructor but that allows null to enable backward compatibility with 0.10.0 lookup specs |
| for (final Map.Entry<String, Map<String, LookupExtractorFactoryMapContainer>> tierEntry : updateSpec.entrySet()) { |
| for (Map.Entry<String, LookupExtractorFactoryMapContainer> e : tierEntry.getValue().entrySet()) { |
| Preconditions.checkNotNull( |
| e.getValue().getVersion(), |
| "lookup [%s]:[%s] does not have version.", tierEntry.getKey(), e.getKey() |
| ); |
| } |
| } |
| |
| synchronized (this) { |
| final Map<String, Map<String, LookupExtractorFactoryMapContainer>> priorSpec = getKnownLookups(); |
| if (priorSpec == null && !updateSpec.isEmpty()) { |
| // To prevent accidentally erasing configs if we haven't updated our cache of the values |
| throw new ISE("Not initialized. If this is the first lookup, post an empty map to initialize"); |
| } |
| final Map<String, Map<String, LookupExtractorFactoryMapContainer>> updatedSpec; |
| |
| // Only add or update here, don't delete. |
| if (priorSpec == null) { |
| // all new |
| updatedSpec = updateSpec; |
| } else { |
| // Needs update |
| updatedSpec = new HashMap<>(priorSpec); |
| for (final Map.Entry<String, Map<String, LookupExtractorFactoryMapContainer>> tierEntry : updateSpec.entrySet()) { |
| final String tier = tierEntry.getKey(); |
| final Map<String, LookupExtractorFactoryMapContainer> updateTierSpec = tierEntry.getValue(); |
| final Map<String, LookupExtractorFactoryMapContainer> priorTierSpec = priorSpec.get(tier); |
| |
| if (priorTierSpec == null) { |
| // New tier |
| updatedSpec.put(tier, updateTierSpec); |
| } else { |
| // Update existing tier |
| final Map<String, LookupExtractorFactoryMapContainer> updatedTierSpec = new HashMap<>(priorTierSpec); |
| |
| for (Map.Entry<String, LookupExtractorFactoryMapContainer> e : updateTierSpec.entrySet()) { |
| if (updatedTierSpec.containsKey(e.getKey()) && !e.getValue().replaces(updatedTierSpec.get(e.getKey()))) { |
| throw new IAE( |
| "given update for lookup [%s]:[%s] can't replace existing spec [%s].", |
| tier, |
| e.getKey(), |
| updatedTierSpec.get(e.getKey()) |
| ); |
| } |
| } |
| updatedTierSpec.putAll(updateTierSpec); |
| updatedSpec.put(tier, updatedTierSpec); |
| } |
| } |
| } |
| return configManager.set(LOOKUP_CONFIG_KEY, updatedSpec, auditInfo).isOk(); |
| } |
| } |
| |
| public Map<String, Map<String, LookupExtractorFactoryMapContainer>> getKnownLookups() |
| { |
| Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); |
| return lookupMapConfigRef.get(); |
| } |
| |
| public boolean deleteTier(final String tier, AuditInfo auditInfo) |
| { |
| Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); |
| |
| synchronized (this) { |
| final Map<String, Map<String, LookupExtractorFactoryMapContainer>> priorSpec = getKnownLookups(); |
| if (priorSpec == null) { |
| LOG.warn("Requested delete tier [%s]. But no lookups exist!", tier); |
| return false; |
| } |
| final Map<String, Map<String, LookupExtractorFactoryMapContainer>> updateSpec = new HashMap<>(priorSpec); |
| |
| if (updateSpec.remove(tier) == null) { |
| LOG.warn("Requested delete of tier [%s] that does not exist!", tier); |
| return false; |
| } |
| |
| return configManager.set(LOOKUP_CONFIG_KEY, updateSpec, auditInfo).isOk(); |
| } |
| } |
| |
| public boolean deleteLookup(final String tier, final String lookup, AuditInfo auditInfo) |
| { |
| Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); |
| |
| synchronized (this) { |
| final Map<String, Map<String, LookupExtractorFactoryMapContainer>> priorSpec = getKnownLookups(); |
| if (priorSpec == null) { |
| LOG.warn("Requested delete lookup [%s]/[%s]. But no lookups exist!", tier, lookup); |
| return false; |
| } |
| final Map<String, Map<String, LookupExtractorFactoryMapContainer>> updateSpec = new HashMap<>(priorSpec); |
| final Map<String, LookupExtractorFactoryMapContainer> priorTierSpec = updateSpec.get(tier); |
| if (priorTierSpec == null) { |
| LOG.warn("Requested delete of lookup [%s]/[%s] but tier does not exist!", tier, lookup); |
| return false; |
| } |
| |
| if (!priorTierSpec.containsKey(lookup)) { |
| LOG.warn("Requested delete of lookup [%s]/[%s] but lookup does not exist!", tier, lookup); |
| return false; |
| } |
| |
| final Map<String, LookupExtractorFactoryMapContainer> updateTierSpec = new HashMap<>(priorTierSpec); |
| updateTierSpec.remove(lookup); |
| |
| if (updateTierSpec.isEmpty()) { |
| updateSpec.remove(tier); |
| } else { |
| updateSpec.put(tier, updateTierSpec); |
| } |
| return configManager.set(LOOKUP_CONFIG_KEY, updateSpec, auditInfo).isOk(); |
| } |
| } |
| |
| public Set<String> discoverTiers() |
| { |
| Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); |
| return lookupNodeDiscovery.getAllTiers(); |
| } |
| |
| public Collection<HostAndPort> discoverNodesInTier(String tier) |
| { |
| Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); |
| return Collections2.transform( |
| lookupNodeDiscovery.getNodesInTier(tier), |
| new Function<HostAndPortWithScheme, HostAndPort>() |
| { |
| @Override |
| public HostAndPort apply(HostAndPortWithScheme input) |
| { |
| return input.getHostAndPort(); |
| } |
| } |
| ); |
| } |
| |
| public Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> getLastKnownLookupsStateOnNodes() |
| { |
| Preconditions.checkState(lifecycleLock.awaitStarted(5, TimeUnit.SECONDS), "not started"); |
| return knownOldState.get(); |
| } |
| |
| /** |
| * Try to find a lookupName spec for the specified lookupName. |
| * |
| * @param lookupName The lookupName to look for |
| * |
| * @return The lookupName spec if found or null if not found or if no lookups at all are found |
| */ |
| @Nullable |
| public LookupExtractorFactoryMapContainer getLookup(final String tier, final String lookupName) |
| { |
| final Map<String, Map<String, LookupExtractorFactoryMapContainer>> prior = getKnownLookups(); |
| if (prior == null) { |
| LOG.warn("Requested tier [%s] lookupName [%s]. But no lookups exist!", tier, lookupName); |
| return null; |
| } |
| final Map<String, LookupExtractorFactoryMapContainer> tierLookups = prior.get(tier); |
| if (tierLookups == null) { |
| LOG.warn("Tier [%s] does not exist", tier); |
| return null; |
| } |
| return tierLookups.get(lookupName); |
| } |
| |
| public boolean isStarted() |
| { |
| return lifecycleLock.isStarted(); |
| } |
| |
| @VisibleForTesting |
| boolean awaitStarted(long waitTimeMs) |
| { |
| return lifecycleLock.awaitStarted(waitTimeMs, TimeUnit.MILLISECONDS); |
| } |
| |
| // start() and stop() are synchronized so that they never run in parallel in case of ZK acting funny or druid bug and |
| // coordinator becomes leader and drops leadership in quick succession. |
| public void start() |
| { |
| synchronized (lifecycleLock) { |
| if (!lifecycleLock.canStart()) { |
| throw new ISE("LookupCoordinatorManager can't start."); |
| } |
| |
| try { |
| LOG.debug("Starting."); |
| |
| if (lookupNodeDiscovery == null) { |
| lookupNodeDiscovery = new LookupNodeDiscovery(druidNodeDiscoveryProvider); |
| } |
| |
| //first ensure that previous executorService from last cycle of start/stop has finished completely. |
| //so that we don't have multiple live executorService instances lying around doing lookup management. |
| if (executorService != null && |
| !executorService.awaitTermination( |
| lookupCoordinatorManagerConfig.getHostTimeout().getMillis() * 10, |
| TimeUnit.MILLISECONDS |
| )) { |
| throw new ISE("LookupCoordinatorManager executor from last start() hasn't finished. Failed to Start."); |
| } |
| |
| executorService = MoreExecutors.listeningDecorator( |
| Executors.newScheduledThreadPool( |
| lookupCoordinatorManagerConfig.getThreadPoolSize(), |
| Execs.makeThreadFactory("LookupCoordinatorManager--%s") |
| ) |
| ); |
| |
| initializeLookupsConfigWatcher(); |
| |
| this.backgroundManagerExitedLatch = new CountDownLatch(1); |
| this.backgroundManagerFuture = executorService.scheduleWithFixedDelay( |
| this::lookupManagementLoop, |
| lookupCoordinatorManagerConfig.getInitialDelay(), |
| lookupCoordinatorManagerConfig.getPeriod(), |
| TimeUnit.MILLISECONDS |
| ); |
| Futures.addCallback( |
| backgroundManagerFuture, |
| new FutureCallback<Object>() |
| { |
| @Override |
| public void onSuccess(@Nullable Object result) |
| { |
| backgroundManagerExitedLatch.countDown(); |
| LOG.debug("Exited background lookup manager"); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) |
| { |
| backgroundManagerExitedLatch.countDown(); |
| if (backgroundManagerFuture.isCancelled()) { |
| LOG.debug("Exited background lookup manager due to cancellation."); |
| } else { |
| LOG.makeAlert(t, "Background lookup manager exited with error!").emit(); |
| } |
| } |
| } |
| ); |
| |
| LOG.debug("Started"); |
| } |
| catch (Exception ex) { |
| LOG.makeAlert(ex, "Got Exception while start()").emit(); |
| } |
| finally { |
| //so that subsequent stop() would happen, even if start() failed with exception |
| lifecycleLock.started(); |
| lifecycleLock.exitStart(); |
| } |
| } |
| } |
| |
| public void stop() |
| { |
| synchronized (lifecycleLock) { |
| if (!lifecycleLock.canStop()) { |
| throw new ISE("LookupCoordinatorManager can't stop."); |
| } |
| |
| try { |
| LOG.debug("Stopping"); |
| |
| if (backgroundManagerFuture != null && !backgroundManagerFuture.cancel(true)) { |
| LOG.warn("Background lookup manager thread could not be cancelled"); |
| } |
| |
| // signal the executorService to shut down ASAP, if this coordinator becomes leader again |
| // then start() would ensure that this executorService is finished before starting a |
| // new one. |
| if (executorService != null) { |
| executorService.shutdownNow(); |
| } |
| |
| LOG.debug("Stopped"); |
| } |
| catch (Exception ex) { |
| LOG.makeAlert(ex, "Got Exception while stop()").emit(); |
| } |
| finally { |
| //so that subsequent start() would happen, even if stop() failed with exception |
| lifecycleLock.exitStopAndReset(); |
| } |
| } |
| } |
| |
| private void initializeLookupsConfigWatcher() |
| { |
| //Note: this call is idempotent, so multiple start() would not cause any problems. |
| lookupMapConfigRef = configManager.watch( |
| LOOKUP_CONFIG_KEY, |
| new TypeReference<Map<String, Map<String, LookupExtractorFactoryMapContainer>>>() |
| { |
| }, |
| null |
| ); |
| |
| // backward compatibility with 0.10.0 |
| if (lookupMapConfigRef.get() == null) { |
| Map<String, Map<String, Map<String, Object>>> oldLookups = configManager.watch( |
| OLD_LOOKUP_CONFIG_KEY, |
| new TypeReference<Map<String, Map<String, Map<String, Object>>>>() |
| { |
| }, |
| null |
| ).get(); |
| |
| if (oldLookups != null) { |
| Map<String, Map<String, LookupExtractorFactoryMapContainer>> converted = new HashMap<>(); |
| oldLookups.forEach( |
| (tier, oldTierLookups) -> { |
| if (oldTierLookups != null && !oldTierLookups.isEmpty()) { |
| converted.put(tier, convertTierLookups(oldTierLookups)); |
| } |
| } |
| ); |
| |
| configManager.set( |
| LOOKUP_CONFIG_KEY, |
| converted, |
| new AuditInfo("autoConversion", "autoConversion", "127.0.0.1") |
| ); |
| } |
| } |
| } |
| |
| private Map<String, LookupExtractorFactoryMapContainer> convertTierLookups( |
| Map<String, Map<String, Object>> oldTierLookups |
| ) |
| { |
| Map<String, LookupExtractorFactoryMapContainer> convertedTierLookups = new HashMap<>(); |
| oldTierLookups.forEach( |
| (lookup, lookupExtractorFactory) -> { |
| convertedTierLookups.put(lookup, new LookupExtractorFactoryMapContainer(null, lookupExtractorFactory)); |
| } |
| ); |
| return convertedTierLookups; |
| } |
| |
| @VisibleForTesting |
| void lookupManagementLoop() |
| { |
| // Sanity check for if we are shutting down |
| if (Thread.currentThread().isInterrupted() || !lifecycleLock.awaitStarted(15, TimeUnit.SECONDS)) { |
| LOG.info("Not updating lookups because process was interrupted or not finished starting yet."); |
| return; |
| } |
| |
| final Map<String, Map<String, LookupExtractorFactoryMapContainer>> allLookupTiers = lookupMapConfigRef.get(); |
| |
| if (allLookupTiers == null) { |
| LOG.info("Not updating lookups because no data exists"); |
| return; |
| } |
| |
| LOG.debug("Starting lookup sync for on all nodes."); |
| |
| try { |
| List<ListenableFuture<Map.Entry>> futures = new ArrayList<>(); |
| |
| Set<String> discoveredLookupTiers = lookupNodeDiscovery.getAllTiers(); |
| |
| // Check and Log warnings about lookups configured by user in DB but no nodes discovered to load those. |
| for (String tierInDB : allLookupTiers.keySet()) { |
| if (!discoveredLookupTiers.contains(tierInDB) && |
| !allLookupTiers.getOrDefault(tierInDB, ImmutableMap.of()).isEmpty()) { |
| LOG.warn("Found lookups for tier [%s] in DB, but no nodes discovered for it", tierInDB); |
| } |
| } |
| |
| for (String tier : discoveredLookupTiers) { |
| |
| LOG.debug("Starting lookup mgmt for tier [%s].", tier); |
| |
| final Map<String, LookupExtractorFactoryMapContainer> tierLookups = allLookupTiers.getOrDefault(tier, ImmutableMap.of()); |
| for (final HostAndPortWithScheme node : lookupNodeDiscovery.getNodesInTier(tier)) { |
| |
| LOG.debug( |
| "Starting lookup mgmt for tier [%s] and host [%s:%s:%s].", |
| tier, |
| node.getScheme(), |
| node.getHostText(), |
| node.getPort() |
| ); |
| |
| futures.add( |
| executorService.submit( |
| () -> { |
| try { |
| return new AbstractMap.SimpleImmutableEntry<>(node.getHostAndPort(), doLookupManagementOnNode(node, tierLookups)); |
| } |
| catch (InterruptedException ex) { |
| LOG.warn(ex, "lookup management on node [%s:%s:%s] interrupted.", node.getScheme(), node.getHostText(), node.getPort()); |
| return null; |
| } |
| catch (Exception ex) { |
| LOG.makeAlert( |
| ex, |
| "Failed to finish lookup management on node [%s:%s:%s]", |
| node.getScheme(), |
| node.getHostText(), |
| node.getPort() |
| ).emit(); |
| return null; |
| } |
| } |
| ) |
| ); |
| } |
| } |
| |
| final ListenableFuture<List<Map.Entry>> allFuture = Futures.allAsList(futures); |
| try { |
| ImmutableMap.Builder<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> stateBuilder = ImmutableMap.builder(); |
| allFuture.get(lookupCoordinatorManagerConfig.getAllHostTimeout().getMillis(), TimeUnit.MILLISECONDS) |
| .stream() |
| .filter(Objects::nonNull) |
| .forEach(stateBuilder::put); |
| knownOldState.set(stateBuilder.build()); |
| } |
| catch (InterruptedException ex) { |
| allFuture.cancel(true); |
| Thread.currentThread().interrupt(); |
| throw ex; |
| } |
| catch (Exception ex) { |
| allFuture.cancel(true); |
| throw ex; |
| } |
| |
| } |
| catch (Exception ex) { |
| LOG.makeAlert(ex, "Failed to finish lookup management loop.").emit(); |
| } |
| |
| LOG.debug("Finished lookup sync for on all nodes."); |
| } |
| |
| private LookupsState<LookupExtractorFactoryMapContainer> doLookupManagementOnNode( |
| HostAndPortWithScheme node, |
| Map<String, LookupExtractorFactoryMapContainer> nodeTierLookupsToBe |
| ) throws IOException, InterruptedException, ExecutionException |
| { |
| LOG.debug("Starting lookup sync for node [%s].", node); |
| |
| LookupsState<LookupExtractorFactoryMapContainer> currLookupsStateOnNode = lookupsCommunicator.getLookupStateForNode( |
| node |
| ); |
| LOG.debug("Received lookups state from node [%s].", node); |
| |
| |
| // Compare currLookupsStateOnNode with nodeTierLookupsToBe to find what are the lookups |
| // we need to further ask node to load/drop |
| Map<String, LookupExtractorFactoryMapContainer> toLoad = getToBeLoadedOnNode( |
| currLookupsStateOnNode, |
| nodeTierLookupsToBe |
| ); |
| Set<String> toDrop = getToBeDroppedFromNode(currLookupsStateOnNode, nodeTierLookupsToBe); |
| |
| if (!toLoad.isEmpty() || !toDrop.isEmpty()) { |
| // Send POST request to the node asking to load and drop the lookups necessary. |
| // no need to send "current" in the LookupsStateWithMap , that is not required |
| currLookupsStateOnNode = lookupsCommunicator.updateNode(node, new LookupsState<>(null, toLoad, toDrop)); |
| |
| LOG.debug( |
| "Sent lookup toAdd[%s] and toDrop[%s] updates to node [%s].", |
| toLoad.keySet(), |
| toDrop, |
| node |
| ); |
| } |
| |
| LOG.debug("Finished lookup sync for node [%s].", node); |
| return currLookupsStateOnNode; |
| } |
| |
| // Returns the Map<lookup-name, lookup-spec> that needs to be loaded by the node and it does not know about |
| // those already. |
| // It is assumed that currLookupsStateOnNode "toLoad" and "toDrop" are disjoint. |
| @VisibleForTesting |
| Map<String, LookupExtractorFactoryMapContainer> getToBeLoadedOnNode( |
| LookupsState<LookupExtractorFactoryMapContainer> currLookupsStateOnNode, |
| Map<String, LookupExtractorFactoryMapContainer> nodeTierLookupsToBe |
| ) |
| { |
| Map<String, LookupExtractorFactoryMapContainer> toLoad = new HashMap<>(); |
| for (Map.Entry<String, LookupExtractorFactoryMapContainer> e : nodeTierLookupsToBe.entrySet()) { |
| String name = e.getKey(); |
| LookupExtractorFactoryMapContainer lookupToBe = e.getValue(); |
| |
| // get it from the current pending notices list on the node |
| LookupExtractorFactoryMapContainer current = currLookupsStateOnNode.getToLoad().get(name); |
| |
| if (current == null) { |
| //ok, not on pending list, get from currently loaded lookups on node |
| current = currLookupsStateOnNode.getCurrent().get(name); |
| } |
| |
| if (current == null || //lookup is neither pending nor already loaded on the node OR |
| currLookupsStateOnNode.getToDrop().contains(name) || //it is being dropped on the node OR |
| lookupToBe.replaces(current) //lookup is already know to node, but lookupToBe overrides that |
| ) { |
| toLoad.put(name, lookupToBe); |
| } |
| } |
| return toLoad; |
| } |
| |
| // Returns Set<lookup-name> that should be dropped from the node which has them already either in pending to load |
| // state or loaded |
| // It is assumed that currLookupsStateOnNode "toLoad" and "toDrop" are disjoint. |
| @VisibleForTesting |
| Set<String> getToBeDroppedFromNode( |
| LookupsState<LookupExtractorFactoryMapContainer> currLookupsStateOnNode, |
| Map<String, LookupExtractorFactoryMapContainer> nodeTierLookupsToBe |
| ) |
| { |
| Set<String> toDrop = new HashSet<>(); |
| |
| // {currently loading/loaded on the node} - {currently pending deletion on node} - {lookups node should actually have} |
| toDrop.addAll(currLookupsStateOnNode.getCurrent().keySet()); |
| toDrop.addAll(currLookupsStateOnNode.getToLoad().keySet()); |
| toDrop = Sets.difference(toDrop, currLookupsStateOnNode.getToDrop()); |
| toDrop = Sets.difference(toDrop, nodeTierLookupsToBe.keySet()); |
| return toDrop; |
| } |
| |
| static URL getLookupsURL(HostAndPortWithScheme druidNode) throws MalformedURLException |
| { |
| return new URL( |
| druidNode.getScheme(), |
| druidNode.getHostText(), |
| druidNode.getPortOrDefault(-1), |
| LOOKUP_BASE_REQUEST_PATH |
| ); |
| } |
| |
| static URL getLookupsUpdateURL(HostAndPortWithScheme druidNode) throws MalformedURLException |
| { |
| return new URL( |
| druidNode.getScheme(), |
| druidNode.getHostText(), |
| druidNode.getPortOrDefault(-1), |
| LOOKUP_UPDATE_REQUEST_PATH |
| ); |
| } |
| |
| private static boolean httpStatusIsSuccess(int statusCode) |
| { |
| return statusCode >= 200 && statusCode < 300; |
| } |
| |
| @VisibleForTesting |
| boolean backgroundManagerIsRunning() |
| { |
| ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture; |
| return backgroundManagerFuture != null && !backgroundManagerFuture.isDone(); |
| } |
| |
| @VisibleForTesting |
| boolean waitForBackgroundTermination(long timeout) throws InterruptedException |
| { |
| return backgroundManagerExitedLatch.await(timeout, TimeUnit.MILLISECONDS); |
| } |
| |
| @VisibleForTesting |
| public static class LookupsCommunicator |
| { |
| private final HttpClient httpClient; |
| private final LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig; |
| private final ObjectMapper smileMapper; |
| |
| public LookupsCommunicator( |
| HttpClient httpClient, |
| LookupCoordinatorManagerConfig lookupCoordinatorManagerConfig, |
| ObjectMapper smileMapper |
| ) |
| { |
| this.httpClient = httpClient; |
| this.lookupCoordinatorManagerConfig = lookupCoordinatorManagerConfig; |
| this.smileMapper = smileMapper; |
| } |
| |
| public LookupsState<LookupExtractorFactoryMapContainer> updateNode( |
| HostAndPortWithScheme node, |
| LookupsState<LookupExtractorFactoryMapContainer> lookupsUpdate |
| ) |
| throws IOException, InterruptedException, ExecutionException |
| { |
| final AtomicInteger returnCode = new AtomicInteger(0); |
| final AtomicReference<String> reasonString = new AtomicReference<>(null); |
| |
| final URL url = getLookupsUpdateURL(node); |
| |
| LOG.debug("Sending lookups load/drop request to [%s]. Request [%s]", url, lookupsUpdate); |
| |
| try (final InputStream result = httpClient.go( |
| new Request(HttpMethod.POST, url) |
| .addHeader(HttpHeaders.Names.ACCEPT, SmileMediaTypes.APPLICATION_JACKSON_SMILE) |
| .addHeader(HttpHeaders.Names.CONTENT_TYPE, SmileMediaTypes.APPLICATION_JACKSON_SMILE) |
| .setContent(smileMapper.writeValueAsBytes(lookupsUpdate)), |
| makeResponseHandler(returnCode, reasonString), |
| lookupCoordinatorManagerConfig.getHostTimeout() |
| ).get()) { |
| if (httpStatusIsSuccess(returnCode.get())) { |
| try { |
| final LookupsState<LookupExtractorFactoryMapContainer> response = smileMapper.readValue( |
| result, |
| LOOKUPS_STATE_TYPE_REFERENCE |
| ); |
| LOG.debug( |
| "Update on [%s], Status: %s reason: [%s], Response [%s].", url, returnCode.get(), reasonString.get(), |
| response |
| ); |
| return response; |
| } |
| catch (IOException ex) { |
| throw new IOE(ex, "Failed to parse update response from [%s]. response [%s]", url, result); |
| } |
| } else { |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try { |
| StreamUtils.copyAndClose(result, baos); |
| } |
| catch (IOException e2) { |
| LOG.warn(e2, "Error reading response"); |
| } |
| |
| throw new IOE( |
| "Bad update request to [%s] : [%d] : [%s] Response: [%s]", |
| url, |
| returnCode.get(), |
| reasonString.get(), |
| StringUtils.fromUtf8(baos.toByteArray()) |
| ); |
| } |
| } |
| } |
| |
| public LookupsState<LookupExtractorFactoryMapContainer> getLookupStateForNode( |
| HostAndPortWithScheme node |
| ) throws IOException, InterruptedException, ExecutionException |
| { |
| final URL url = getLookupsURL(node); |
| final AtomicInteger returnCode = new AtomicInteger(0); |
| final AtomicReference<String> reasonString = new AtomicReference<>(null); |
| |
| LOG.debug("Getting lookups from [%s]", url); |
| |
| try (final InputStream result = httpClient.go( |
| new Request(HttpMethod.GET, url) |
| .addHeader(HttpHeaders.Names.ACCEPT, SmileMediaTypes.APPLICATION_JACKSON_SMILE), |
| makeResponseHandler(returnCode, reasonString), |
| lookupCoordinatorManagerConfig.getHostTimeout() |
| ).get()) { |
| if (returnCode.get() == HttpURLConnection.HTTP_OK) { |
| try { |
| final LookupsState<LookupExtractorFactoryMapContainer> response = smileMapper.readValue( |
| result, |
| LOOKUPS_STATE_TYPE_REFERENCE |
| ); |
| LOG.debug( |
| "Get on [%s], Status: [%s] reason: [%s], Response [%s].", |
| url, |
| returnCode.get(), |
| reasonString.get(), |
| response |
| ); |
| return response; |
| } |
| catch (IOException ex) { |
| throw new IOE(ex, "Failed to parser GET lookups response from [%s]. response [%s].", url, result); |
| } |
| } else { |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| try { |
| StreamUtils.copyAndClose(result, baos); |
| } |
| catch (IOException ex) { |
| LOG.warn(ex, "Error reading response from GET on url [%s]", url); |
| } |
| |
| throw new IOE( |
| "GET request failed to [%s] : [%d] : [%s] Response: [%s]", |
| url, |
| returnCode.get(), |
| reasonString.get(), |
| StringUtils.fromUtf8(baos.toByteArray()) |
| ); |
| } |
| } |
| } |
| |
| @VisibleForTesting |
| HttpResponseHandler<InputStream, InputStream> makeResponseHandler( |
| final AtomicInteger returnCode, |
| final AtomicReference<String> reasonString |
| ) |
| { |
| return new SequenceInputStreamResponseHandler() |
| { |
| @Override |
| public ClientResponse<InputStream> handleResponse(HttpResponse response, TrafficCop trafficCop) |
| { |
| returnCode.set(response.getStatus().getCode()); |
| reasonString.set(response.getStatus().getReasonPhrase()); |
| return super.handleResponse(response, trafficCop); |
| } |
| }; |
| } |
| } |
| } |