blob: 7a3d43074e0667fc10f149b1b0fb93523f49a94e [file] [log] [blame]
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Created by ${USER} on ${DATE}.
*/
package com.webank.wedatasphere.linkis.httpclient.discovery
import java.io.Closeable
import java.net.ConnectException
import java.util
import java.util.concurrent.ScheduledFuture
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.httpclient.Client
import com.webank.wedatasphere.linkis.httpclient.exception.DiscoveryException
import org.apache.http.HttpResponse
import scala.collection.JavaConversions._
import scala.concurrent.duration.TimeUnit
/**
 * Skeleton [[Discovery]] implementation that keeps two sets of server URLs
 * (healthy and unhealthy) up to date by means of three periodic tasks run on
 * `Utils.defaultScheduler`:
 *
 *  - a discovery task that calls [[discovery]] and registers URLs not seen before,
 *  - a heartbeat task over the healthy instances,
 *  - a heartbeat task over the unhealthy instances.
 *
 * Registered [[DiscoveryListener]]s are notified whenever an instance is
 * discovered, becomes unhealthy, becomes healthy again or cannot be connected to.
 *
 * `setServerUrl`, `setSchedule` and `setClient` must be called before [[start]],
 * because `start()` dereferences all three values.
 *
 * Created by enjoyyin on 2019/5/21.
 */
abstract class AbstractDiscovery extends Discovery with Closeable with Logging {

  private var serverUrl: String = _
  private var period: Long = _
  private var timeUnit: TimeUnit = _
  private var client: Client = _
  private val discoveryListeners = new util.ArrayList[DiscoveryListener]
  // Both sets are mutated from scheduler threads; every access goes through
  // `synchronized` on the set itself.
  private val serverInstances = new util.HashSet[String]
  private val unhealthyServerInstances = new util.HashSet[String]
  // Handles of the periodic tasks; they stay null until start() has scheduled them.
  private var discoveryFuture: ScheduledFuture[_] = _
  private var heartbeatFuture: ScheduledFuture[_] = _
  private var unhealthyHeartbeatFuture: ScheduledFuture[_] = _

  /** Sets the URL that is probed once by [[start]] before the periodic tasks are scheduled. */
  override def setServerUrl(serverUrl: String): Unit = {
    this.serverUrl = serverUrl
  }

  /** @return the URL set through [[setServerUrl]] (null if it was never set). */
  def getServerUrl: String = serverUrl

  /**
   * Sets the period of the three periodic tasks.
   *
   * @param period   interval between two runs, expressed in `timeUnit`
   * @param timeUnit unit of `period`
   */
  def setSchedule(period: Long, timeUnit: TimeUnit): Unit = {
    this.period = period
    this.timeUnit = timeUnit
  }

  /** Sets the client used to execute heartbeat actions. */
  def setClient(client: Client): Unit = this.client = client

  /** @return the client set through [[setClient]] (null if it was never set). */
  def getClient: Client = client

