blob: 5b77723a2ff95ea1949efcd5a0caf28f00885c45 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.stratos.cep.extension;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.MessagingUtil;
import org.wso2.siddhi.core.config.SiddhiContext;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.event.in.InEvent;
import org.wso2.siddhi.core.event.in.InListEvent;
import org.wso2.siddhi.core.persistence.ThreadBarrier;
import org.wso2.siddhi.core.query.QueryPostProcessingElement;
import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.expression.Expression;
import org.wso2.siddhi.query.api.expression.Variable;
import org.wso2.siddhi.query.api.expression.constant.IntConstant;
import org.wso2.siddhi.query.api.expression.constant.LongConstant;
import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * CEP window processor to handle faulty member instances. This window processor is responsible for
 * publishing a MemberFault event if health stats are not received from a member within a given time window.
 * <p>
 * Incoming events only refresh the per-member "last seen" time stamp. A task scheduled on the executor
 * supplied through {@link #setScheduledExecutorService(ScheduledExecutorService)} periodically sweeps
 * that map and publishes a {@link MemberFaultEvent} for every member whose last event is older than
 * {@code TIME_OUT} milliseconds.
 */
@SiddhiExtension(namespace = "stratos", function = "faultHandling")
public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {

    private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);

    /** System property that overrides {@link #ACTIVATE_TIMEOUT}. */
    private static final String ACTIVATE_TIMEOUT_KEY = "cep.fault.handler.extension.activate.timeout";

    /**
     * Milliseconds to wait (measured from construction) for the first event before the processor marks
     * itself active anyway. Defaults to 15 minutes. This avoids false positives at CEP startup.
     */
    private static final int ACTIVATE_TIMEOUT =
            Integer.getInteger(ACTIVATE_TIMEOUT_KEY, 60 * 1000 * 15);

    /** A member is reported as faulty when no event was seen for longer than this many milliseconds. */
    private static final int TIME_OUT = 60 * 1000;

    /** Outer key of the payload published to the health-stat topic for a member fault. */
    private static final String MEMBER_FAULT_EVENT_KEY =
            "org.apache.stratos.messaging.event.health.stat.MemberFaultEvent";

    private ScheduledExecutorService faultHandleScheduler;
    private ScheduledFuture<?> lastSchedule;
    private ThreadBarrier threadBarrier;
    /** Delay between two fault-detection sweeps in milliseconds (first query parameter). */
    private long timeToKeep;
    private ISchedulerSiddhiQueue<StreamEvent> window;
    private final EventPublisher healthStatPublisher = EventPublisherPool
            .getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
    // Map of member ids to their last received health event time stamp. Updated from the event
    // processing path (addDataToMap) and read/pruned by the scheduled sweep (run).
    private final ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
    private volatile boolean isActive;
    private volatile boolean hasMemberTimeStampMapInitialized;
    // Set by destroy() so that the self-rescheduling sweep in run() stops.
    private volatile boolean destroyed;
    private final long startTime = System.currentTimeMillis();
    // Event receiver to receive topology events published by cloud-controller
    private final CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
    // Stratos member id attribute index in stream execution plan
    private int memberIdAttrIndex;

    @Override
    protected void processEvent(InEvent event) {
        addDataToMap(event);
    }

    @Override
    protected void processEvent(InListEvent listEvent) {
        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
            addDataToMap((InEvent) listEvent.getEvent(i));
        }
    }

    /**
     * Record the time stamp of a received event against the member that emitted it.
     * <p>
     * The first event ever received also marks this processor as active. Events with an empty member
     * id, or for members that are not (or no longer) in the topology, are logged and dropped.
     *
     * @param event Event received by Siddhi.
     */
    private void addDataToMap(InEvent event) {
        if (!isActive) {
            log.info("Received first event. Marking fault handling window processor as active");
            isActive = true;
        }
        String id = (String) event.getData()[memberIdAttrIndex];
        // Validate the id first; getMemberFromId() would otherwise swallow this case as "not found".
        if (StringUtils.isEmpty(id)) {
            log.warn("NULL member id found in the event received. Event rejected.");
            return;
        }
        // There can be a delay between publishing member terminated events and actually terminating
        // instances, so CEP might still get events for already terminated members. Only accept
        // events for members that still exist in the topology.
        Member member = getMemberFromId(id);
        if (member == null) {
            log.warn(String.format("Member with [id] %s not found in the topology. Event rejected", id));
            return;
        }
        memberTimeStampMap.put(id, event.getTimeStamp());
        if (log.isDebugEnabled()) {
            log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
        }
    }

    @Override
    public Iterator<StreamEvent> iterator() {
        return window.iterator();
    }

    @Override
    public Iterator<StreamEvent> iterator(String predicate) {
        if (siddhiContext.isDistributedProcessingEnabled()) {
            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
        } else {
            return window.iterator();
        }
    }

    /**
     * Retrieve the currently active members from the topology and seed the time stamp map with the
     * current time for each of them (existing entries are kept). This allows the system to recover
     * from a restart.
     *
     * @param topology Topology model object
     * @return {@code false} if the topology or its service list is {@code null}; otherwise {@code true},
     *         after which the fault-detection sweep is allowed to run
     */
    boolean loadTimeStampMapFromTopology(Topology topology) {
        long currentTimeStamp = System.currentTimeMillis();
        if (topology == null || topology.getServices() == null) {
            return false;
        }
        // TODO make this efficient by adding APIs to messaging component
        for (Service service : topology.getServices()) {
            if (service.getClusters() == null) {
                continue;
            }
            for (Cluster cluster : service.getClusters()) {
                if (cluster.getMembers() == null) {
                    continue;
                }
                for (Member member : cluster.getMembers()) {
                    // we are checking faulty status only in previously activated members
                    if (member != null && MemberStatus.Active.equals(member.getStatus())) {
                        memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
                    }
                }
            }
        }
        hasMemberTimeStampMapInitialized = true;
        if (log.isInfoEnabled()) {
            log.info("Member timestamps were successfully loaded from the topology: [timestamps] " +
                    Arrays.toString(memberTimeStampMap.entrySet().toArray()));
        }
        return true;
    }

    /**
     * Look up a member by id in the topology held by {@link TopologyManager}, under its read lock.
     *
     * @param memberId member id to look for
     * @return the member, or {@code null} if the id is empty, the topology manager is not initialized,
     *         the member is not present, or reading the topology failed (the failure is logged)
     */
    private Member getMemberFromId(String memberId) {
        if (StringUtils.isEmpty(memberId) || !TopologyManager.isInitialized()) {
            return null;
        }
        // Acquire before entering try so the finally block never releases a lock we do not hold.
        TopologyManager.acquireReadLock();
        try {
            Topology topology = TopologyManager.getTopology();
            if (topology == null || topology.getServices() == null) {
                return null;
            }
            // TODO make this efficient by adding APIs to messaging component
            for (Service service : topology.getServices()) {
                if (service.getClusters() == null) {
                    continue;
                }
                for (Cluster cluster : service.getClusters()) {
                    if (cluster.getMembers() == null) {
                        continue;
                    }
                    for (Member member : cluster.getMembers()) {
                        if (member != null && memberId.equals(member.getMemberId())) {
                            return member;
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Error while reading topology", e);
        } finally {
            TopologyManager.releaseReadLock();
        }
        return null;
    }

    /**
     * Publish a {@link MemberFaultEvent} for the given member to the health-stat topic.
     * The payload has the shape {@code {MEMBER_FAULT_EVENT_KEY: {"message": event}}} and is built
     * fresh on every call so no mutable state is shared between publications.
     *
     * @param member faulty member; a {@code null} member is logged and ignored
     */
    private void publishMemberFault(Member member) {
        if (member == null) {
            log.warn("Failed to publish member fault event. Member object is null");
            return;
        }
        log.info("Publishing member fault event for [member-id] " + member.getMemberId());
        MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(),
                member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0);
        Map<String, Object> messageMap = new HashMap<String, Object>();
        messageMap.put("message", memberFaultEvent);
        Map<String, Object> eventMap = new HashMap<String, Object>();
        eventMap.put(MEMBER_FAULT_EVENT_KEY, messageMap);
        healthStatPublisher.publish(eventMap, true);
    }

    /**
     * One fault-detection sweep. It does nothing until the processor is active and the time stamp
     * map has been seeded from the topology. The sweep always reschedules itself after
     * {@code timeToKeep} milliseconds unless {@link #destroy()} has been called.
     */
    @Override
    public void run() {
        try {
            // wait until the first event OR given timeout to expire in order to activate this window processor
            // this is to prevent false positives at the CEP startup
            if (!isActive && System.currentTimeMillis() - startTime > ACTIVATE_TIMEOUT) {
                log.info("Activation wait timeout has expired. Marking fault handling window processor as active");
                isActive = true;
            }
            // do not process events until memberTimeStampMap is initialized and window processor is activated
            // memberTimeStampMap will be initialized only after receiving the complete topology event
            if (!(isActive && hasMemberTimeStampMapInitialized)) {
                return;
            }
            threadBarrier.pass();
            detectFaultyMembers();
            if (log.isDebugEnabled()) {
                log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
                        memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
            }
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
        } finally {
            if (!destroyed) {
                reschedule(timeToKeep);
            }
        }
    }

    /**
     * Publish a fault for every tracked member whose last time stamp is older than {@code TIME_OUT},
     * then stop tracking it. Members that have already left the topology are dropped without
     * publishing a fault.
     */
    private void detectFaultyMembers() {
        long currentTime = System.currentTimeMillis();
        for (Map.Entry<String, Long> entry : memberTimeStampMap.entrySet()) {
            String memberId = entry.getKey();
            Long eventTimeStamp = entry.getValue();
            if (currentTime - eventTimeStamp <= TIME_OUT) {
                continue;
            }
            Member member = getMemberFromId(memberId);
            if (member != null) {
                log.info("Faulty member detected [member-id] " + memberId + " with [last time-stamp] " +
                        eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
                publishMemberFault(member);
            }
            // Conditional remove: if a fresh event updated this member meanwhile, keep tracking it.
            memberTimeStampMap.remove(memberId, eventTimeStamp);
        }
    }

    @Override
    protected Object[] currentState() {
        return new Object[]{window.currentState()};
    }

    @Override
    protected void restoreState(Object[] data) {
        // NOTE(review): currentState() wraps the queue state as data[0], yet the queue is restored twice
        // (once with the wrapper, once with data[0]). This mirrors the upstream Siddhi time-window
        // processor; confirm whether the first call is intended before changing it.
        window.restoreState(data);
        window.restoreState((Object[]) data[0]);
        window.reSchedule();
    }

    /**
     * Initialize the processor from the query parameters.
     * parameters[0] is an int or long constant giving the sweep interval in milliseconds.
     * parameters[1] is the stream attribute that carries the member id.
     */
    @Override
    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext
            siddhiContext) {
        if (parameters[0] instanceof IntConstant) {
            timeToKeep = ((IntConstant) parameters[0]).getValue();
        } else {
            timeToKeep = ((LongConstant) parameters[0]).getValue();
        }
        String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
        memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
        if (this.siddhiContext.isDistributedProcessingEnabled()) {
            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
        } else {
            window = new SchedulerSiddhiQueue<StreamEvent>(this);
        }
        //Ordinary scheduling
        window.schedule();
        if (log.isInfoEnabled()) {
            log.info(String.format("Fault handling window processor initialized with [timeToKeep] %s, " +
                            "[memberIdAttrName] %s, [memberIdAttrIndex] %s, [distributed-enabled] %s, " +
                            "[activate-timeout] %d", timeToKeep, memberIdAttrName, memberIdAttrIndex,
                    siddhiContext.isDistributedProcessingEnabled(), ACTIVATE_TIMEOUT));
        }
    }

    @Override
    public void schedule() {
        reschedule(timeToKeep);
    }

    @Override
    public void scheduleNow() {
        reschedule(0);
    }

    /**
     * Cancel the previously scheduled sweep (without interrupting it if it is running) and schedule a
     * new one on the fault-handling executor.
     *
     * @param delayMillis delay before the next sweep, in milliseconds
     */
    private void reschedule(long delayMillis) {
        if (lastSchedule != null) {
            lastSchedule.cancel(false);
        }
        lastSchedule = faultHandleScheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.faultHandleScheduler = scheduledExecutorService;
    }

    @Override
    public void setThreadBarrier(ThreadBarrier threadBarrier) {
        this.threadBarrier = threadBarrier;
    }

    /**
     * Stop the periodic sweep and the topology listener, and release the window.
     */
    @Override
    public void destroy() {
        // stop run() from rescheduling itself and cancel any pending sweep
        destroyed = true;
        if (lastSchedule != null) {
            lastSchedule.cancel(false);
        }
        // terminate topology listener thread
        cepTopologyEventReceiver.destroy();
        window = null;
    }

    ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
        return memberTimeStampMap;
    }
}