blob: 0a8f28b0ba992013217d2882b1f712235a980c1a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.impl.federated;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.callback.NamespaceListener;
import com.twitter.distributedlog.exceptions.LogExistsException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.impl.ZKNamespaceWatcher;
import com.twitter.distributedlog.metadata.LogMetadataStore;
import com.twitter.distributedlog.namespace.NamespaceWatcher;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Charsets.UTF_8;
/**
* A Federated ZooKeeper Based Log Metadata Store.
*
* To upgrade a simple ZKLogMetadataStore to FederatedZKLogMetadataStore, take the following steps in sequence:
* a) deploy the new code with createStreamsIfNotExists disabled in all writers.
* b) once all proxies have disabled the flag, update the namespace binding to enable the federated namespace.
* c) restart the writers so that the federated namespace takes effect.
*
* NOTE: the current federated namespace isn't optimized for deletion/creation, so don't use it in workloads
* that have lots of creations or deletions.
*/
public class FederatedZKLogMetadataStore extends NamespaceWatcher implements LogMetadataStore, Watcher, Runnable,
FutureEventListener<Set<URI>> {
// Logger shared by the store and its inner classes.
static final Logger logger = LoggerFactory.getLogger(FederatedZKLogMetadataStore.class);
// Name of the znode, directly under the namespace path, whose children are the sub namespaces.
private final static String ZNODE_SUB_NAMESPACES = ".subnamespaces";
// Name prefix used when creating (sequential) sub namespace znodes.
private final static String SUB_NAMESPACE_PREFIX = "NS_";
/**
 * Create the federated namespace.
 *
 * <p>A persistent, empty <code>.subnamespaces</code> znode is created under the path of
 * <i>namespace</i> by delegating to {@code Utils.zkCreateFullPathOptimistic}.
 *
 * @param namespace
 *          namespace to create
 * @param zkc
 *          zookeeper client
 * @throws InterruptedException
 *          propagated from the underlying zookeeper call
 * @throws ZooKeeperClient.ZooKeeperConnectionException
 *          propagated from the zookeeper client
 * @throws KeeperException
 *          propagated from the underlying zookeeper call
 */
public static void createFederatedNamespace(URI namespace, ZooKeeperClient zkc)
        throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
    final String subNamespacesRoot = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES;
    final byte[] noData = new byte[0];
    Utils.zkCreateFullPathOptimistic(
            zkc,
            subNamespacesRoot,
            noData,
            zkc.getDefaultACL(),
            CreateMode.PERSISTENT);
}
/**
 * Represent a sub namespace inside the federated namespace.
 *
 * <p>Each instance owns a {@link ZKNamespaceWatcher} on its uri and registers itself as the
 * listener of that watcher. Whenever the watcher reports a new set of logs, the instance
 * refreshes its own log set and the store-wide log-name-to-location map.
 */
class SubNamespace implements NamespaceListener {
    final URI uri;
    final ZKNamespaceWatcher watcher;
    // Most recent set of logs under this sub namespace. It stays unsatisfied until the first
    // notification arrives and is swapped for a fresh promise on every later notification.
    // Reads and swaps happen while holding the monitor of this instance.
    Promise<Set<String>> logsFuture = new Promise<Set<String>>();

    SubNamespace(URI uri) {
        this.uri = uri;
        this.watcher = new ZKNamespaceWatcher(conf, uri, zkc, scheduler);
        this.watcher.registerListener(this);
    }

    /**
     * Start watching the changes of this sub namespace.
     */
    void watch() {
        this.watcher.watchNamespaceChanges();
    }

    /**
     * @return the future of the current set of logs under this sub namespace.
     */
    synchronized Future<Set<String>> getLogs() {
        return logsFuture;
    }

