blob: 17b70be92467a17a9fccf67d9115ecef3c209069 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.client.proxy;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import org.apache.distributedlog.client.ClientConfig;
import org.apache.distributedlog.client.stats.ClientStats;
import org.apache.distributedlog.client.stats.OpStats;
import org.apache.distributedlog.thrift.service.ClientInfo;
import org.apache.distributedlog.thrift.service.ServerInfo;
import com.twitter.util.FutureEventListener;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manager manages clients (channels) to proxies.
*/
public class ProxyClientManager implements TimerTask {
private static final Logger logger = LoggerFactory.getLogger(ProxyClientManager.class);
private final ClientConfig clientConfig;
private final ProxyClient.Builder clientBuilder;
private final HashedWheelTimer timer;
private final HostProvider hostProvider;
private volatile Timeout periodicHandshakeTask;
private final ConcurrentHashMap<SocketAddress, ProxyClient> address2Services =
new ConcurrentHashMap<SocketAddress, ProxyClient>();
private final CopyOnWriteArraySet<ProxyListener> proxyListeners =
new CopyOnWriteArraySet<ProxyListener>();
private volatile boolean closed = false;
private volatile boolean periodicHandshakeEnabled = true;
private final Stopwatch lastOwnershipSyncStopwatch;
private final OpStats handshakeStats;
public ProxyClientManager(ClientConfig clientConfig,
ProxyClient.Builder clientBuilder,
HashedWheelTimer timer,
HostProvider hostProvider,
ClientStats clientStats) {
this.clientConfig = clientConfig;
this.clientBuilder = clientBuilder;
this.timer = timer;
this.hostProvider = hostProvider;
this.handshakeStats = clientStats.getOpStats("handshake");
scheduleHandshake();
this.lastOwnershipSyncStopwatch = Stopwatch.createStarted();
}
private void scheduleHandshake() {
if (clientConfig.getPeriodicHandshakeIntervalMs() > 0) {
periodicHandshakeTask = timer.newTimeout(this,
clientConfig.getPeriodicHandshakeIntervalMs(), TimeUnit.MILLISECONDS);
}
}
void setPeriodicHandshakeEnabled(boolean enabled) {
this.periodicHandshakeEnabled = enabled;
}
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || closed) {
return;
}
if (periodicHandshakeEnabled) {
final boolean syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS)
>= clientConfig.getPeriodicOwnershipSyncIntervalMs();
final Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
final AtomicInteger numHosts = new AtomicInteger(hostsSnapshot.size());
final AtomicInteger numStreams = new AtomicInteger(0);
final AtomicInteger numSuccesses = new AtomicInteger(0);
final AtomicInteger numFailures = new AtomicInteger(0);
final ConcurrentMap<SocketAddress, Integer> streamDistributions =
new ConcurrentHashMap<SocketAddress, Integer>();
final Stopwatch stopwatch = Stopwatch.createStarted();
for (SocketAddress host : hostsSnapshot) {
final SocketAddress address = host;
final ProxyClient client = getClient(address);
handshake(address, client, new FutureEventListener<ServerInfo>() {
@Override
public void onSuccess(ServerInfo serverInfo) {
numStreams.addAndGet(serverInfo.getOwnershipsSize());
numSuccesses.incrementAndGet();
notifyHandshakeSuccess(address, client, serverInfo, false, stopwatch);
if (clientConfig.isHandshakeTracingEnabled()) {
streamDistributions.putIfAbsent(address, serverInfo.getOwnershipsSize());
}
complete();
}
@Override
public void onFailure(Throwable cause) {
numFailures.incrementAndGet();
notifyHandshakeFailure(address, client, cause, stopwatch);
complete();
}
private void complete() {
if (0 == numHosts.decrementAndGet()) {
if (syncOwnerships) {
logger.info("Periodic handshaked with {} hosts : {} streams returned,"
+ " {} hosts succeeded, {} hosts failed",
new Object[] {
hostsSnapshot.size(),
numStreams.get(),
numSuccesses.get(),
numFailures.get()});
if (clientConfig.isHandshakeTracingEnabled()) {
logger.info("Periodic handshaked stream distribution : {}", streamDistributions);
}
}
}
}
}, false, syncOwnerships);
}
if (syncOwnerships) {
lastOwnershipSyncStopwatch.reset().start();
}
}
scheduleHandshake();
}
/**
* Register a proxy <code>listener</code> on proxy related changes.
*
* @param listener
* proxy listener
*/
public void registerProxyListener(ProxyListener listener) {
proxyListeners.add(listener);
}
private void notifyHandshakeSuccess(SocketAddress address,
ProxyClient client,
ServerInfo serverInfo,
boolean logging,
Stopwatch stopwatch) {
if (logging) {
if (null != serverInfo && serverInfo.isSetOwnerships()) {
logger.info("Handshaked with {} : {} ownerships returned.",
address, serverInfo.getOwnerships().size());
} else {
logger.info("Handshaked with {} : no ownerships returned", address);
}
}
handshakeStats.completeRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
for (ProxyListener listener : proxyListeners) {
listener.onHandshakeSuccess(address, client, serverInfo);
}
}
private void notifyHandshakeFailure(SocketAddress address,
ProxyClient client,
Throwable cause,
Stopwatch stopwatch) {
handshakeStats.failRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
for (ProxyListener listener : proxyListeners) {
listener.onHandshakeFailure(address, client, cause);
}
}
/**
* Retrieve a client to proxy <code>address</code>.
*
* @param address
* proxy address
* @return proxy client
*/
public ProxyClient getClient(final SocketAddress address) {
ProxyClient sc = address2Services.get(address);
if (null != sc) {
return sc;
}
return createClient(address);
}
/**
* Remove the client to proxy <code>address</code>.
*
* @param address
* proxy address
*/
public void removeClient(SocketAddress address) {
ProxyClient sc = address2Services.remove(address);
if (null != sc) {
logger.info("Removed host {}.", address);
sc.close();
}
}
/**
* Remove the client <code>sc</code> to proxy <code>address</code>.
*
* @param address
* proxy address
* @param sc
* proxy client
*/
public void removeClient(SocketAddress address, ProxyClient sc) {
if (address2Services.remove(address, sc)) {
logger.info("Remove client {} to host {}.", sc, address);
sc.close();
}
}
/**
* Create a client to proxy <code>address</code>.
*
* @param address
* proxy address
* @return proxy client
*/
public ProxyClient createClient(final SocketAddress address) {
final ProxyClient sc = clientBuilder.build(address);
ProxyClient oldSC = address2Services.putIfAbsent(address, sc);
if (null != oldSC) {
sc.close();
return oldSC;
} else {
final Stopwatch stopwatch = Stopwatch.createStarted();
FutureEventListener<ServerInfo> listener = new FutureEventListener<ServerInfo>() {
@Override
public void onSuccess(ServerInfo serverInfo) {
notifyHandshakeSuccess(address, sc, serverInfo, true, stopwatch);
}
@Override
public void onFailure(Throwable cause) {
notifyHandshakeFailure(address, sc, cause, stopwatch);
}
};
// send a ping messaging after creating connections.
handshake(address, sc, listener, true, true);
return sc;
}
}
/**
* Handshake with a given proxy.
*
* @param address
* proxy address
* @param sc
* proxy client
* @param listener
* listener on handshake result
*/
private void handshake(SocketAddress address,
ProxyClient sc,
FutureEventListener<ServerInfo> listener,
boolean logging,
boolean getOwnerships) {
if (clientConfig.getHandshakeWithClientInfo()) {
ClientInfo clientInfo = new ClientInfo();
clientInfo.setGetOwnerships(getOwnerships);
clientInfo.setStreamNameRegex(clientConfig.getStreamNameRegex());
if (logging) {
logger.info("Handshaking with {} : {}", address, clientInfo);
}
sc.getService().handshakeWithClientInfo(clientInfo)
.addEventListener(listener);
} else {
if (logging) {
logger.info("Handshaking with {}", address);
}
sc.getService().handshake().addEventListener(listener);
}
}
/**
* Handshake with all proxies.
*
* <p>NOTE: this is a synchronous call.
*/
public void handshake() {
Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
logger.info("Handshaking with {} hosts.", hostsSnapshot.size());
final CountDownLatch latch = new CountDownLatch(hostsSnapshot.size());
final Stopwatch stopwatch = Stopwatch.createStarted();
for (SocketAddress host: hostsSnapshot) {
final SocketAddress address = host;
final ProxyClient client = getClient(address);
handshake(address, client, new FutureEventListener<ServerInfo>() {
@Override
public void onSuccess(ServerInfo serverInfo) {
notifyHandshakeSuccess(address, client, serverInfo, true, stopwatch);
latch.countDown();
}
@Override
public void onFailure(Throwable cause) {
notifyHandshakeFailure(address, client, cause, stopwatch);
latch.countDown();
}
}, true, true);
}
try {
latch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
logger.warn("Interrupted on handshaking with servers : ", e);
}
}
/**
* Return number of proxies managed by client manager.
*
* @return number of proxies managed by client manager.
*/
public int getNumProxies() {
return address2Services.size();
}
/**
* Return all clients.
*
* @return all clients.
*/
public Map<SocketAddress, ProxyClient> getAllClients() {
return ImmutableMap.copyOf(address2Services);
}
public void close() {
closed = true;
Timeout task = periodicHandshakeTask;
if (null != task) {
task.cancel();
}
for (ProxyClient sc : address2Services.values()) {
sc.close();
}
}
}