blob: 56c551db8d52ff26eb9d71d7d6515df27421c848 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.netty.example
import java.io.{BufferedReader, InputStreamReader}
import java.net._
import org.apache.commons.lang3.SystemUtils
import org.mortbay.util.MultiException
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
/**
* Netty Utility class for start netty service and retry tcp port
*/
object NettyUtil {
private lazy val logger = LoggerFactory.getLogger(getClass)
/** find local inet addresses */
def findLocalInetAddress(): InetAddress = {
val address = InetAddress.getLocalHost
address.isLoopbackAddress match {
case true =>
// Address resolves to something like 127.0.1.1, which happens on Debian; try to find
// a better address using the local network interfaces
// getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order
// on unix-like system. On windows, it returns in index order.
// It's more proper to pick ip address following system output order.
val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq
val reOrderedNetworkIFs = SystemUtils.IS_OS_WINDOWS match {
case true => activeNetworkIFs
case false => activeNetworkIFs.reverse
}
reOrderedNetworkIFs.find { ni: NetworkInterface =>
val addr = ni.getInetAddresses.asScala.toSeq.filterNot { addr =>
addr.isLinkLocalAddress || addr.isLoopbackAddress
}
addr.nonEmpty
} match {
case Some(ni) =>
val addr = ni.getInetAddresses.asScala.toSeq.filterNot { inet =>
inet.isLinkLocalAddress || inet.isLoopbackAddress
}
val address = addr.find(_.isInstanceOf[Inet4Address]).getOrElse(addr.head).getAddress
// because of Inet6Address.toHostName may add interface at the end if it knows about it
InetAddress.getByAddress(address)
case None => address
}
case false => address
}
}
/**
* start service, if port is collision, retry 128 times
* Tip: this function is copy from spark: org.apache.spark.util.Utils.scala#L2172
* Its better way to retry unused port
*/
def startServiceOnPort[T](
startPort: Int,
startService: Int => T,
maxRetries: Int = 128,
serviceName: String = ""): T = {
if (startPort != 0 && (startPort < 1024 || startPort > 65536)) {
throw new Exception("startPort should be between 1024 and 65535 (inclusive), " +
"or 0 for a random free port.")
}
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
for (offset <- 0 to maxRetries) {
// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
startPort
} else {
// If the new port wraps around, do not try a privilege port
((startPort + offset - 1024) % (65536 - 1024)) + 1024
}
try {
val result = startService(tryPort)
logger.info(s"Successfully started service$serviceString, result:$result.")
return result
} catch {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
"port or increasing spark.port.maxRetries."
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
throw exception
}
logger.error(s"Service$serviceString could not bind on port $tryPort. " +
s"Attempting port ${tryPort + 1}.")
}
}
// Should never happen
throw new Exception(s"Failed to start service$serviceString on port $startPort")
}
/** send GET request to this url */
def sendGetRequest(url: String): String = {
val obj: URL = new URL(url)
val con: HttpURLConnection = obj.openConnection.asInstanceOf[HttpURLConnection]
con.setRequestMethod("GET")
val code = try {
con.getResponseCode
} catch {
case e: Throwable => e
}
code match {
case HttpURLConnection.HTTP_OK =>
val in: BufferedReader = new BufferedReader(new InputStreamReader(con.getInputStream))
var inputLine: String = ""
val response: StringBuilder = new StringBuilder
try {
while (inputLine != null) {
response.append(inputLine)
inputLine = in.readLine()
}
in.close()
} catch {
case throwable: Exception =>
}
response.toString
case x => throw new Exception("GET request not worked of url: " + url)
}
}
/** Return whether the exception is caused by an address-port collision when binding. */
private def isBindCollision(exception: Throwable): Boolean = {
exception match {
case e: BindException if e.getMessage != null => true
case e: BindException => isBindCollision(e.getCause)
case e: MultiException =>
e.getThrowables.asScala.toList.map(_.asInstanceOf[Throwable]).exists(isBindCollision)
case e: Exception => isBindCollision(e.getCause)
case _ => false
}
}
}