    /**
     * Refresh the caches with the new set of logs found under this sub namespace.
     *
     * @param newLogsIter
     *          iterator over the names of the logs currently under this sub namespace
     */
    @Override
    public void onStreamsChanged(Iterator<String> newLogsIter) {
        Set<String> newLogs = Sets.newHashSet(newLogsIter);
        Set<String> oldLogs = Sets.newHashSet();
        // update the sub namespace cache
        Promise<Set<String>> newLogsPromise;
        synchronized (this) {
            if (logsFuture.isDefined()) { // the promise is already satisfied
                try {
                    oldLogs = FutureUtils.result(logsFuture);
                } catch (IOException e) {
                    logger.error("Unexpected exception when getting logs from a satisified future of {} : ",
                            uri, e);
                }
                logsFuture = new Promise<Set<String>>();
            }
            // update the reverse cache
            for (String logName : newLogs) {
                URI oldURI = log2Locations.putIfAbsent(logName, uri);
                if (null != oldURI && !Objects.equal(uri, oldURI)) {
                    logger.error("Log {} is found duplicated in multiple locations : old location = {}," +
                            " new location = {}", new Object[] { logName, oldURI, uri });
                    // record the offending name before raising the flag, so that the error
                    // returned to callers names the duplicated log instead of "null"
                    duplicatedLogName.compareAndSet(null, logName);
                    duplicatedLogFound.set(true);
                }
            }
            // remove the gone streams
            Set<String> deletedLogs = Sets.difference(oldLogs, newLogs);
            for (String logName : deletedLogs) {
                // two-argument remove: only drop the entry if it still points at this sub namespace
                log2Locations.remove(logName, uri);
            }
            newLogsPromise = logsFuture;
        }
        // satisfy the promise outside of the lock
        newLogsPromise.setValue(newLogs);
        // notify namespace changes
        notifyOnNamespaceChanges();
    }
}
// configuration this store was created with
final DistributedLogConfiguration conf;
// uri of the federated namespace
final URI namespace;
// zookeeper client used for all metadata operations
final ZooKeeperClient zkc;
// scheduler used for retry tasks and for running the synchronous log creation
final OrderedScheduler scheduler;
// zookeeper path of the znode whose children are the sub namespaces
final String zkSubnamespacesPath;
// raised once a log is found in more than one location; it is never reset
final AtomicBoolean duplicatedLogFound = new AtomicBoolean(false);
// name of a duplicated log when one has been recorded, otherwise null
final AtomicReference<String> duplicatedLogName = new AtomicReference<String>(null);
// last known version of the sub namespaces znode; null until it is first fetched
final AtomicReference<Integer> zkSubnamespacesVersion = new AtomicReference<Integer>(null);
// a sub namespace holding at least this many logs is skipped when placing a new log
final int maxLogsPerSubnamespace;
// sub namespaces
final ConcurrentSkipListMap<URI, SubNamespace> subNamespaces;
// map between log name and its location
final ConcurrentMap<String, URI> log2Locations;
// whether to look a log up in zookeeper when it is missing from the location cache
final boolean forceCheckLogExistence;
/**
 * Build the federated metadata store and start watching every sub namespace that currently
 * exists. The constructor blocks until the initial list of sub namespaces is fetched.
 *
 * @param conf
 *          distributedlog configuration
 * @param namespace
 *          uri of the federated namespace
 * @param zkc
 *          zookeeper client
 * @param scheduler
 *          scheduler used for background tasks
 * @throws IOException
 *          when waiting for the initial sub namespaces fails
 */
public FederatedZKLogMetadataStore(
        DistributedLogConfiguration conf,
        URI namespace,
        ZooKeeperClient zkc,
        OrderedScheduler scheduler) throws IOException {
    // collaborators
    this.conf = conf;
    this.namespace = namespace;
    this.zkc = zkc;
    this.scheduler = scheduler;
    // settings derived from the configuration and the namespace
    this.zkSubnamespacesPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES;
    this.maxLogsPerSubnamespace = conf.getFederatedMaxLogsPerSubnamespace();
    this.forceCheckLogExistence = conf.getFederatedCheckExistenceWhenCacheMiss();
    // caches
    this.log2Locations = new ConcurrentHashMap<String, URI>();
    this.subNamespaces = new ConcurrentSkipListMap<URI, SubNamespace>();

    // fetch the sub namespaces and watch each of them
    for (URI subNsUri : FutureUtils.result(fetchSubNamespaces(this))) {
        SubNamespace candidate = new SubNamespace(subNsUri);
        if (null != subNamespaces.putIfAbsent(subNsUri, candidate)) {
            continue;
        }
        candidate.watch();
        logger.info("Watched sub namespace {}", subNsUri);
    }

    logger.info("Federated ZK LogMetadataStore is initialized for {}", namespace);
}
/**
 * Schedule <i>r</i> to run after <i>ms</i> milliseconds, unless a duplicated log has been
 * found, in which case nothing is scheduled. A rejection by the scheduler is only logged.
 *
 * @param r
 *          task to run
 * @param ms
 *          delay in milliseconds
 */
