| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.cluster |
| |
| import kafka.common.KafkaException |
| import org.apache.kafka.common.network.ListenerName |
| import org.apache.kafka.common.protocol.SecurityProtocol |
| import org.apache.kafka.common.utils.Utils |
| |
| import scala.collection.Map |
| |
| object EndPoint { |
| |
| private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r |
| |
| private[kafka] val DefaultSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = |
| SecurityProtocol.values.map(sp => ListenerName.forSecurityProtocol(sp) -> sp).toMap |
| |
| /** |
| * Create EndPoint object from `connectionString` and optional `securityProtocolMap`. If the latter is not provided, |
| * we fallback to the default behaviour where listener names are the same as security protocols. |
| * |
| * @param connectionString the format is listener_name://host:port or listener_name://[ipv6 host]:port |
| * for example: PLAINTEXT://myhost:9092, CLIENT://myhost:9092 or REPLICATION://[::1]:9092 |
| * Host can be empty (PLAINTEXT://:9092) in which case we'll bind to default interface |
| * Negative ports are also accepted, since they are used in some unit tests |
| */ |
| def createEndPoint(connectionString: String, securityProtocolMap: Option[Map[ListenerName, SecurityProtocol]]): EndPoint = { |
| val protocolMap = securityProtocolMap.getOrElse(DefaultSecurityProtocolMap) |
| |
| def securityProtocol(listenerName: ListenerName): SecurityProtocol = |
| protocolMap.getOrElse(listenerName, |
| throw new IllegalArgumentException(s"No security protocol defined for listener ${listenerName.value}")) |
| |
| connectionString match { |
| case uriParseExp(listenerNameString, "", port) => |
| val listenerName = ListenerName.normalised(listenerNameString) |
| new EndPoint(null, port.toInt, listenerName, securityProtocol(listenerName)) |
| case uriParseExp(listenerNameString, host, port) => |
| val listenerName = ListenerName.normalised(listenerNameString) |
| new EndPoint(host, port.toInt, listenerName, securityProtocol(listenerName)) |
| case _ => throw new KafkaException(s"Unable to parse $connectionString to a broker endpoint") |
| } |
| } |
| } |
| |
| /** |
| * Part of the broker definition - matching host/port pair to a protocol |
| */ |
| case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) { |
| def connectionString: String = { |
| val hostport = |
| if (host == null) |
| ":"+port |
| else |
| Utils.formatAddress(host, port) |
| listenerName.value + "://" + hostport |
| } |
| } |