blob: eeba4ac6a47e6861f550d980d40b6faeaddafdc7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.client.routing;
import com.google.common.collect.ImmutableSet;
import com.twitter.common.base.Command;
import com.twitter.common.base.Commands;
import com.twitter.common.zookeeper.Group;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.finagle.Addr;
import com.twitter.finagle.Address;
import com.twitter.finagle.Name;
import com.twitter.finagle.Resolver$;
import com.twitter.thrift.Endpoint;
import com.twitter.thrift.ServiceInstance;
import com.twitter.thrift.Status;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
/**
* Finagle Name based {@link ServerSet} implementation.
*/
class NameServerSet implements ServerSet {
private static final Logger logger = LoggerFactory.getLogger(NameServerSet.class);
private volatile Set<HostChangeMonitor<ServiceInstance>> watchers =
new HashSet<HostChangeMonitor<ServiceInstance>>();
private volatile ImmutableSet<ServiceInstance> hostSet = ImmutableSet.of();
private AtomicBoolean resolutionPending = new AtomicBoolean(true);
public NameServerSet(String nameStr) {
Name name;
try {
name = Resolver$.MODULE$.eval(nameStr);
} catch (Exception exc) {
logger.error("Exception in Resolver.eval for name {}", nameStr, exc);
// Since this is called from various places that dont handle specific exceptions,
// we have no option than to throw a runtime exception to halt the control flow
// This should only happen in case of incorrect configuration. Having a log message
// would help identify the problem during tests
throw new RuntimeException(exc);
}
initialize(name);
}
public NameServerSet(Name name) {
initialize(name);
}
private void initialize(Name name) {
if (name instanceof TestName) {
((TestName) name).changes(new AbstractFunction1<Addr, BoxedUnit>() {
@Override
public BoxedUnit apply(Addr varAddr) {
return NameServerSet.this.respondToChanges(varAddr);
}
});
} else if (name instanceof Name.Bound) {
((Name.Bound) name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() {
@Override
public BoxedUnit apply(Addr varAddr) {
return NameServerSet.this.respondToChanges(varAddr);
}
});
} else {
logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}",
name, name.getClass());
throw new UnsupportedOperationException("NameServerSet only supports Name.Bound");
}
}
private ServiceInstance endpointAddressToServiceInstance(Address endpointAddress) {
if (endpointAddress instanceof Address.Inet) {
InetSocketAddress inetSocketAddress = ((Address.Inet) endpointAddress).addr();
Endpoint endpoint = new Endpoint(inetSocketAddress.getHostString(), inetSocketAddress.getPort());
HashMap<String, Endpoint> map = new HashMap<String, Endpoint>();
map.put("thrift", endpoint);
return new ServiceInstance(
endpoint,
map,
Status.ALIVE);
} else {
logger.error("We expect InetSocketAddress while the resolved address {} was {}",
endpointAddress, endpointAddress.getClass());
throw new UnsupportedOperationException("invalid endpoint address: " + endpointAddress);
}
}
private BoxedUnit respondToChanges(Addr addr) {
ImmutableSet<ServiceInstance> oldHostSet = ImmutableSet.copyOf(hostSet);
ImmutableSet<ServiceInstance> newHostSet = oldHostSet;
if (addr instanceof Addr.Bound) {
scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound) addr).addrs();
scala.collection.Iterator<Address> endpointAddressesIterator = endpointAddresses.toIterator();
HashSet<ServiceInstance> serviceInstances = new HashSet<ServiceInstance>();
while (endpointAddressesIterator.hasNext()) {
serviceInstances.add(endpointAddressToServiceInstance(endpointAddressesIterator.next()));
}
newHostSet = ImmutableSet.copyOf(serviceInstances);
} else if (addr instanceof Addr.Failed) {
logger.error("Name resolution failed", ((Addr.Failed) addr).cause());
newHostSet = ImmutableSet.of();
} else if (addr.toString().equals("Pending")) {
logger.info("Name resolution pending");
newHostSet = oldHostSet;
} else if (addr.toString().equals("Neg")) {
newHostSet = ImmutableSet.of();
} else {
logger.error("Invalid Addr type: {}", addr.getClass().getName());
throw new UnsupportedOperationException("Invalid Addr type:" + addr.getClass().getName());
}
// Reference comparison is valid as the sets are immutable
if (oldHostSet != newHostSet) {
logger.info("NameServerSet updated: {} -> {}", hostSetToString(oldHostSet), hostSetToString(newHostSet));
resolutionPending.set(false);
hostSet = newHostSet;
synchronized (watchers) {
for (HostChangeMonitor<ServiceInstance> watcher: watchers) {
watcher.onChange(newHostSet);
}
}
}
return BoxedUnit.UNIT;
}
private String hostSetToString(ImmutableSet<ServiceInstance> hostSet) {
StringBuilder result = new StringBuilder();
result.append("(");
for (ServiceInstance serviceInstance : hostSet) {
Endpoint endpoint = serviceInstance.getServiceEndpoint();
result.append(String.format(" %s:%d", endpoint.getHost(), endpoint.getPort()));
}
result.append(" )");
return result.toString();
}
/**
* Attempts to join a server set for this logical service group.
*
* @param endpoint the primary service endpoint
* @param additionalEndpoints and additional endpoints keyed by their logical name
* @param status the current service status
* @return an EndpointStatus object that allows the endpoint to adjust its status
* @throws Group.JoinException if there was a problem joining the server set
* @throws InterruptedException if interrupted while waiting to join the server set
* @deprecated The status field is deprecated. Please use {@link #join(java.net.InetSocketAddress, java.util.Map)}
*/
@Override
public EndpointStatus join(InetSocketAddress endpoint,
Map<String, InetSocketAddress> additionalEndpoints,
Status status)
throws Group.JoinException, InterruptedException {
throw new UnsupportedOperationException("NameServerSet does not support join");
}
/**
* Attempts to join a server set for this logical service group.
*
* @param endpoint the primary service endpoint
* @param additionalEndpoints and additional endpoints keyed by their logical name
* @return an EndpointStatus object that allows the endpoint to adjust its status
* @throws Group.JoinException if there was a problem joining the server set
* @throws InterruptedException if interrupted while waiting to join the server set
*/
@Override
public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints)
throws Group.JoinException, InterruptedException {
throw new UnsupportedOperationException("NameServerSet does not support join");
}
/**
* Attempts to join a server set for this logical service group.
*
* @param endpoint the primary service endpoint
* @param additionalEndpoints and additional endpoints keyed by their logical name
* @param shardId Unique shard identifier for this member of the service.
* @return an EndpointStatus object that allows the endpoint to adjust its status
* @throws Group.JoinException if there was a problem joining the server set
* @throws InterruptedException if interrupted while waiting to join the server set
*/
@Override
public EndpointStatus join(InetSocketAddress endpoint,
Map<String, InetSocketAddress> additionalEndpoints,
int shardId)
throws Group.JoinException, InterruptedException {
throw new UnsupportedOperationException("NameServerSet does not support join");
}
/**
* Registers a monitor to receive change notices for this server set as long as this jvm process
* is alive. Blocks until the initial server set can be gathered and delivered to the monitor.
* The monitor will be notified if the membership set or parameters of existing members have
* changed.
*
* @param monitor the server set monitor to call back when the host set changes
* @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set
* @deprecated Deprecated in favor of {@link #watch(com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor)}
*/
@Deprecated
@Override
public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
throw new UnsupportedOperationException("NameServerSet does not support monitor");
}
/**
* Registers a monitor to receive change notices for this server set as long as this jvm process
* is alive. Blocks until the initial server set can be gathered and delivered to the monitor.
* The monitor will be notified if the membership set or parameters of existing members have
* changed.
*
* @param monitor the server set monitor to call back when the host set changes
* @return A command which, when executed, will stop monitoring the host set.
* @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set
*/
@Override
public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
// First add the monitor to the watchers so that it does not miss any changes and invoke
// the onChange method
synchronized (watchers) {
watchers.add(monitor);
}
if (resolutionPending.compareAndSet(false, false)) {
monitor.onChange(hostSet);
}
return Commands.NOOP; // Return value is not used
}
}