private void scheduleTask(Runnable r, long ms) {
    if (!duplicatedLogFound.get()) {
        try {
            scheduler.schedule(r, ms, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException ree) {
            logger.error("Task {} scheduled in {} ms is rejected : ", new Object[]{r, ms, ree});
        }
    } else {
        logger.error("Scheduler is halted for federated namespace {} as duplicated log found",
                namespace);
    }
}
/**
 * Wrap <i>future</i> so that a successful result is turned into a failure when a duplicated
 * log has been found by the time the result arrives. Failures are passed through unchanged.
 *
 * @param future
 *          future to wrap
 * @return a future carrying the checked outcome
 */
private <T> Future<T> postStateCheck(Future<T> future) {
    final Promise<T> checked = new Promise<T>();
    FutureEventListener<T> relay = new FutureEventListener<T>() {
        @Override
        public void onFailure(Throwable cause) {
            checked.setException(cause);
        }

        @Override
        public void onSuccess(T value) {
            if (!duplicatedLogFound.get()) {
                checked.setValue(value);
                return;
            }
            checked.setException(new UnexpectedException("Duplicate log found under " + namespace));
        }
    };
    future.addEventListener(relay);
    return checked;
}
//
// SubNamespace Related Methods
//
/**
 * @return the key set view of the sub namespaces known to this store.
 */
@VisibleForTesting
Set<URI> getSubnamespaces() {
    final Set<URI> knownUris = this.subNamespaces.keySet();
    return knownUris;
}
/**
 * Drop the cached location of a log, whatever sub namespace it points at.
 *
 * @param logName
 *          name of the log to forget
 */
@VisibleForTesting
void removeLogFromCache(String logName) {
    this.log2Locations.remove(logName);
}
/**
 * Build the uri of a sub namespace: every component is copied from the federated namespace
 * uri except the path, which gets <code>/.subnamespaces/&lt;ns&gt;</code> appended.
 *
 * @param ns
 *          name of the sub namespace znode
 * @return uri of the sub namespace
 * @throws URISyntaxException
 *          when the components don't form a valid uri
 */
private URI getSubNamespaceURI(String ns) throws URISyntaxException {
    final String subNsPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + ns;
    final URI base = namespace;
    return new URI(base.getScheme(), base.getUserInfo(), base.getHost(), base.getPort(),
            subNsPath, base.getQuery(), base.getFragment());
}
/**
 * @return an already satisfied future holding the key set view of the cached sub namespaces.
 */
Future<Set<URI>> getCachedSubNamespaces() {
    return Future.<Set<URI>>value(subNamespaces.keySet());
}
/**
 * Fetch the sub namespaces from zookeeper. A <code>sync</code> is issued on the sub namespaces
 * znode first; the children are only read once the sync has succeeded.
 *
 * @param watcher
 *          watcher to set on the children of the sub namespaces znode
 * @return future of the set of sub namespace uris
 */
Future<Set<URI>> fetchSubNamespaces(final Watcher watcher) {
    final Promise<Set<URI>> result = new Promise<Set<URI>>();
    final AsyncCallback.VoidCallback onSynced = new AsyncCallback.VoidCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx) {
            if (Code.OK.intValue() != rc) {
                result.setException(KeeperException.create(Code.get(rc)));
                return;
            }
            fetchSubNamespaces(watcher, result);
        }
    };
    try {
        zkc.get().sync(this.zkSubnamespacesPath, onSynced, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
        result.setException(e);
    } catch (InterruptedException e) {
        result.setException(e);
    }
    return result;
}
/**
 * Read the children of the sub namespaces znode and satisfy <i>promise</i> with the resulting
 * set of uris. The set always contains the federated namespace itself in addition to one uri
 * per child znode.
 *
 * <p>The promise is completed on every path: with the uris on success, with an
 * {@link UnexpectedException} when the znode is missing or a child name does not form a valid
 * uri, and with the matching {@link KeeperException} for any other zookeeper return code.
 *
 * @param watcher
 *          watcher to set on the children of the sub namespaces znode
 * @param promise
 *          promise to satisfy with the outcome
 */
