blob: 9c89b4a3b4b9dec122a3b61ea159299be12ad009 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.acl;
import com.google.common.collect.Sets;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.thrift.AccessControlEntry;
import com.twitter.util.Await;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* ZooKeeper Based {@link com.twitter.distributedlog.acl.AccessControlManager}
*/
public class ZKAccessControlManager implements AccessControlManager, Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKAccessControlManager.class);
private static final int ZK_RETRY_BACKOFF_MS = 500;
protected final DistributedLogConfiguration conf;
protected final ZooKeeperClient zkc;
protected final String zkRootPath;
protected final ScheduledExecutorService scheduledExecutorService;
protected final ConcurrentMap<String, ZKAccessControl> streamEntries;
protected ZKAccessControl defaultAccessControl;
protected volatile boolean closed = false;
public ZKAccessControlManager(DistributedLogConfiguration conf,
ZooKeeperClient zkc,
String zkRootPath,
ScheduledExecutorService scheduledExecutorService) throws IOException {
this.conf = conf;
this.zkc = zkc;
this.zkRootPath = zkRootPath;
this.scheduledExecutorService = scheduledExecutorService;
this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>();
try {
Await.result(fetchDefaultAccessControlEntry());
} catch (Throwable t) {
if (t instanceof InterruptedException) {
throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t);
} else if (t instanceof KeeperException) {
throw new IOException("Encountered zookeeper exception on getting default access control entry for " + zkRootPath, t);
} else if (t instanceof IOException) {
throw (IOException) t;
} else {
throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t);
}
}
try {
Await.result(fetchAccessControlEntries());
} catch (Throwable t) {
if (t instanceof InterruptedException) {
throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t);
} else if (t instanceof KeeperException) {
throw new IOException("Encountered zookeeper exception on getting access control entries for " + zkRootPath, t);
} else if (t instanceof IOException) {
throw (IOException) t;
} else {
throw new IOException("Encountered unknown exception on getting access control entries for " + zkRootPath, t);
}
}
}
protected AccessControlEntry getAccessControlEntry(String stream) {
ZKAccessControl entry = streamEntries.get(stream);
entry = null == entry ? defaultAccessControl : entry;
return entry.getAccessControlEntry();
}
@Override
public boolean allowWrite(String stream) {
return !getAccessControlEntry(stream).isDenyWrite();
}
@Override
public boolean allowTruncate(String stream) {
return !getAccessControlEntry(stream).isDenyTruncate();
}
@Override
public boolean allowDelete(String stream) {
return !getAccessControlEntry(stream).isDenyDelete();
}
@Override
public boolean allowAcquire(String stream) {
return !getAccessControlEntry(stream).isDenyAcquire();
}
@Override
public boolean allowRelease(String stream) {
return !getAccessControlEntry(stream).isDenyRelease();
}
@Override
public void close() {
closed = true;
}
private Future<Void> fetchAccessControlEntries() {
final Promise<Void> promise = new Promise<Void>();
fetchAccessControlEntries(promise);
return promise;
}
private void fetchAccessControlEntries(final Promise<Void> promise) {
try {
zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
if (KeeperException.Code.OK.intValue() != rc) {
promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
return;
}
Set<String> streamsReceived = new HashSet<String>();
streamsReceived.addAll(children);
Set<String> streamsCached = streamEntries.keySet();
Set<String> streamsRemoved = Sets.difference(streamsCached, streamsReceived).immutableCopy();
for (String s : streamsRemoved) {
ZKAccessControl accessControl = streamEntries.remove(s);
if (null != accessControl) {
logger.info("Removed Access Control Entry for stream {} : {}", s, accessControl.getAccessControlEntry());
}
}
if (streamsReceived.isEmpty()) {
promise.setValue(null);
return;
}
final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size());
final AtomicInteger numFailures = new AtomicInteger(0);
for (String s : streamsReceived) {
final String streamName = s;
ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null)
.addEventListener(new FutureEventListener<ZKAccessControl>() {
@Override
public void onSuccess(ZKAccessControl accessControl) {
streamEntries.put(streamName, accessControl);
logger.info("Added overrided access control for stream {} : {}", streamName, accessControl.getAccessControlEntry());
complete();
}
@Override
public void onFailure(Throwable cause) {
if (cause instanceof KeeperException.NoNodeException) {
streamEntries.remove(streamName);
} else if (cause instanceof ZKAccessControl.CorruptedAccessControlException) {
logger.warn("Access control is corrupted for stream {} @ {}, skipped it ...",
new Object[] { streamName, zkRootPath, cause });
streamEntries.remove(streamName);
} else {
if (1 == numFailures.incrementAndGet()) {
promise.setException(cause);
}
}
complete();
}
private void complete() {
if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) {
promise.setValue(null);
}
}
});
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
promise.setException(e);
} catch (InterruptedException e) {
promise.setException(e);
}
}
private Future<ZKAccessControl> fetchDefaultAccessControlEntry() {
final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
fetchDefaultAccessControlEntry(promise);
return promise;
}
private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) {
ZKAccessControl.read(zkc, zkRootPath, this)
.addEventListener(new FutureEventListener<ZKAccessControl>() {
@Override
public void onSuccess(ZKAccessControl accessControl) {
logger.info("Default Access Control will be changed from {} to {}",
ZKAccessControlManager.this.defaultAccessControl,
accessControl);
ZKAccessControlManager.this.defaultAccessControl = accessControl;
promise.setValue(accessControl);
}
@Override
public void onFailure(Throwable cause) {
if (cause instanceof KeeperException.NoNodeException) {
logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath);
createDefaultAccessControlEntryIfNeeded(promise);
} else {
promise.setException(cause);
}
}
});
}
private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
promise.setException(e);
return;
} catch (InterruptedException e) {
promise.setException(e);
return;
}
ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(),
CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (KeeperException.Code.OK.intValue() == rc) {
logger.info("Created zk path {} for default ACL.", zkRootPath);
fetchDefaultAccessControlEntry(promise);
} else {
promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
}
private void refetchDefaultAccessControlEntry(final int delayMs) {
if (closed) {
return;
}
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
@Override
public void onSuccess(ZKAccessControl value) {
// no-op
}
@Override
public void onFailure(Throwable cause) {
if (cause instanceof ZKAccessControl.CorruptedAccessControlException) {
logger.warn("Default access control entry is corrupted, ignore this update : ", cause);
return;
}
logger.warn("Encountered an error on refetching default access control entry, retrying in {} ms : ",
ZK_RETRY_BACKOFF_MS, cause);
refetchDefaultAccessControlEntry(ZK_RETRY_BACKOFF_MS);
}
});
}
}, delayMs, TimeUnit.MILLISECONDS);
}
private void refetchAccessControlEntries(final int delayMs) {
if (closed) {
return;
}
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
// no-op
}
@Override
public void onFailure(Throwable cause) {
logger.warn("Encountered an error on refetching access control entries, retrying in {} ms : ",
ZK_RETRY_BACKOFF_MS, cause);
refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
}
});
}
}, delayMs, TimeUnit.MILLISECONDS);
}
private void refetchAllAccessControlEntries(final int delayMs) {
if (closed) {
return;
}
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
@Override
public void onSuccess(ZKAccessControl value) {
fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
// no-op
}
@Override
public void onFailure(Throwable cause) {
logger.warn("Encountered an error on fetching all access control entries, retrying in {} ms : ",
ZK_RETRY_BACKOFF_MS, cause);
refetchAccessControlEntries(ZK_RETRY_BACKOFF_MS);
}
});
}
@Override
public void onFailure(Throwable cause) {
logger.warn("Encountered an error on refetching all access control entries, retrying in {} ms : ",
ZK_RETRY_BACKOFF_MS, cause);
refetchAllAccessControlEntries(ZK_RETRY_BACKOFF_MS);
}
});
}
}, delayMs, TimeUnit.MILLISECONDS);
}
@Override
public void process(WatchedEvent event) {
if (Event.EventType.None.equals(event.getType())) {
if (event.getState() == Event.KeeperState.Expired) {
refetchAllAccessControlEntries(0);
}
} else if (Event.EventType.NodeDataChanged.equals(event.getType())) {
logger.info("Default ACL for {} is changed, refetching ...", zkRootPath);
refetchDefaultAccessControlEntry(0);
} else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {
logger.info("List of ACLs for {} are changed, refetching ...", zkRootPath);
refetchAccessControlEntries(0);
}
}
}