blob: 5ccd04b7ef127dc771deb5cc2823c4c0f298975b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.security.basic.authorization.db.cache;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
import org.apache.druid.security.basic.BasicAuthCommonCacheConfig;
import org.apache.druid.security.basic.BasicAuthUtils;
import org.apache.druid.security.basic.authorization.BasicRoleBasedAuthorizer;
import org.apache.druid.security.basic.authorization.entity.BasicAuthorizerGroupMapping;
import org.apache.druid.security.basic.authorization.entity.BasicAuthorizerRole;
import org.apache.druid.security.basic.authorization.entity.BasicAuthorizerUser;
import org.apache.druid.security.basic.authorization.entity.GroupMappingAndRoleMap;
import org.apache.druid.security.basic.authorization.entity.UserAndRoleMap;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Duration;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
 * {@link BasicAuthorizerCacheManager} that keeps in-memory copies of the user/role and
 * group-mapping/role maps of every {@link BasicRoleBasedAuthorizer} found in the
 * {@link AuthorizerMapper}.
 *
 * <p>The caches are filled once during {@link #start()}, refreshed periodically by polling the
 * coordinator, and can additionally be replaced through the {@code handleAuthorizer*Update}
 * methods. When a cache directory is configured, the serialized maps are also written to disk, and
 * that snapshot is used as a fallback if the initial fetch from the coordinator fails.
 */
@ManageLifecycle
public class CoordinatorPollingBasicAuthorizerCacheManager implements BasicAuthorizerCacheManager
{
  private static final EmittingLogger LOG = new EmittingLogger(CoordinatorPollingBasicAuthorizerCacheManager.class);

  // All four caches are keyed by authorizer name (prefix).
  private final ConcurrentHashMap<String, Map<String, BasicAuthorizerUser>> cachedUserMaps;
  private final ConcurrentHashMap<String, Map<String, BasicAuthorizerRole>> cachedRoleMaps;
  private final ConcurrentHashMap<String, Map<String, BasicAuthorizerGroupMapping>> cachedGroupMappingMaps;
  private final ConcurrentHashMap<String, Map<String, BasicAuthorizerRole>> cachedGroupMappingRoleMaps;

  // Populated by initUserMaps() inside start(), before the polling tasks are scheduled.
  private final Set<String> authorizerPrefixes;
  private final Injector injector;
  private final ObjectMapper objectMapper;
  private final LifecycleLock lifecycleLock = new LifecycleLock();
  private final DruidLeaderClient druidLeaderClient;
  private final BasicAuthCommonCacheConfig commonCacheConfig;
  private final ScheduledExecutorService exec;

  @Inject
  public CoordinatorPollingBasicAuthorizerCacheManager(
      Injector injector,
      BasicAuthCommonCacheConfig commonCacheConfig,
      @Smile ObjectMapper objectMapper,
      @Coordinator DruidLeaderClient druidLeaderClient
  )
  {
    this.exec = Execs.scheduledSingleThreaded("CoordinatorPollingBasicAuthorizerCacheManager-Exec--%d");
    this.injector = injector;
    this.commonCacheConfig = commonCacheConfig;
    this.objectMapper = objectMapper;
    this.cachedUserMaps = new ConcurrentHashMap<>();
    this.cachedRoleMaps = new ConcurrentHashMap<>();
    this.cachedGroupMappingMaps = new ConcurrentHashMap<>();
    this.cachedGroupMappingRoleMaps = new ConcurrentHashMap<>();
    this.authorizerPrefixes = new HashSet<>();
    this.druidLeaderClient = druidLeaderClient;
  }

  /**
   * Performs the initial cache load and schedules the two periodic polling tasks (user/role maps
   * and group-mapping/role maps) on the single-threaded executor.
   *
   * @throws ISE if the lifecycle lock does not allow starting
   */
  @LifecycleStart
  public void start()
  {
    if (!lifecycleLock.canStart()) {
      throw new ISE("can't start.");
    }

    LOG.info("Starting CoordinatorPollingBasicAuthorizerCacheManager.");

    try {
      initUserMaps();

      ScheduledExecutors.scheduleWithFixedDelay(
          exec,
          new Duration(commonCacheConfig.getPollingPeriod()),
          new Duration(commonCacheConfig.getPollingPeriod()),
          () -> {
            try {
              sleepRandomDelay();
              LOG.debug("Scheduled userMap cache poll is running");
              for (String authorizerPrefix : authorizerPrefixes) {
                UserAndRoleMap userAndRoleMap = fetchUserAndRoleMapFromCoordinator(authorizerPrefix, false);
                if (userAndRoleMap != null) {
                  cacheUserAndRoleMap(authorizerPrefix, userAndRoleMap);
                }
              }
              LOG.debug("Scheduled userMap cache poll is done");
            }
            catch (Throwable t) {
              // Catch everything so a failed poll is reported as an alert instead of escaping the task.
              LOG.makeAlert(t, "Error occured while polling for cachedUserMaps.").emit();
            }
          }
      );

      ScheduledExecutors.scheduleWithFixedDelay(
          exec,
          new Duration(commonCacheConfig.getPollingPeriod()),
          new Duration(commonCacheConfig.getPollingPeriod()),
          () -> {
            try {
              sleepRandomDelay();
              LOG.debug("Scheduled groupMappingMap cache poll is running");
              for (String authorizerPrefix : authorizerPrefixes) {
                GroupMappingAndRoleMap groupMappingAndRoleMap =
                    fetchGroupAndRoleMapFromCoordinator(authorizerPrefix, false);
                if (groupMappingAndRoleMap != null) {
                  cacheGroupMappingAndRoleMap(authorizerPrefix, groupMappingAndRoleMap);
                }
              }
              LOG.debug("Scheduled groupMappingMap cache poll is done");
            }
            catch (Throwable t) {
              // Catch everything so a failed poll is reported as an alert instead of escaping the task.
              LOG.makeAlert(t, "Error occured while polling for cachedGroupMappingMaps.").emit();
            }
          }
      );

      lifecycleLock.started();
      LOG.info("Started CoordinatorPollingBasicAuthorizerCacheManager.");
    }
    finally {
      lifecycleLock.exitStart();
    }
  }

  /**
   * Shuts down the polling executor.
   *
   * @throws ISE if the lifecycle lock does not allow stopping
   */
  @LifecycleStop
  public void stop()
  {
    if (!lifecycleLock.canStop()) {
      throw new ISE("can't stop.");
    }

    LOG.info("CoordinatorPollingBasicAuthorizerCacheManager is stopping.");
    exec.shutdown();
    LOG.info("CoordinatorPollingBasicAuthorizerCacheManager is stopped.");
  }

  /**
   * Replaces the cached user and role maps of an authorizer with the given serialized update and,
   * when a cache directory is configured, writes the serialized bytes to disk.
   *
   * <p>Failures are reported as alerts rather than thrown. A failed disk write does not undo the
   * in-memory update.
   *
   * @param authorizerPrefix         name of the authorizer the update belongs to
   * @param serializedUserAndRoleMap serialized {@link UserAndRoleMap}
   */
  @Override
  public void handleAuthorizerUserUpdate(String authorizerPrefix, byte[] serializedUserAndRoleMap)
  {
    LOG.debug("Received userMap cache update for authorizer [%s].", authorizerPrefix);
    Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));

    try {
      UserAndRoleMap userAndRoleMap = objectMapper.readValue(
          serializedUserAndRoleMap,
          BasicAuthUtils.AUTHORIZER_USER_AND_ROLE_MAP_TYPE_REFERENCE
      );
      cacheUserAndRoleMap(authorizerPrefix, userAndRoleMap);
    }
    catch (Exception e) {
      LOG.makeAlert(e, "Could not deserialize user/role map received from coordinator").emit();
      return;
    }

    if (commonCacheConfig.getCacheDirectory() != null) {
      try {
        writeUserMapToDisk(authorizerPrefix, serializedUserAndRoleMap);
      }
      catch (Exception e) {
        // Reported separately so a snapshot failure is not mistaken for a deserialization failure.
        LOG.makeAlert(e, "Could not write user/role map snapshot to disk for authorizer [%s]", authorizerPrefix)
           .emit();
      }
    }
  }

  /**
   * Replaces the cached group-mapping and role maps of an authorizer with the given serialized
   * update and, when a cache directory is configured, writes the serialized bytes to disk.
   *
   * <p>Failures are reported as alerts rather than thrown. A failed disk write does not undo the
   * in-memory update.
   *
   * @param authorizerPrefix                 name of the authorizer the update belongs to
   * @param serializedGroupMappingAndRoleMap serialized {@link GroupMappingAndRoleMap}
   */
  @Override
  public void handleAuthorizerGroupMappingUpdate(String authorizerPrefix, byte[] serializedGroupMappingAndRoleMap)
  {
    LOG.debug("Received groupMappingMap cache update for authorizer [%s].", authorizerPrefix);
    Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));

    try {
      GroupMappingAndRoleMap groupMappingAndRoleMap = objectMapper.readValue(
          serializedGroupMappingAndRoleMap,
          BasicAuthUtils.AUTHORIZER_GROUP_MAPPING_AND_ROLE_MAP_TYPE_REFERENCE
      );
      cacheGroupMappingAndRoleMap(authorizerPrefix, groupMappingAndRoleMap);
    }
    catch (Exception e) {
      LOG.makeAlert(e, "Could not deserialize groupMapping/role map received from coordinator.").emit();
      return;
    }

    if (commonCacheConfig.getCacheDirectory() != null) {
      try {
        writeGroupMappingMapToDisk(authorizerPrefix, serializedGroupMappingAndRoleMap);
      }
      catch (Exception e) {
        // Reported separately so a snapshot failure is not mistaken for a deserialization failure.
        LOG.makeAlert(
            e,
            "Could not write groupMapping/role map snapshot to disk for authorizer [%s]",
            authorizerPrefix
        ).emit();
      }
    }
  }

  /**
   * @return the cached user map for the authorizer, or {@code null} if nothing is cached for it
   */
  @Override
  public Map<String, BasicAuthorizerUser> getUserMap(String authorizerPrefix)
  {
    return cachedUserMaps.get(authorizerPrefix);
  }

  /**
   * @return the cached role map for the authorizer, or {@code null} if nothing is cached for it
   */
  @Override
  public Map<String, BasicAuthorizerRole> getRoleMap(String authorizerPrefix)
  {
    return cachedRoleMaps.get(authorizerPrefix);
  }

  /**
   * @return the cached group-mapping map for the authorizer, or {@code null} if nothing is cached for it
   */
  @Override
  public Map<String, BasicAuthorizerGroupMapping> getGroupMappingMap(String authorizerPrefix)
  {
    return cachedGroupMappingMaps.get(authorizerPrefix);
  }

  /**
   * @return the cached role map that accompanies the group mappings of the authorizer, or
   * {@code null} if nothing is cached for it
   */
  @Override
  public Map<String, BasicAuthorizerRole> getGroupMappingRoleMap(String authorizerPrefix)
  {
    return cachedGroupMappingRoleMaps.get(authorizerPrefix);
  }

  /**
   * Sleeps for a random duration in {@code [0, maxRandomDelay)} milliseconds before a poll.
   *
   * <p>{@code ThreadLocalRandom.nextLong(origin, bound)} rejects {@code bound <= origin}, so a
   * non-positive configured maximum is treated as "no random delay" instead of failing the poll.
   */
  private void sleepRandomDelay() throws InterruptedException
  {
    final long maxRandomDelay = commonCacheConfig.getMaxRandomDelay();
    final long randomDelay = maxRandomDelay > 0 ? ThreadLocalRandom.current().nextLong(0, maxRandomDelay) : 0L;
    LOG.debug("Inserting random polling delay of [%s] ms", randomDelay);
    Thread.sleep(randomDelay);
  }

  /** Stores both halves of a user/role map pair in the in-memory caches. */
  private void cacheUserAndRoleMap(String prefix, UserAndRoleMap userAndRoleMap)
  {
    cachedUserMaps.put(prefix, userAndRoleMap.getUserMap());
    cachedRoleMaps.put(prefix, userAndRoleMap.getRoleMap());
  }

  /** Stores both halves of a group-mapping/role map pair in the in-memory caches. */
  private void cacheGroupMappingAndRoleMap(String prefix, GroupMappingAndRoleMap groupMappingAndRoleMap)
  {
    cachedGroupMappingMaps.put(prefix, groupMappingAndRoleMap.getGroupMappingMap());
    cachedGroupMappingRoleMaps.put(prefix, groupMappingAndRoleMap.getRoleMap());
  }

  private String getUserRoleMapFilename(String prefix)
  {
    return StringUtils.format("%s.authorizer.userRole.cache", prefix);
  }

  private String getGroupMappingRoleMapFilename(String prefix)
  {
    return StringUtils.format("%s.authorizer.groupMappingRole.cache", prefix);
  }

  /**
   * Reads the user/role snapshot of an authorizer from the cache directory.
   *
   * @return the deserialized snapshot, or {@code null} if the snapshot file does not exist
   */
  @Nullable
  private UserAndRoleMap loadUserAndRoleMapFromDisk(String prefix) throws IOException
  {
    File userAndRoleMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserRoleMapFilename(prefix));
    if (!userAndRoleMapFile.exists()) {
      return null;
    }
    return objectMapper.readValue(
        userAndRoleMapFile,
        BasicAuthUtils.AUTHORIZER_USER_AND_ROLE_MAP_TYPE_REFERENCE
    );
  }

  /**
   * Reads the group-mapping/role snapshot of an authorizer from the cache directory.
   *
   * @return the deserialized snapshot, or {@code null} if the snapshot file does not exist
   */
  @Nullable
  private GroupMappingAndRoleMap loadGroupMappingAndRoleMapFromDisk(String prefix) throws IOException
  {
    File groupMappingAndRoleMapFile = new File(
        commonCacheConfig.getCacheDirectory(),
        getGroupMappingRoleMapFilename(prefix)
    );
    if (!groupMappingAndRoleMapFile.exists()) {
      return null;
    }
    return objectMapper.readValue(
        groupMappingAndRoleMapFile,
        BasicAuthUtils.AUTHORIZER_GROUP_MAPPING_AND_ROLE_MAP_TYPE_REFERENCE
    );
  }

  /** Writes the serialized user/role map to its snapshot file, creating the cache directory if needed. */
  private void writeUserMapToDisk(String prefix, byte[] userMapBytes) throws IOException
  {
    File cacheDir = new File(commonCacheConfig.getCacheDirectory());
    cacheDir.mkdirs();
    File userMapFile = new File(commonCacheConfig.getCacheDirectory(), getUserRoleMapFilename(prefix));
    FileUtils.writeAtomically(
        userMapFile,
        out -> {
          out.write(userMapBytes);
          return null;
        }
    );
  }

  /** Writes the serialized group-mapping/role map to its snapshot file, creating the cache directory if needed. */
  private void writeGroupMappingMapToDisk(String prefix, byte[] groupMappingBytes) throws IOException
  {
    File cacheDir = new File(commonCacheConfig.getCacheDirectory());
    cacheDir.mkdirs();
    File groupMapFile = new File(commonCacheConfig.getCacheDirectory(), getGroupMappingRoleMapFilename(prefix));
    FileUtils.writeAtomically(
        groupMapFile,
        out -> {
          out.write(groupMappingBytes);
          return null;
        }
    );
  }

  /**
   * Fetches the user/role map of an authorizer from the coordinator, retrying on any exception up
   * to the configured number of sync retries.
   *
   * @param prefix authorizer name
   * @param isInit whether this is the initial load; only then is the on-disk snapshot tried as a
   *               fallback after the coordinator fetch has failed
   *
   * @return the fetched (or, on init, snapshot-loaded) map, or {@code null} if none could be obtained
   */
  @Nullable
  private UserAndRoleMap fetchUserAndRoleMapFromCoordinator(String prefix, boolean isInit)
  {
    try {
      return RetryUtils.retry(
          () -> tryFetchUserMapsFromCoordinator(prefix),
          e -> true,
          commonCacheConfig.getMaxSyncRetries()
      );
    }
    catch (Exception e) {
      LOG.makeAlert(e, "Encountered exception while fetching user and role map for authorizer [%s]", prefix).emit();
      if (isInit && commonCacheConfig.getCacheDirectory() != null) {
        try {
          LOG.info("Attempting to load user map snapshot from disk.");
          return loadUserAndRoleMapFromDisk(prefix);
        }
        catch (Exception e2) {
          e2.addSuppressed(e);
          LOG.makeAlert(e2, "Encountered exception while loading user-role map snapshot for authorizer [%s]", prefix)
             .emit();
        }
      }
      return null;
    }
  }

  /**
   * Fetches the group-mapping/role map of an authorizer from the coordinator, retrying on any
   * exception up to the configured number of sync retries.
   *
   * @param prefix authorizer name
   * @param isInit whether this is the initial load; only then is the on-disk snapshot tried as a
   *               fallback after the coordinator fetch has failed
   *
   * @return the fetched (or, on init, snapshot-loaded) map, or {@code null} if none could be obtained
   */
  @Nullable
  private GroupMappingAndRoleMap fetchGroupAndRoleMapFromCoordinator(String prefix, boolean isInit)
  {
    try {
      return RetryUtils.retry(
          () -> tryFetchGroupMappingMapsFromCoordinator(prefix),
          e -> true,
          commonCacheConfig.getMaxSyncRetries()
      );
    }
    catch (Exception e) {
      LOG.makeAlert(e, "Encountered exception while fetching group and role map for authorizer [%s]", prefix).emit();
      if (isInit && commonCacheConfig.getCacheDirectory() != null) {
        try {
          LOG.info("Attempting to load group map snapshot from disk.");
          return loadGroupMappingAndRoleMapFromDisk(prefix);
        }
        catch (Exception e2) {
          e2.addSuppressed(e);
          LOG.makeAlert(e2, "Encountered exception while loading group-role map snapshot for authorizer [%s]", prefix)
             .emit();
        }
      }
      return null;
    }
  }

  /**
   * Performs a single GET of the serialized user/role map from the coordinator, deserializes it
   * and, when a cache directory is configured, writes the raw response bytes to disk.
   */
  private UserAndRoleMap tryFetchUserMapsFromCoordinator(
      String prefix
  ) throws Exception
  {
    Request req = druidLeaderClient.makeRequest(
        HttpMethod.GET,
        StringUtils.format("/druid-ext/basic-security/authorization/db/%s/cachedSerializedUserMap", prefix)
    );
    BytesFullResponseHolder responseHolder = druidLeaderClient.go(
        req,
        new BytesFullResponseHandler()
    );
    byte[] userRoleMapBytes = responseHolder.getContent();

    UserAndRoleMap userAndRoleMap = objectMapper.readValue(
        userRoleMapBytes,
        BasicAuthUtils.AUTHORIZER_USER_AND_ROLE_MAP_TYPE_REFERENCE
    );
    if (userAndRoleMap != null && commonCacheConfig.getCacheDirectory() != null) {
      writeUserMapToDisk(prefix, userRoleMapBytes);
    }
    return userAndRoleMap;
  }

  /**
   * Performs a single GET of the serialized group-mapping/role map from the coordinator,
   * deserializes it and, when a cache directory is configured, writes the raw response bytes to disk.
   *
   * @return the deserialized map, or {@code null} if the coordinator answered 404
   */
  private GroupMappingAndRoleMap tryFetchGroupMappingMapsFromCoordinator(
      String prefix
  ) throws Exception
  {
    Request req = druidLeaderClient.makeRequest(
        HttpMethod.GET,
        StringUtils.format("/druid-ext/basic-security/authorization/db/%s/cachedSerializedGroupMappingMap", prefix)
    );
    BytesFullResponseHolder responseHolder = druidLeaderClient.go(
        req,
        new BytesFullResponseHandler()
    );

    // cachedSerializedGroupMappingMap is a new endpoint introduced in Druid 0.17.0. For backwards compatibility,
    // a 404 from the coordinator is returned as null instead of thrown, so the caller does not retry. This can
    // happen during a rolling upgrade when a process running 0.17.0+ accesses this endpoint on an older coordinator.
    if (responseHolder.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
      LOG.warn("cachedSerializedGroupMappingMap is not available from the coordinator, skipping fetch of group mappings for now.");
      return null;
    }

    byte[] groupRoleMapBytes = responseHolder.getContent();

    GroupMappingAndRoleMap groupMappingAndRoleMap = objectMapper.readValue(
        groupRoleMapBytes,
        BasicAuthUtils.AUTHORIZER_GROUP_MAPPING_AND_ROLE_MAP_TYPE_REFERENCE
    );
    if (groupMappingAndRoleMap != null && commonCacheConfig.getCacheDirectory() != null) {
      writeGroupMappingMapToDisk(prefix, groupRoleMapBytes);
    }
    return groupMappingAndRoleMap;
  }

  /**
   * Registers every {@link BasicRoleBasedAuthorizer} found in the {@link AuthorizerMapper} for
   * polling and performs the initial load of its user/role and group-mapping/role maps.
   */
  private void initUserMaps()
  {
    AuthorizerMapper authorizerMapper = injector.getInstance(AuthorizerMapper.class);

    if (authorizerMapper == null || authorizerMapper.getAuthorizerMap() == null) {
      return;
    }

    for (Map.Entry<String, Authorizer> entry : authorizerMapper.getAuthorizerMap().entrySet()) {
      Authorizer authorizer = entry.getValue();
      if (authorizer instanceof BasicRoleBasedAuthorizer) {
        String authorizerName = entry.getKey();
        authorizerPrefixes.add(authorizerName);

        UserAndRoleMap userAndRoleMap = fetchUserAndRoleMapFromCoordinator(authorizerName, true);
        if (userAndRoleMap != null) {
          cacheUserAndRoleMap(authorizerName, userAndRoleMap);
        }

        GroupMappingAndRoleMap groupMappingAndRoleMap = fetchGroupAndRoleMapFromCoordinator(authorizerName, true);
        if (groupMappingAndRoleMap != null) {
          cacheGroupMappingAndRoleMap(authorizerName, groupMappingAndRoleMap);
        }
      }
    }
  }
}