private void fetchSubNamespaces(Watcher watcher,
                                final Promise<Set<URI>> promise) {
    try {
        zkc.get().getChildren(this.zkSubnamespacesPath, watcher,
                new AsyncCallback.Children2Callback() {
                    @Override
                    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                        if (Code.NONODE.intValue() == rc) {
                            promise.setException(new UnexpectedException(
                                    "The subnamespaces don't exist for the federated namespace " + namespace));
                            return;
                        }
                        if (Code.OK.intValue() != rc) {
                            // any other failure (connection loss, session expiry, ...) must
                            // fail the promise, otherwise whoever waits on it hangs forever
                            promise.setException(KeeperException.create(Code.get(rc)));
                            return;
                        }
                        Set<URI> subnamespaces = Sets.newHashSet();
                        subnamespaces.add(namespace);
                        try {
                            for (String ns : children) {
                                subnamespaces.add(getSubNamespaceURI(ns));
                            }
                        } catch (URISyntaxException use) {
                            logger.error("Invalid sub namespace uri found : ", use);
                            promise.setException(new UnexpectedException(
                                    "Invalid sub namespace uri found in " + namespace, use));
                            return;
                        }
                        // record the znode version before handing out the sub namespaces
                        setZkSubnamespacesVersion(stat.getVersion());
                        promise.setValue(subnamespaces);
                    }
                }, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
        promise.setException(e);
    } catch (InterruptedException e) {
        promise.setException(e);
    }
}
/**
 * Scheduled task body: fetch the sub namespaces again, with this store as both the watcher
 * and the listener of the result.
 */
@Override
public void run() {
    final Future<Set<URI>> refreshed = fetchSubNamespaces(this);
    refreshed.addEventListener(this);
}
/**
 * Called with a freshly fetched set of sub namespaces: start watching every uri that is not
 * tracked yet and notify the listeners for each one added.
 *
 * @param uris
 *          sub namespaces fetched from zookeeper
 */
@Override
public void onSuccess(Set<URI> uris) {
    for (URI uri : uris) {
        if (!subNamespaces.containsKey(uri)) {
            SubNamespace fresh = new SubNamespace(uri);
            boolean registered = null == subNamespaces.putIfAbsent(uri, fresh);
            if (registered) {
                fresh.watch();
                logger.info("Watched new sub namespace {}.", uri);
                notifyOnNamespaceChanges();
            }
        }
    }
}
/**
 * Called when fetching the sub namespaces failed: retry after one zookeeper session timeout.
 *
 * @param cause
 *          reason of the failure (not inspected)
 */
@Override
public void onFailure(Throwable cause) {
    final long retryDelayMs = conf.getZKSessionTimeoutMilliseconds();
    scheduleTask(this, retryDelayMs);
}
/**
 * Watcher callback. A session expiry schedules a delayed re-fetch of the sub namespaces; a
 * children change re-fetches them immediately. Every other event is ignored.
 *
 * @param watchedEvent
 *          event delivered by zookeeper
 */
@Override
public void process(WatchedEvent watchedEvent) {
    final Event.EventType type = watchedEvent.getType();
    if (Event.EventType.None == type) {
        if (Event.KeeperState.Expired == watchedEvent.getState()) {
            scheduleTask(this, conf.getZKSessionTimeoutMilliseconds());
        }
        return;
    }
    if (Event.EventType.NodeChildrenChanged == type) {
        // fetch the namespace
        fetchSubNamespaces(this).addEventListener(this);
    }
}
//
// Log Related Methods
//
/**
 * @param logName
 *          name of the duplicated log
 * @return a failed future carrying an {@link UnexpectedException} that names the log and the
 *         namespace.
 */
private <A> Future<A> duplicatedLogException(String logName) {
    final String message = "Duplicated log " + logName
            + " found in namespace " + namespace;
    return Future.exception(new UnexpectedException(message));
}
/**
 * Create a log in the federated namespace.
 *
 * @param logName
 *          name of the log to create
 * @return future of the uri of the sub namespace the log was created in; it fails right away
 *         when a duplicated log has already been found
 */
@Override
public Future<URI> createLog(final String logName) {
    if (!duplicatedLogFound.get()) {
        final Promise<URI> pending = new Promise<URI>();
        doCreateLog(logName, pending);
        return postStateCheck(pending);
    }
    return duplicatedLogException(duplicatedLogName.get());
}
/**
 * Look up the location of the log first: fail with {@link LogExistsException} when it is
 * already there, otherwise go on to pick a sub namespace from the cache and create it.
 *
 * @param logName
 *          name of the log to create
 * @param createPromise
 *          promise to satisfy with the creation result
 */
void doCreateLog(final String logName, final Promise<URI> createPromise) {
    final Future<Optional<URI>> lookup = getLogLocation(logName);
    lookup.addEventListener(new FutureEventListener<Optional<URI>>() {
        @Override
        public void onFailure(Throwable cause) {
            createPromise.setException(cause);
        }

        @Override
        public void onSuccess(Optional<URI> existing) {
            if (!existing.isPresent()) {
                getCachedSubNamespacesAndCreateLog(logName, createPromise);
                return;
            }
            createPromise.setException(
                    new LogExistsException("Log " + logName + " already exists in " + existing.get()));
        }
    });
}
/**
 * Pick a sub namespace among the cached ones and create the log in it.
 *
 * @param logName
 *          name of the log to create
 * @param createPromise
 *          promise to satisfy with the creation result
 */
