blob: 4fe8141d7bb3506edc878099ccf873f0dd9d9511 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.client.routing;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import org.apache.distributedlog.service.DLSocketAddress;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.stats.StatsReceiver;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Routing Service based on a given {@link com.twitter.common.zookeeper.ServerSet}.
*/
class ServerSetRoutingService extends Thread implements RoutingService {
private static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class);
static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() {
return new ServerSetRoutingServiceBuilder();
}
/**
* Builder to build {@link com.twitter.common.zookeeper.ServerSet} based routing service.
*/
static class ServerSetRoutingServiceBuilder implements RoutingService.Builder {
private ServerSetWatcher serverSetWatcher;
private ServerSetRoutingServiceBuilder() {}
public ServerSetRoutingServiceBuilder serverSetWatcher(ServerSetWatcher serverSetWatcher) {
this.serverSetWatcher = serverSetWatcher;
return this;
}
@Override
public Builder statsReceiver(StatsReceiver statsReceiver) {
return this;
}
@Override
public RoutingService build() {
checkNotNull(serverSetWatcher, "No serverset watcher provided.");
return new ServerSetRoutingService(this.serverSetWatcher);
}
}
private static class HostComparator implements Comparator<SocketAddress> {
private static final HostComparator INSTANCE = new HostComparator();
@Override
public int compare(SocketAddress o1, SocketAddress o2) {
return o1.toString().compareTo(o2.toString());
}
}
private final ServerSetWatcher serverSetWatcher;
private final Set<SocketAddress> hostSet = new HashSet<SocketAddress>();
private List<SocketAddress> hostList = new ArrayList<SocketAddress>();
private final HashFunction hasher = Hashing.md5();
// Server Set Changes
private final AtomicReference<ImmutableSet<DLSocketAddress>> serverSetChange =
new AtomicReference<ImmutableSet<DLSocketAddress>>(null);
private final CountDownLatch changeLatch = new CountDownLatch(1);
// Listeners
protected final CopyOnWriteArraySet<RoutingListener> listeners =
new CopyOnWriteArraySet<RoutingListener>();
ServerSetRoutingService(ServerSetWatcher serverSetWatcher) {
super("ServerSetRoutingService");
this.serverSetWatcher = serverSetWatcher;
}
@Override
public Set<SocketAddress> getHosts() {
synchronized (hostSet) {
return ImmutableSet.copyOf(hostSet);
}
}
@Override
public void startService() {
start();
try {
if (!changeLatch.await(1, TimeUnit.MINUTES)) {
logger.warn("No serverset change received in 1 minute.");
}
} catch (InterruptedException e) {
logger.warn("Interrupted waiting first serverset change : ", e);
}
logger.info("{} Routing Service Started.", getClass().getSimpleName());
}
@Override
public void stopService() {
Thread.currentThread().interrupt();
try {
join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted on waiting serverset routing service to finish : ", e);
}
logger.info("{} Routing Service Stopped.", getClass().getSimpleName());
}
@Override
public RoutingService registerListener(RoutingListener listener) {
listeners.add(listener);
return this;
}
@Override
public RoutingService unregisterListener(RoutingListener listener) {
listeners.remove(listener);
return this;
}
@Override
public SocketAddress getHost(String key, RoutingContext rContext)
throws NoBrokersAvailableException {
SocketAddress address = null;
synchronized (hostSet) {
if (0 != hostList.size()) {
int hashCode = hasher.hashUnencodedChars(key).asInt();
int hostId = signSafeMod(hashCode, hostList.size());
address = hostList.get(hostId);
if (rContext.isTriedHost(address)) {
ArrayList<SocketAddress> newList = new ArrayList<SocketAddress>(hostList);
newList.remove(hostId);
// pickup a new host by rehashing it.
hostId = signSafeMod(hashCode, newList.size());
address = newList.get(hostId);
int i = hostId;
while (rContext.isTriedHost(address)) {
i = (i + 1) % newList.size();
if (i == hostId) {
address = null;
break;
}
address = newList.get(i);
}
}
}
}
if (null == address) {
throw new NoBrokersAvailableException("No host is available.");
}
return address;
}
@Override
public void removeHost(SocketAddress host, Throwable reason) {
synchronized (hostSet) {
if (hostSet.remove(host)) {
logger.info("Node {} left due to : ", host, reason);
}
hostList = new ArrayList<SocketAddress>(hostSet);
Collections.sort(hostList, HostComparator.INSTANCE);
logger.info("Host list becomes : {}.", hostList);
}
}
@Override
public void run() {
try {
serverSetWatcher.watch(new ServerSetWatcher.ServerSetMonitor() {
@Override
public void onChange(ImmutableSet<DLSocketAddress> serviceInstances) {
ImmutableSet<DLSocketAddress> lastValue = serverSetChange.getAndSet(serviceInstances);
if (null == lastValue) {
ImmutableSet<DLSocketAddress> mostRecentValue;
do {
mostRecentValue = serverSetChange.get();
performServerSetChange(mostRecentValue);
changeLatch.countDown();
} while (!serverSetChange.compareAndSet(mostRecentValue, null));
}
}
});
} catch (Exception e) {
logger.error("Fail to monitor server set : ", e);
Runtime.getRuntime().exit(-1);
}
}
protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serverSet) {
Set<SocketAddress> newSet = new HashSet<SocketAddress>();
for (DLSocketAddress serviceInstance : serverSet) {
newSet.add(serviceInstance.getSocketAddress());
}
Set<SocketAddress> removed;
Set<SocketAddress> added;
synchronized (hostSet) {
removed = Sets.difference(hostSet, newSet).immutableCopy();
added = Sets.difference(newSet, hostSet).immutableCopy();
for (SocketAddress node: removed) {
if (hostSet.remove(node)) {
logger.info("Node {} left.", node);
}
}
for (SocketAddress node: added) {
if (hostSet.add(node)) {
logger.info("Node {} joined.", node);
}
}
}
for (SocketAddress addr : removed) {
for (RoutingListener listener : listeners) {
listener.onServerLeft(addr);
}
}
for (SocketAddress addr : added) {
for (RoutingListener listener : listeners) {
listener.onServerJoin(addr);
}
}
synchronized (hostSet) {
hostList = new ArrayList<SocketAddress>(hostSet);
Collections.sort(hostList, HostComparator.INSTANCE);
logger.info("Host list becomes : {}.", hostList);
}
}
static int signSafeMod(long dividend, int divisor) {
int mod = (int) (dividend % divisor);
if (mod < 0) {
mod += divisor;
}
return mod;
}
}