| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.protocol; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.socket.SocketChannel; |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.service.BrokerService; |
| |
| /** |
| * A collection of loaded handlers. |
| */ |
| @Slf4j |
| public class ProtocolHandlers implements AutoCloseable { |
| |
| /** |
| * Load the protocol handlers for the given <tt>protocol</tt> list. |
| * |
| * @param conf the pulsar broker service configuration |
| * @return the collection of protocol handlers |
| */ |
| public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException { |
| ProtocolHandlerDefinitions definitions = |
| ProtocolHandlerUtils.searchForHandlers( |
| conf.getProtocolHandlerDirectory(), conf.getNarExtractionDirectory()); |
| |
| ImmutableMap.Builder<String, ProtocolHandlerWithClassLoader> handlersBuilder = ImmutableMap.builder(); |
| |
| conf.getMessagingProtocols().forEach(protocol -> { |
| |
| ProtocolHandlerMetadata definition = definitions.handlers().get(protocol); |
| if (null == definition) { |
| throw new RuntimeException("No protocol handler is found for protocol `" + protocol |
| + "`. Available protocols are : " + definitions.handlers()); |
| } |
| |
| ProtocolHandlerWithClassLoader handler; |
| try { |
| handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory()); |
| } catch (IOException e) { |
| log.error("Failed to load the protocol handler for protocol `" + protocol + "`", e); |
| throw new RuntimeException("Failed to load the protocol handler for protocol `" + protocol + "`"); |
| } |
| |
| if (!handler.accept(protocol)) { |
| handler.close(); |
| log.error("Malformed protocol handler found for protocol `" + protocol + "`"); |
| throw new RuntimeException("Malformed protocol handler found for protocol `" + protocol + "`"); |
| } |
| |
| handlersBuilder.put(protocol, handler); |
| log.info("Successfully loaded protocol handler for protocol `{}`", protocol); |
| }); |
| |
| return new ProtocolHandlers(handlersBuilder.build()); |
| } |
| |
| private final Map<String, ProtocolHandlerWithClassLoader> handlers; |
| |
| ProtocolHandlers(Map<String, ProtocolHandlerWithClassLoader> handlers) { |
| this.handlers = handlers; |
| } |
| |
| /** |
| * Return the handler for the provided <tt>protocol</tt>. |
| * |
| * @param protocol the protocol to use |
| * @return the protocol handler to handle the provided protocol |
| */ |
| public ProtocolHandler protocol(String protocol) { |
| ProtocolHandlerWithClassLoader h = handlers.get(protocol); |
| if (null == h) { |
| return null; |
| } else { |
| return h.getHandler(); |
| } |
| } |
| |
| public void initialize(ServiceConfiguration conf) throws Exception { |
| for (ProtocolHandler handler : handlers.values()) { |
| handler.initialize(conf); |
| } |
| } |
| |
| public Map<String, String> getProtocolDataToAdvertise() { |
| return handlers.entrySet().stream() |
| .collect(Collectors.toMap( |
| e -> e.getKey(), |
| e -> e.getValue().getProtocolDataToAdvertise() |
| )); |
| } |
| |
| public Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> newChannelInitializers() { |
| Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> channelInitializers = Maps.newHashMap(); |
| Set<InetSocketAddress> addresses = Sets.newHashSet(); |
| |
| for (Map.Entry<String, ProtocolHandlerWithClassLoader> handler : handlers.entrySet()) { |
| Map<InetSocketAddress, ChannelInitializer<SocketChannel>> initializers = |
| handler.getValue().newChannelInitializers(); |
| initializers.forEach((address, initializer) -> { |
| if (!addresses.add(address)) { |
| log.error("Protocol handler for `{}` attempts to use {} for its listening port." |
| + " But it is already occupied by other message protocols.", |
| handler.getKey(), address); |
| throw new RuntimeException("Protocol handler for `" + handler.getKey() |
| + "` attempts to use " + address + " for its listening port. But it is" |
| + " already occupied by other messaging protocols"); |
| } |
| channelInitializers.put(handler.getKey(), initializers); |
| }); |
| } |
| |
| return channelInitializers; |
| } |
| |
| public void start(BrokerService service) { |
| handlers.values().forEach(handler -> handler.start(service)); |
| } |
| |
| @Override |
| public void close() { |
| handlers.values().forEach(ProtocolHandler::close); |
| } |
| } |