private void getCachedSubNamespacesAndCreateLog(final String logName,
                                                final Promise<URI> createPromise) {
    final FutureEventListener<Set<URI>> onCached = new FutureEventListener<Set<URI>>() {
        @Override
        public void onFailure(Throwable cause) {
            createPromise.setException(cause);
        }

        @Override
        public void onSuccess(Set<URI> uris) {
            findSubNamespaceToCreateLog(logName, uris, createPromise);
        }
    };
    getCachedSubNamespaces().addEventListener(onCached);
}
/**
 * Re-read the sub namespaces from zookeeper (without setting a watcher), then pick one of
 * them and create the log in it.
 *
 * @param logName
 *          name of the log to create
 * @param createPromise
 *          promise to satisfy with the creation result
 */
private void fetchSubNamespacesAndCreateLog(final String logName,
                                            final Promise<URI> createPromise) {
    final FutureEventListener<Set<URI>> onFetched = new FutureEventListener<Set<URI>>() {
        @Override
        public void onFailure(Throwable cause) {
            createPromise.setException(cause);
        }

        @Override
        public void onSuccess(Set<URI> uris) {
            findSubNamespaceToCreateLog(logName, uris, createPromise);
        }
    };
    fetchSubNamespaces(null).addEventListener(onFetched);
}
/**
 * Choose where to create the log. The given sub namespaces are scanned from the last one to
 * the first one and the first that holds fewer than <code>maxLogsPerSubnamespace</code> logs
 * is used. When all of them are full, a new sub namespace is created and used instead.
 *
 * <p>The creation fails when any of the given uris is not tracked by this store.
 *
 * @param logName
 *          name of the log to create
 * @param uris
 *          candidate sub namespaces
 * @param createPromise
 *          promise to satisfy with the creation result
 */
private void findSubNamespaceToCreateLog(final String logName,
                                         final Set<URI> uris,
                                         final Promise<URI> createPromise) {
    final List<URI> candidates = Lists.newArrayListWithExpectedSize(uris.size());
    final List<Future<Set<String>>> pendingLogs = Lists.newArrayListWithExpectedSize(uris.size());
    for (URI candidate : uris) {
        final SubNamespace tracked = subNamespaces.get(candidate);
        if (null != tracked) {
            pendingLogs.add(tracked.getLogs());
            candidates.add(candidate);
            continue;
        }
        createPromise.setException(new UnexpectedException("No sub namespace " + candidate + " found"));
        return;
    }

    // failures of either step are simply forwarded to the create promise
    final FutureEventListener<URI> onSubNamespaceCreated = new FutureEventListener<URI>() {
        @Override
        public void onSuccess(URI uri) {
            // the new namespace will be propagated to the namespace cache by the namespace listener
            // so we don't need to cache it here. we could go ahead to create the stream under this
            // namespace, as we are using sequential znode. we are mostly the first guy who create
            // the log under this namespace.
            createLogInNamespace(uri, logName, createPromise);
        }

        @Override
        public void onFailure(Throwable cause) {
            createPromise.setException(cause);
        }
    };

    Future.collect(pendingLogs).addEventListener(new FutureEventListener<List<Set<String>>>() {
        @Override
        public void onSuccess(List<Set<String>> resultList) {
            int idx = resultList.size();
            while (--idx >= 0) {
                if (resultList.get(idx).size() < maxLogsPerSubnamespace) {
                    createLogInNamespace(candidates.get(idx), logName, createPromise);
                    return;
                }
            }
            // All sub namespaces are full
            createSubNamespace().addEventListener(onSubNamespaceCreated);
        }

        @Override
        public void onFailure(Throwable cause) {
            createPromise.setException(cause);
        }
    });
}
/**
 * Extract the sub namespace name, i.e. the trailing <code>NS_...</code> segment, from the
 * zookeeper path of a sub namespace znode.
 *
 * <p>The name starts at the last occurrence of the sub namespace prefix. A path that does not
 * contain the prefix at all is rejected; it is no longer turned into a made-up name.
 *
 * @param zkPath
 *          zookeeper path of a sub namespace znode
 * @return name of the sub namespace, including the prefix
 * @throws UnexpectedException
 *          when the path is null or doesn't contain the sub namespace prefix
 */
