blob: 0b13500829c2f5c31d1cfbe699716bcff91f4edb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.stratos.autoscaler.event.receiver.health;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.event.health.stat.*;
import org.apache.stratos.messaging.listener.health.stat.*;
import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import java.util.concurrent.ExecutorService;
/**
* A thread for processing topology messages and updating the topology data structure.
*/
public class AutoscalerHealthStatEventReceiver {
private static final Log log = LogFactory.getLog(AutoscalerHealthStatEventReceiver.class);
private boolean terminated = false;
private HealthStatEventReceiver healthStatEventReceiver;
private ExecutorService executorService;
public AutoscalerHealthStatEventReceiver() {
this.healthStatEventReceiver = HealthStatEventReceiver.getInstance();
addEventListeners();
}
// public void execute() {
// healthStatEventReceiver.setExecutorService(executorService);
// healthStatEventReceiver.execute();
//
// if (log.isInfoEnabled()) {
// log.info("Autoscaler health stat event receiver thread started");
// }
// }
private void addEventListeners() {
// Listen to health stat events that affect clusters
healthStatEventReceiver.addEventListener(new AverageLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
AverageLoadAverageEvent averageLoadAverageEvent = (AverageLoadAverageEvent) event;
String clusterId = averageLoadAverageEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleAverageLoadAverageEvent(averageLoadAverageEvent);
}
});
healthStatEventReceiver.addEventListener(new AverageMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
AverageMemoryConsumptionEvent averageMemoryConsumptionEvent = (AverageMemoryConsumptionEvent) event;
String clusterId = averageMemoryConsumptionEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleAverageMemoryConsumptionEvent(averageMemoryConsumptionEvent);
}
});
healthStatEventReceiver.addEventListener(new AverageRequestsInFlightEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
AverageRequestsInFlightEvent averageRequestsInFlightEvent = (AverageRequestsInFlightEvent) event;
String clusterId = averageRequestsInFlightEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleAverageRequestsInFlightEvent(averageRequestsInFlightEvent);
}
});
healthStatEventReceiver.addEventListener(new AverageRequestsServingCapabilityEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
AverageRequestsServingCapabilityEvent averageRequestsServingCapabilityEvent = (AverageRequestsServingCapabilityEvent) event;
String clusterId = averageRequestsServingCapabilityEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
if (monitor instanceof ClusterMonitor) {
ClusterMonitor vmClusterMonitor = (ClusterMonitor) monitor;
vmClusterMonitor.handleAverageRequestsServingCapabilityEvent(averageRequestsServingCapabilityEvent);
}
}
});
healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
GradientOfLoadAverageEvent gradientOfLoadAverageEvent = (GradientOfLoadAverageEvent) event;
String clusterId = gradientOfLoadAverageEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleGradientOfLoadAverageEvent(gradientOfLoadAverageEvent);
}
});
healthStatEventReceiver.addEventListener(new GradientOfMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
GradientOfMemoryConsumptionEvent gradientOfMemoryConsumptionEvent = (GradientOfMemoryConsumptionEvent) event;
String clusterId = gradientOfMemoryConsumptionEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleGradientOfMemoryConsumptionEvent(gradientOfMemoryConsumptionEvent);
}
});
healthStatEventReceiver.addEventListener(new GradientOfRequestsInFlightEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
GradientOfRequestsInFlightEvent gradientOfRequestsInFlightEvent = (GradientOfRequestsInFlightEvent) event;
String clusterId = gradientOfRequestsInFlightEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleGradientOfRequestsInFlightEvent(gradientOfRequestsInFlightEvent);
}
});
healthStatEventReceiver.addEventListener(new MemberAverageLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
MemberAverageLoadAverageEvent memberAverageLoadAverageEvent = (MemberAverageLoadAverageEvent) event;
String memberId = memberAverageLoadAverageEvent.getMemberId();
Member member = getMemberByMemberId(memberId);
if (null == member) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
return;
}
if (!member.isActive()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member activated event has not received for the member %s. "
+ "Therefore ignoring" + " the health stat", memberId));
}
return;
}
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
String clusterId = member.getClusterId();
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleMemberAverageLoadAverageEvent(memberAverageLoadAverageEvent);
}
});
healthStatEventReceiver.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
MemberAverageMemoryConsumptionEvent memberAverageMemoryConsumptionEvent = (MemberAverageMemoryConsumptionEvent) event;
String memberId = memberAverageMemoryConsumptionEvent.getMemberId();
Member member = getMemberByMemberId(memberId);
if (null == member) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
return;
}
if (!member.isActive()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member activated event has not received for the member %s. "
+ "Therefore ignoring" + " the health stat", memberId));
}
return;
}
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
String clusterId = member.getClusterId();
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleMemberAverageMemoryConsumptionEvent(memberAverageMemoryConsumptionEvent);
}
});
healthStatEventReceiver.addEventListener(new MemberFaultEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
MemberFaultEvent memberFaultEvent = (MemberFaultEvent) event;
String clusterId = memberFaultEvent.getClusterId();
String memberId = memberFaultEvent.getMemberId();
if (log.isDebugEnabled()) {
log.debug(String.format("Member fault event received: [member] %s ", memberId));
}
if (memberId == null || memberId.isEmpty()) {
log.error("Member id not found in received message");
return;
}
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleMemberFaultEvent(memberFaultEvent);
}
});
healthStatEventReceiver.addEventListener(new MemberGradientOfLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
MemberGradientOfLoadAverageEvent memberGradientOfLoadAverageEvent = (MemberGradientOfLoadAverageEvent) event;
String memberId = memberGradientOfLoadAverageEvent.getMemberId();
Member member = getMemberByMemberId(memberId);
if (null == member) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
return;
}
if (!member.isActive()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member activated event has not received for the member %s. "
+ "Therefore ignoring" + " the health stat", memberId));
}
return;
}
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
String clusterId = member.getClusterId();
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleMemberGradientOfLoadAverageEvent(memberGradientOfLoadAverageEvent);
}
});
healthStatEventReceiver.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
MemberGradientOfMemoryConsumptionEvent memberGradientOfMemoryConsumptionEvent
= (MemberGradientOfMemoryConsumptionEvent) event;
String memberId = memberGradientOfMemoryConsumptionEvent.getMemberId();
Member member = getMemberByMemberId(memberId);
if (null == member) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
return;
}
if (!member.isActive()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member activated event has not received for the member %s. "
+ "Therefore ignoring" + " the health stat", memberId));
}
return;
}
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
String clusterId = member.getClusterId();
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleMemberGradientOfMemoryConsumptionEvent(memberGradientOfMemoryConsumptionEvent);
}
});
healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
MemberSecondDerivativeOfLoadAverageEvent memberSecondDerivativeOfLoadAverageEvent = (MemberSecondDerivativeOfLoadAverageEvent) event;
String memberId = memberSecondDerivativeOfLoadAverageEvent.getMemberId();
Member member = getMemberByMemberId(memberId);
if (null == member) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member not found in the Topology: [member] %s", memberId));
}
return;
}
if (!member.isActive()) {
if (log.isDebugEnabled()) {
log.debug(String.format("Member activated event has not received for the member %s. "
+ "Therefore ignoring" + " the health stat", memberId));
}
return;
}
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
String clusterId = member.getClusterId();
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleMemberSecondDerivativeOfLoadAverageEvent(memberSecondDerivativeOfLoadAverageEvent);
}
});
healthStatEventReceiver.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
}
});
healthStatEventReceiver.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
SecondDerivativeOfLoadAverageEvent secondDerivativeOfLoadAverageEvent = (SecondDerivativeOfLoadAverageEvent) event;
String clusterId = secondDerivativeOfLoadAverageEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleSecondDerivativeOfLoadAverageEvent(secondDerivativeOfLoadAverageEvent);
}
});
healthStatEventReceiver.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
SecondDerivativeOfMemoryConsumptionEvent secondDerivativeOfMemoryConsumptionEvent = (SecondDerivativeOfMemoryConsumptionEvent) event;
String clusterId = secondDerivativeOfMemoryConsumptionEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleSecondDerivativeOfMemoryConsumptionEvent(secondDerivativeOfMemoryConsumptionEvent);
}
});
healthStatEventReceiver.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
@Override
protected void onEvent(org.apache.stratos.messaging.event.Event event) {
SecondDerivativeOfRequestsInFlightEvent secondDerivativeOfRequestsInFlightEvent
= (SecondDerivativeOfRequestsInFlightEvent) event;
String clusterId = secondDerivativeOfRequestsInFlightEvent.getClusterId();
AutoscalerContext asCtx = AutoscalerContext.getInstance();
ClusterMonitor monitor;
monitor = asCtx.getClusterMonitor(clusterId);
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ "[cluster] %s", clusterId));
}
return;
}
monitor.handleSecondDerivativeOfRequestsInFlightEvent(secondDerivativeOfRequestsInFlightEvent);
}
});
}
private Member getMemberByMemberId(String memberId) {
try {
TopologyManager.acquireReadLock();
for (Service service : TopologyManager.getTopology().getServices()) {
for (Cluster cluster : service.getClusters()) {
if (cluster.memberExists(memberId)) {
return cluster.getMember(memberId);
}
}
}
return null;
} finally {
TopologyManager.releaseReadLock();
}
}
public void terminate() {
this.terminated = true;
}
public ExecutorService getExecutorService() {
return executorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
}