blob: 358ac105dc19ca177f2312085e528004810574a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.security.basic;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.server.DruidNode;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.Duration;
import javax.ws.rs.core.MediaType;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class CommonCacheNotifier
{
private static final EmittingLogger LOG = new EmittingLogger(CommonCacheNotifier.class);
/**
* {@link NodeRole#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly
* from metadata storage.
*/
private static final List<NodeRole> NODE_TYPES = Arrays.asList(
NodeRole.BROKER,
NodeRole.OVERLORD,
NodeRole.HISTORICAL,
NodeRole.PEON,
NodeRole.ROUTER,
NodeRole.MIDDLE_MANAGER,
NodeRole.INDEXER
);
private final DruidNodeDiscoveryProvider discoveryProvider;
private final HttpClient httpClient;
private final BlockingQueue<Pair<String, byte[]>> updateQueue;
private final Map<String, BasicAuthDBConfig> itemConfigMap;
private final String baseUrl;
private final String callerName;
private final ExecutorService exec;
public CommonCacheNotifier(
Map<String, BasicAuthDBConfig> itemConfigMap,
DruidNodeDiscoveryProvider discoveryProvider,
HttpClient httpClient,
String baseUrl,
String callerName
)
{
this.exec = Execs.singleThreaded(
StringUtils.format("%s-notifierThread-", StringUtils.encodeForFormat(callerName)) + "%d"
);
this.callerName = callerName;
this.updateQueue = new LinkedBlockingQueue<>();
this.itemConfigMap = itemConfigMap;
this.discoveryProvider = discoveryProvider;
this.httpClient = httpClient;
this.baseUrl = baseUrl;
}
public void start()
{
exec.submit(
() -> {
while (!Thread.interrupted()) {
try {
LOG.debug(callerName + ":Waiting for cache update notification");
Pair<String, byte[]> update = updateQueue.take();
String authorizer = update.lhs;
byte[] serializedMap = update.rhs;
BasicAuthDBConfig authorizerConfig = itemConfigMap.get(update.lhs);
if (!authorizerConfig.isEnableCacheNotifications()) {
continue;
}
LOG.debug(callerName + ":Sending cache update notifications");
// Best effort, if a notification fails, the remote node will eventually poll to update its state
// We wait for responses however, to avoid flooding remote nodes with notifications.
List<ListenableFuture<StatusResponseHolder>> futures = sendUpdate(
authorizer,
serializedMap
);
try {
List<StatusResponseHolder> responses = Futures.allAsList(futures)
.get(
authorizerConfig.getCacheNotificationTimeout(),
TimeUnit.MILLISECONDS
);
for (StatusResponseHolder response : responses) {
LOG.debug(callerName + ":Got status: " + response.getStatus());
}
}
catch (Exception e) {
LOG.makeAlert(e, callerName + ":Failed to get response for cache notification.").emit();
}
LOG.debug(callerName + ":Received responses for cache update notifications.");
}
catch (InterruptedException e) {
LOG.noStackTrace().info(e, "%s: Interrupted while handling updates for cachedUserMaps.", callerName);
}
catch (Throwable t) {
LOG.makeAlert(t, callerName + ":Error occured while handling updates for cachedUserMaps.").emit();
}
}
}
);
}
public void stop()
{
exec.shutdownNow();
}
public void addUpdate(String updatedItemName, byte[] updatedItemData)
{
updateQueue.add(
new Pair<>(updatedItemName, updatedItemData)
);
}
private List<ListenableFuture<StatusResponseHolder>> sendUpdate(String updatedAuthenticatorPrefix, byte[] serializedEntity)
{
List<ListenableFuture<StatusResponseHolder>> futures = new ArrayList<>();
for (NodeRole nodeRole : NODE_TYPES) {
DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeRole(nodeRole);
Collection<DiscoveryDruidNode> nodes = nodeDiscovery.getAllNodes();
for (DiscoveryDruidNode node : nodes) {
URL listenerURL = getListenerURL(
node.getDruidNode(),
StringUtils.format(baseUrl, StringUtils.urlEncode(updatedAuthenticatorPrefix))
);
// best effort, if this fails, remote node will poll and pick up the update eventually
Request req = new Request(HttpMethod.POST, listenerURL);
req.setContent(MediaType.APPLICATION_JSON, serializedEntity);
BasicAuthDBConfig itemConfig = itemConfigMap.get(updatedAuthenticatorPrefix);
ListenableFuture<StatusResponseHolder> future = httpClient.go(
req,
new ResponseHandler(),
Duration.millis(itemConfig.getCacheNotificationTimeout())
);
futures.add(future);
}
}
return futures;
}
private URL getListenerURL(DruidNode druidNode, String baseUrl)
{
try {
return new URL(
druidNode.getServiceScheme(),
druidNode.getHost(),
druidNode.getPortToUse(),
baseUrl
);
}
catch (MalformedURLException mue) {
LOG.error(callerName + ": Malformed url for DruidNode[%s] and baseUrl[%s]", druidNode, baseUrl);
throw new RuntimeException(mue);
}
}
// Based off StatusResponseHandler, but with response content ignored
private static class ResponseHandler implements HttpResponseHandler<StatusResponseHolder, StatusResponseHolder>
{
protected static final Logger log = new Logger(ResponseHandler.class);
@Override
public ClientResponse<StatusResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
return ClientResponse.unfinished(
new StatusResponseHolder(
response.getStatus(),
null
)
);
}
@Override
public ClientResponse<StatusResponseHolder> handleChunk(
ClientResponse<StatusResponseHolder> response,
HttpChunk chunk,
long chunkNum
)
{
return response;
}
@Override
public ClientResponse<StatusResponseHolder> done(ClientResponse<StatusResponseHolder> response)
{
return ClientResponse.finished(response.getObj());
}
@Override
public void exceptionCaught(ClientResponse<StatusResponseHolder> clientResponse, Throwable e)
{
// Its safe to Ignore as the ClientResponse returned in handleChunk were unfinished
log.error(e, "exceptionCaught in CommonCacheNotifier ResponseHandler.");
}
}
}