private String getNamespaceFromZkPath(String zkPath) throws UnexpectedException {
    final int prefixPos = null == zkPath ? -1 : zkPath.lastIndexOf(SUB_NAMESPACE_PREFIX);
    if (prefixPos < 0) {
        throw new UnexpectedException("Invalid namespace @ " + zkPath);
    }
    return zkPath.substring(prefixPos);
}
/**
 * Create a new sub namespace as a persistent sequential znode, with empty data, under the sub
 * namespaces znode.
 *
 * @return future of the uri of the created sub namespace
 */
Future<URI> createSubNamespace() {
    final Promise<URI> created = new Promise<URI>();
    final String nsPathPrefix = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + SUB_NAMESPACE_PREFIX;
    final AsyncCallback.StringCallback onCreated = new AsyncCallback.StringCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (Code.OK.intValue() != rc) {
                created.setException(KeeperException.create(Code.get(rc)));
                return;
            }
            try {
                // "name" is the actual path of the sequential znode that was created
                URI newUri = getSubNamespaceURI(getNamespaceFromZkPath(name));
                logger.info("Created sub namespace {}", newUri);
                created.setValue(newUri);
            } catch (URISyntaxException e) {
                created.setException(new UnexpectedException("Invalid namespace " + name + " is created."));
            } catch (UnexpectedException ue) {
                created.setException(ue);
            }
        }
    };
    try {
        zkc.get().create(nsPathPrefix, new byte[0], zkc.getDefaultACL(),
                CreateMode.PERSISTENT_SEQUENTIAL, onCreated, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
        created.setException(e);
    } catch (InterruptedException e) {
        created.setException(e);
    }
    return created;
}
/**
 * Create a log under the given sub namespace. The blocking creation is submitted to the
 * scheduler; it relies on a versioned (CAS) update in zookeeper so that only one creation
 * happens at a time in a federated namespace.
 *
 * <p>When the versioned update loses the race (bad version), the sub namespaces are fetched
 * again and the whole placement is retried. Any other failure fails the promise.
 *
 * @param uri
 *          namespace
 * @param logName
 *          name of the log
 * @param createPromise
 *          the promise representing the creation result.
 */
private void createLogInNamespace(final URI uri,
                                  final String logName,
                                  final Promise<URI> createPromise) {
    // TODO: rewrite this after we bump to zk 3.5, where we will have asynchronous version of multi
    final Runnable creation = new Runnable() {
        @Override
        public void run() {
            try {
                createLogInNamespaceSync(uri, logName);
                createPromise.setValue(uri);
            } catch (KeeperException.BadVersionException bve) {
                fetchSubNamespacesAndCreateLog(logName, createPromise);
            } catch (KeeperException ke) {
                createPromise.setException(ke);
            } catch (IOException ioe) {
                createPromise.setException(ioe);
            } catch (InterruptedException ie) {
                createPromise.setException(ie);
            }
        }
    };
    scheduler.submit(creation);
}
/**
 * Create the log znode under the given sub namespace in a single zookeeper transaction that
 * also performs a versioned write on the sub namespaces znode. The versioned write acts as a
 * CAS guard between concurrent creations.
 *
 * <p>On success the locally tracked version of the sub namespaces znode is advanced by one.
 *
 * @param uri
 *          sub namespace to create the log in
 * @param logName
 *          name of the log
 * @throws InterruptedException
 *          propagated from the zookeeper calls
 * @throws IOException
 *          a {@link LogExistsException} when the log znode already exists, or a
 *          {@link ZKException} wrapping any other transaction failure
 * @throws KeeperException
 *          with code BADVERSION when the versioned write lost the race
 */
