blob: e41caf1164fd741319685a8c6d562a3c600c3efc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.protocol;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
/**
* A collection of loaded handlers.
*/
@Slf4j
public class ProtocolHandlers implements AutoCloseable {
/**
* Load the protocol handlers for the given <tt>protocol</tt> list.
*
* @param conf the pulsar broker service configuration
* @return the collection of protocol handlers
*/
public static ProtocolHandlers load(ServiceConfiguration conf) throws IOException {
ProtocolHandlerDefinitions definitions =
ProtocolHandlerUtils.searchForHandlers(
conf.getProtocolHandlerDirectory(), conf.getNarExtractionDirectory());
ImmutableMap.Builder<String, ProtocolHandlerWithClassLoader> handlersBuilder = ImmutableMap.builder();
conf.getMessagingProtocols().forEach(protocol -> {
ProtocolHandlerMetadata definition = definitions.handlers().get(protocol);
if (null == definition) {
throw new RuntimeException("No protocol handler is found for protocol `" + protocol
+ "`. Available protocols are : " + definitions.handlers());
}
ProtocolHandlerWithClassLoader handler;
try {
handler = ProtocolHandlerUtils.load(definition, conf.getNarExtractionDirectory());
} catch (IOException e) {
log.error("Failed to load the protocol handler for protocol `" + protocol + "`", e);
throw new RuntimeException("Failed to load the protocol handler for protocol `" + protocol + "`");
}
if (!handler.accept(protocol)) {
handler.close();
log.error("Malformed protocol handler found for protocol `" + protocol + "`");
throw new RuntimeException("Malformed protocol handler found for protocol `" + protocol + "`");
}
handlersBuilder.put(protocol, handler);
log.info("Successfully loaded protocol handler for protocol `{}`", protocol);
});
return new ProtocolHandlers(handlersBuilder.build());
}
private final Map<String, ProtocolHandlerWithClassLoader> handlers;
ProtocolHandlers(Map<String, ProtocolHandlerWithClassLoader> handlers) {
this.handlers = handlers;
}
/**
* Return the handler for the provided <tt>protocol</tt>.
*
* @param protocol the protocol to use
* @return the protocol handler to handle the provided protocol
*/
public ProtocolHandler protocol(String protocol) {
ProtocolHandlerWithClassLoader h = handlers.get(protocol);
if (null == h) {
return null;
} else {
return h.getHandler();
}
}
public void initialize(ServiceConfiguration conf) throws Exception {
for (ProtocolHandler handler : handlers.values()) {
handler.initialize(conf);
}
}
public Map<String, String> getProtocolDataToAdvertise() {
return handlers.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().getProtocolDataToAdvertise()
));
}
public Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> newChannelInitializers() {
Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> channelInitializers = Maps.newHashMap();
Set<InetSocketAddress> addresses = Sets.newHashSet();
for (Map.Entry<String, ProtocolHandlerWithClassLoader> handler : handlers.entrySet()) {
Map<InetSocketAddress, ChannelInitializer<SocketChannel>> initializers =
handler.getValue().newChannelInitializers();
initializers.forEach((address, initializer) -> {
if (!addresses.add(address)) {
log.error("Protocol handler for `{}` attempts to use {} for its listening port."
+ " But it is already occupied by other message protocols.",
handler.getKey(), address);
throw new RuntimeException("Protocol handler for `" + handler.getKey()
+ "` attempts to use " + address + " for its listening port. But it is"
+ " already occupied by other messaging protocols");
}
channelInitializers.put(handler.getKey(), initializers);
});
}
return channelInitializers;
}
public void start(BrokerService service) {
handlers.values().forEach(handler -> handler.start(service));
}
@Override
public void close() {
handlers.values().forEach(ProtocolHandler::close);
}
}