  /**
   * Probes the configured server URL once and then schedules the discovery and
   * heartbeat tasks. Each task first runs after one fifth of the configured
   * period and then repeats at the configured period.
   *
   * @throws DiscoveryException if the initial heartbeat reports an unhealthy
   *                            server or returns something other than a
   *                            [[HeartbeatResult]]
   */
  override def start(): Unit = {
    client.execute(getHeartbeatAction(serverUrl), 3000) match {
      case heartbeat: HeartbeatResult =>
        if (!heartbeat.isHealthy)
          throw new DiscoveryException(s"connect to serverUrl $serverUrl failed! Reason: gateway server is unhealthy!")
        else discoveryListeners.foreach(_.onServerDiscovered(serverUrl))
      case other =>
        // Fail with a readable error instead of a bare MatchError.
        throw new DiscoveryException(s"connect to serverUrl $serverUrl failed! Reason: unexpected heartbeat response: $other")
    }

    // Schedule in nanoseconds so that "one fifth of the period" is exact even
    // when the period is a small number of coarse units (e.g. 1 minute).
    val nanos = java.util.concurrent.TimeUnit.NANOSECONDS
    val periodNanos = timeUnit.toNanos(period)
    val delayNanos = periodNanos / 5

    discoveryFuture = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryAndWarn {
        val discovered = discovery()
        serverInstances synchronized discovered.foreach { url =>
          // add() returns true only for URLs that were not registered yet.
          if (serverInstances.add(url)) discoveryListeners.foreach(_.onServerDiscovered(url))
        }
      }
    }, delayNanos, periodNanos, nanos)

    heartbeatFuture = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit =
        // Iterate over a snapshot: the handlers below mutate the live set.
        getServerInstances.foreach { url =>
          checkHeartbeat(url, "healthy") { heartbeat =>
            if (!heartbeat.isHealthy) markUnhealthy(url)
          }
        }
    }, delayNanos, periodNanos, nanos)

    unhealthyHeartbeatFuture = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        val snapshot = unhealthyServerInstances.synchronized(unhealthyServerInstances.toList)
        snapshot.foreach { url =>
          checkHeartbeat(url, "unhealthy") { heartbeat =>
            if (heartbeat.isHealthy) markHealthy(url)
            // Still unhealthy: make sure it is not listed as a healthy instance.
            else serverInstances.synchronized(serverInstances.remove(url))
          }
        }
      }
    }, delayNanos, periodNanos, nanos)
  }

  /**
   * Sends one heartbeat to `url` and hands the result to `onResult`.
   * A `ConnectException` drops the instance via [[markUnconnected]]. Any other
   * failure is only logged by `Utils.tryAndWarn`, so that an exception never
   * escapes the scheduled `Runnable` (which would stop all its future runs).
   */
  private def checkHeartbeat(url: String, state: String)(onResult: HeartbeatResult => Unit): Unit =
    Utils.tryAndWarn {
      info(s"heartbeat to $state gateway $url")
      try client.execute(getHeartbeatAction(url), 3000) match {
        case heartbeat: HeartbeatResult => onResult(heartbeat)
        case other => throw new DiscoveryException(s"unexpected heartbeat response from $url: $other")
      } catch {
        case _: ConnectException => markUnconnected(url)
      }
    }

  /** Moves `url` from the healthy set to the unhealthy set and notifies the listeners. */
  private def markUnhealthy(url: String): Unit = {
    unhealthyServerInstances.synchronized(unhealthyServerInstances.add(url))
    discoveryListeners.foreach(_.onServerUnhealthy(url))
    serverInstances.synchronized(serverInstances.remove(url))
  }

  /** Moves `url` from the unhealthy set back to the healthy set and notifies the listeners. */
  private def markHealthy(url: String): Unit = {
    unhealthyServerInstances.synchronized(unhealthyServerInstances.remove(url))
    discoveryListeners.foreach(_.onServerHealthy(url))
    serverInstances.synchronized(serverInstances.add(url))
  }

  /** Removes `url` from both sets and notifies the listeners that it is unreachable. */
  private def markUnconnected(url: String): Unit = {
    unhealthyServerInstances.synchronized(unhealthyServerInstances.remove(url))
    serverInstances.synchronized(serverInstances.remove(url))
    discoveryListeners.foreach(_.onServerUnconnected(url))
  }

  /** @return the server URLs currently known; called on every run of the discovery task. */
  protected def discovery(): Array[String]

  /** @return the heartbeat action to execute against `serverUrl`. */
  protected def getHeartbeatAction(serverUrl: String): HeartbeatAction

  /** Builds the [[HeartbeatResult]] for the HTTP response received for `requestAction`. */
  def getHeartbeatResult(response: HttpResponse, requestAction: HeartbeatAction): HeartbeatResult

  /** Registers a listener that is notified about instance state changes. */
  override def addDiscoveryListener(discoveryListener: DiscoveryListener): Unit =
    discoveryListeners.add(discoveryListener)

  /** @return a snapshot of the instances currently considered healthy. */
  override def getServerInstances: Array[String] = serverInstances.synchronized {
    serverInstances.toArray(new Array[String](serverInstances.size()))
  }

  /** Cancels the periodic tasks; safe to call even if [[start]] never scheduled them. */
  override def close(): Unit = {
    cancelIfScheduled(discoveryFuture)
    cancelIfScheduled(heartbeatFuture)
    cancelIfScheduled(unhealthyHeartbeatFuture)
  }

  /** Cancels `future`, interrupting a running execution, unless it is still null. */
  private def cancelIfScheduled(future: ScheduledFuture[_]): Unit =
    if (future != null) future.cancel(true)
}