void createLogInNamespaceSync(URI uri, String logName)
        throws InterruptedException, IOException, KeeperException {
    Transaction txn = zkc.get().transaction();
    // read the version once so that the check and the value used cannot diverge.
    // we don't have the zk version yet. set it to 0 instead of -1, to prevent non CAS operation.
    Integer knownVersion = zkSubnamespacesVersion.get();
    int zkVersion = null == knownVersion ? 0 : knownVersion;
    txn.setData(zkSubnamespacesPath, uri.getPath().getBytes(UTF_8), zkVersion);
    String logPath = uri.getPath() + "/" + logName;
    txn.create(logPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT);
    try {
        txn.commit();
        // if the transaction succeed, the zk version is advanced
        setZkSubnamespacesVersion(zkVersion + 1);
    } catch (KeeperException ke) {
        // per-operation results are only available when the multi request itself was
        // processed; failures such as a connection loss carry none, so guard the lookups
        List<OpResult> opResults = ke.getResults();
        if (null != opResults && opResults.size() > 1) {
            // index 1 is the create of the log znode
            OpResult createResult = opResults.get(1);
            if (createResult instanceof OpResult.ErrorResult) {
                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) createResult;
                if (Code.NODEEXISTS.intValue() == errorResult.getErr()) {
                    throw new LogExistsException("Log " + logName + " already exists");
                }
            }
            // index 0 is the versioned write on the sub namespaces znode
            OpResult setResult = opResults.get(0);
            if (setResult instanceof OpResult.ErrorResult) {
                OpResult.ErrorResult errorResult = (OpResult.ErrorResult) setResult;
                if (Code.BADVERSION.intValue() == errorResult.getErr()) {
                    throw KeeperException.create(Code.BADVERSION);
                }
            }
        }
        throw new ZKException("ZK exception in creating log " + logName + " in " + uri, ke);
    }
}
/**
 * Advance the tracked version of the sub namespaces znode to <i>zkVersion</i>. The tracked
 * value only moves forward: nothing happens when it is already at or beyond <i>zkVersion</i>.
 * The update is retried until the compare-and-set succeeds or becomes unnecessary.
 *
 * @param zkVersion
 *          version observed from zookeeper
 */
void setZkSubnamespacesVersion(int zkVersion) {
    for (;;) {
        final Integer current = zkSubnamespacesVersion.get();
        if (null != current && current >= zkVersion) {
            // already up to date
            return;
        }
        if (zkSubnamespacesVersion.compareAndSet(current, zkVersion)) {
            return;
        }
    }
}
/**
 * Find the sub namespace that holds the given log.
 *
 * <p>The location cache is consulted first. On a cache miss the result is absent, unless
 * checking zookeeper on cache misses is enabled; in that case every sub namespace is probed
 * and a hit is put into the cache.
 *
 * @param logName
 *          name of the log
 * @return future of the location of the log, absent when it is not found; it fails right away
 *         when a duplicated log has already been found
 */
@Override
public Future<Optional<URI>> getLogLocation(final String logName) {
    if (duplicatedLogFound.get()) {
        return duplicatedLogException(duplicatedLogName.get());
    }
    final URI cached = log2Locations.get(logName);
    if (null != cached) {
        return postStateCheck(Future.value(Optional.of(cached)));
    }
    if (forceCheckLogExistence) {
        final AbstractFunction1<Optional<URI>, BoxedUnit> cacheOnHit =
                new AbstractFunction1<Optional<URI>, BoxedUnit>() {
                    @Override
                    public BoxedUnit apply(Optional<URI> found) {
                        if (found.isPresent()) {
                            log2Locations.putIfAbsent(logName, found.get());
                        }
                        return BoxedUnit.UNIT;
                    }
                };
        return postStateCheck(fetchLogLocation(logName).onSuccess(cacheOnHit));
    }
    return Future.value(Optional.<URI>absent());
}
/**
 * Probe every known sub namespace for the given log.
 *
 * <p>When the log shows up in more than one sub namespace, the duplication is recorded on the
 * store and the returned future fails.
 *
 * @param logName
 *          name of the log
 * @return future of the single location of the log, absent when no sub namespace has it
 */
private Future<Optional<URI>> fetchLogLocation(final String logName) {
    final Promise<Optional<URI>> located = new Promise<Optional<URI>>();
    final Set<URI> knownUris = subNamespaces.keySet();
    final List<Future<Optional<URI>>> probes = Lists.newArrayListWithExpectedSize(knownUris.size());
    for (URI knownUri : knownUris) {
        probes.add(fetchLogLocation(knownUri, logName));
    }
    Future.collect(probes).addEventListener(new FutureEventListener<List<Optional<URI>>>() {
        @Override
        public void onFailure(Throwable cause) {
            located.setException(cause);
        }

        @Override
        public void onSuccess(List<Optional<URI>> fetchResults) {
            Optional<URI> found = Optional.absent();
            for (Optional<URI> candidate : fetchResults) {
                if (!found.isPresent()) {
                    // nothing found so far: take whatever this probe returned
                    found = candidate;
                    continue;
                }
                if (!candidate.isPresent()) {
                    continue;
                }
                // a second hit: the log is duplicated
                logger.error("Log {} is found in multiple sub namespaces : {} & {}.",
                        new Object[] { logName, found.get(), candidate.get() });
                duplicatedLogName.compareAndSet(null, logName);
                duplicatedLogFound.set(true);
                located.setException(new UnexpectedException("Log " + logName
                        + " is found in multiple sub namespaces : "
                        + found.get() + " & " + candidate.get()));
                return;
            }
            located.setValue(found);
        }
    });
    return located;
}
/**
 * Check in zookeeper whether the given log exists under one sub namespace. No watcher is set.
 *
 * @param uri
 *          sub namespace to probe
 * @param logName
 *          name of the log
 * @return future of <i>uri</i> when the log znode exists, absent when it doesn't
 */
