| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.distributedlog.client.routing; |
| |
| import com.google.common.collect.ImmutableSet; |
| import com.twitter.common.base.Command; |
| import com.twitter.common.base.Commands; |
| import com.twitter.common.zookeeper.Group; |
| import com.twitter.common.zookeeper.ServerSet; |
| import com.twitter.finagle.Addr; |
| import com.twitter.finagle.Address; |
| import com.twitter.finagle.Name; |
| import com.twitter.finagle.Resolver$; |
| import com.twitter.thrift.Endpoint; |
| import com.twitter.thrift.ServiceInstance; |
| import com.twitter.thrift.Status; |
| import java.net.InetSocketAddress; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import scala.runtime.AbstractFunction1; |
| import scala.runtime.BoxedUnit; |
| |
| /** |
| * Finagle Name based {@link ServerSet} implementation. |
| */ |
| class NameServerSet implements ServerSet { |
| |
| private static final Logger logger = LoggerFactory.getLogger(NameServerSet.class); |
| |
| private volatile Set<HostChangeMonitor<ServiceInstance>> watchers = |
| new HashSet<HostChangeMonitor<ServiceInstance>>(); |
| private volatile ImmutableSet<ServiceInstance> hostSet = ImmutableSet.of(); |
| private AtomicBoolean resolutionPending = new AtomicBoolean(true); |
| |
| public NameServerSet(String nameStr) { |
| Name name; |
| try { |
| name = Resolver$.MODULE$.eval(nameStr); |
| } catch (Exception exc) { |
| logger.error("Exception in Resolver.eval for name {}", nameStr, exc); |
| // Since this is called from various places that dont handle specific exceptions, |
| // we have no option than to throw a runtime exception to halt the control flow |
| // This should only happen in case of incorrect configuration. Having a log message |
| // would help identify the problem during tests |
| throw new RuntimeException(exc); |
| } |
| initialize(name); |
| } |
| |
| public NameServerSet(Name name) { |
| initialize(name); |
| } |
| |
| private void initialize(Name name) { |
| if (name instanceof TestName) { |
| ((TestName) name).changes(new AbstractFunction1<Addr, BoxedUnit>() { |
| @Override |
| public BoxedUnit apply(Addr varAddr) { |
| return NameServerSet.this.respondToChanges(varAddr); |
| } |
| }); |
| } else if (name instanceof Name.Bound) { |
| ((Name.Bound) name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() { |
| @Override |
| public BoxedUnit apply(Addr varAddr) { |
| return NameServerSet.this.respondToChanges(varAddr); |
| } |
| }); |
| } else { |
| logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}", |
| name, name.getClass()); |
| throw new UnsupportedOperationException("NameServerSet only supports Name.Bound"); |
| } |
| } |
| |
| private ServiceInstance endpointAddressToServiceInstance(Address endpointAddress) { |
| if (endpointAddress instanceof Address.Inet) { |
| InetSocketAddress inetSocketAddress = ((Address.Inet) endpointAddress).addr(); |
| Endpoint endpoint = new Endpoint(inetSocketAddress.getHostString(), inetSocketAddress.getPort()); |
| HashMap<String, Endpoint> map = new HashMap<String, Endpoint>(); |
| map.put("thrift", endpoint); |
| return new ServiceInstance( |
| endpoint, |
| map, |
| Status.ALIVE); |
| } else { |
| logger.error("We expect InetSocketAddress while the resolved address {} was {}", |
| endpointAddress, endpointAddress.getClass()); |
| throw new UnsupportedOperationException("invalid endpoint address: " + endpointAddress); |
| } |
| } |
| |
| |
| private BoxedUnit respondToChanges(Addr addr) { |
| ImmutableSet<ServiceInstance> oldHostSet = ImmutableSet.copyOf(hostSet); |
| |
| ImmutableSet<ServiceInstance> newHostSet = oldHostSet; |
| |
| if (addr instanceof Addr.Bound) { |
| scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound) addr).addrs(); |
| scala.collection.Iterator<Address> endpointAddressesIterator = endpointAddresses.toIterator(); |
| HashSet<ServiceInstance> serviceInstances = new HashSet<ServiceInstance>(); |
| while (endpointAddressesIterator.hasNext()) { |
| serviceInstances.add(endpointAddressToServiceInstance(endpointAddressesIterator.next())); |
| } |
| newHostSet = ImmutableSet.copyOf(serviceInstances); |
| |
| } else if (addr instanceof Addr.Failed) { |
| logger.error("Name resolution failed", ((Addr.Failed) addr).cause()); |
| newHostSet = ImmutableSet.of(); |
| } else if (addr.toString().equals("Pending")) { |
| logger.info("Name resolution pending"); |
| newHostSet = oldHostSet; |
| } else if (addr.toString().equals("Neg")) { |
| newHostSet = ImmutableSet.of(); |
| } else { |
| logger.error("Invalid Addr type: {}", addr.getClass().getName()); |
| throw new UnsupportedOperationException("Invalid Addr type:" + addr.getClass().getName()); |
| } |
| |
| // Reference comparison is valid as the sets are immutable |
| if (oldHostSet != newHostSet) { |
| logger.info("NameServerSet updated: {} -> {}", hostSetToString(oldHostSet), hostSetToString(newHostSet)); |
| resolutionPending.set(false); |
| hostSet = newHostSet; |
| synchronized (watchers) { |
| for (HostChangeMonitor<ServiceInstance> watcher: watchers) { |
| watcher.onChange(newHostSet); |
| } |
| } |
| |
| } |
| |
| return BoxedUnit.UNIT; |
| } |
| |
| |
| private String hostSetToString(ImmutableSet<ServiceInstance> hostSet) { |
| StringBuilder result = new StringBuilder(); |
| result.append("("); |
| for (ServiceInstance serviceInstance : hostSet) { |
| Endpoint endpoint = serviceInstance.getServiceEndpoint(); |
| result.append(String.format(" %s:%d", endpoint.getHost(), endpoint.getPort())); |
| } |
| result.append(" )"); |
| |
| return result.toString(); |
| } |
| |
| |
| /** |
| * Attempts to join a server set for this logical service group. |
| * |
| * @param endpoint the primary service endpoint |
| * @param additionalEndpoints and additional endpoints keyed by their logical name |
| * @param status the current service status |
| * @return an EndpointStatus object that allows the endpoint to adjust its status |
| * @throws Group.JoinException if there was a problem joining the server set |
| * @throws InterruptedException if interrupted while waiting to join the server set |
| * @deprecated The status field is deprecated. Please use {@link #join(java.net.InetSocketAddress, java.util.Map)} |
| */ |
| @Override |
| public EndpointStatus join(InetSocketAddress endpoint, |
| Map<String, InetSocketAddress> additionalEndpoints, |
| Status status) |
| throws Group.JoinException, InterruptedException { |
| throw new UnsupportedOperationException("NameServerSet does not support join"); |
| } |
| |
| /** |
| * Attempts to join a server set for this logical service group. |
| * |
| * @param endpoint the primary service endpoint |
| * @param additionalEndpoints and additional endpoints keyed by their logical name |
| * @return an EndpointStatus object that allows the endpoint to adjust its status |
| * @throws Group.JoinException if there was a problem joining the server set |
| * @throws InterruptedException if interrupted while waiting to join the server set |
| */ |
| @Override |
| public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints) |
| throws Group.JoinException, InterruptedException { |
| throw new UnsupportedOperationException("NameServerSet does not support join"); |
| } |
| |
| /** |
| * Attempts to join a server set for this logical service group. |
| * |
| * @param endpoint the primary service endpoint |
| * @param additionalEndpoints and additional endpoints keyed by their logical name |
| * @param shardId Unique shard identifier for this member of the service. |
| * @return an EndpointStatus object that allows the endpoint to adjust its status |
| * @throws Group.JoinException if there was a problem joining the server set |
| * @throws InterruptedException if interrupted while waiting to join the server set |
| */ |
| @Override |
| public EndpointStatus join(InetSocketAddress endpoint, |
| Map<String, InetSocketAddress> additionalEndpoints, |
| int shardId) |
| throws Group.JoinException, InterruptedException { |
| throw new UnsupportedOperationException("NameServerSet does not support join"); |
| } |
| |
| /** |
| * Registers a monitor to receive change notices for this server set as long as this jvm process |
| * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. |
| * The monitor will be notified if the membership set or parameters of existing members have |
| * changed. |
| * |
| * @param monitor the server set monitor to call back when the host set changes |
| * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set |
| * @deprecated Deprecated in favor of {@link #watch(com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor)} |
| */ |
| @Deprecated |
| @Override |
| public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { |
| throw new UnsupportedOperationException("NameServerSet does not support monitor"); |
| } |
| |
| /** |
| * Registers a monitor to receive change notices for this server set as long as this jvm process |
| * is alive. Blocks until the initial server set can be gathered and delivered to the monitor. |
| * The monitor will be notified if the membership set or parameters of existing members have |
| * changed. |
| * |
| * @param monitor the server set monitor to call back when the host set changes |
| * @return A command which, when executed, will stop monitoring the host set. |
| * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set |
| */ |
| @Override |
| public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException { |
| // First add the monitor to the watchers so that it does not miss any changes and invoke |
| // the onChange method |
| synchronized (watchers) { |
| watchers.add(monitor); |
| } |
| |
| if (resolutionPending.compareAndSet(false, false)) { |
| monitor.onChange(hostSet); |
| } |
| |
| return Commands.NOOP; // Return value is not used |
| } |
| } |