private Future<Optional<URI>> fetchLogLocation(final URI uri, String logName) {
    final Promise<Optional<URI>> probed = new Promise<Optional<URI>>();
    final String logZnodePath = uri.getPath() + "/" + logName;
    final AsyncCallback.StatCallback onStat = new AsyncCallback.StatCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            if (Code.NONODE.intValue() == rc) {
                probed.setValue(Optional.<URI>absent());
            } else if (Code.OK.intValue() == rc) {
                probed.setValue(Optional.of(uri));
            } else {
                probed.setException(KeeperException.create(Code.get(rc)));
            }
        }
    };
    try {
        zkc.get().exists(logZnodePath, false, onStat, null);
    } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
        probed.setException(e);
    } catch (InterruptedException e) {
        probed.setException(e);
    }
    return probed;
}
/**
 * List the logs of all sub namespaces.
 *
 * @return future of an iterator over the log names of every sub namespace; it fails right
 *         away when a duplicated log has already been found
 */
@Override
public Future<Iterator<String>> getLogs() {
    if (duplicatedLogFound.get()) {
        return duplicatedLogException(duplicatedLogName.get());
    }
    final AbstractFunction1<List<Set<String>>, Iterator<String>> toIterator =
            new AbstractFunction1<List<Set<String>>, Iterator<String>>() {
                @Override
                public Iterator<String> apply(List<Set<String>> logsPerNamespace) {
                    return getIterator(logsPerNamespace);
                }
            };
    return postStateCheck(retrieveLogs().map(toIterator));
}
/**
 * @return future collecting the current log sets of all known sub namespaces.
 */
private Future<List<Set<String>>> retrieveLogs() {
    final Collection<SubNamespace> tracked = subNamespaces.values();
    final List<Future<Set<String>>> pending = Lists.newArrayListWithExpectedSize(tracked.size());
    final Iterator<SubNamespace> cursor = tracked.iterator();
    while (cursor.hasNext()) {
        pending.add(cursor.next().getLogs());
    }
    return Future.collect(pending);
}
/**
 * Chain the given log sets into one iterator, in list order.
 *
 * @param resultList
 *          log sets, one per sub namespace
 * @return iterator over all the log names
 */
private Iterator<String> getIterator(List<Set<String>> resultList) {
    final List<Iterator<String>> perNamespace = Lists.newArrayListWithExpectedSize(resultList.size());
    final Iterator<Set<String>> cursor = resultList.iterator();
    while (cursor.hasNext()) {
        perNamespace.add(cursor.next().iterator());
    }
    return Iterators.concat(perNamespace.iterator());
}
/**
 * Register a listener on the changes of this namespace; delegates to
 * <code>registerListener</code>.
 *
 * @param listener
 *          listener to register
 */
@Override
public void registerNamespaceListener(NamespaceListener listener) {
    this.registerListener(listener);
}
/**
 * Intentionally a no-op.
 */
@Override
protected void watchNamespaceChanges() {
    // the federated namespace starts watching its sub namespaces when it is constructed,
    // so there is nothing left to do here
}
/**
 * Once the log sets of all sub namespaces are available, hand every registered listener its
 * own iterator over the combined log names.
 */
private void notifyOnNamespaceChanges() {
    final AbstractFunction1<List<Set<String>>, BoxedUnit> notifyAll =
            new AbstractFunction1<List<Set<String>>, BoxedUnit>() {
                @Override
                public BoxedUnit apply(List<Set<String>> logsPerNamespace) {
                    for (NamespaceListener registered : listeners) {
                        // a fresh iterator per listener, as iterators are single use
                        registered.onStreamsChanged(getIterator(logsPerNamespace));
                    }
                    return BoxedUnit.UNIT;
                }
            };
    retrieveLogs().onSuccess(notifyAll);
}
}