blob: af3555124f395fc5218fb7e8c336da929aeee79f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.engine.evaluator.impl;
import org.apache.eagle.alert.engine.AlertStreamCollector;
import org.apache.eagle.alert.engine.StreamContext;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.CompositePolicyHandler;
import org.apache.eagle.alert.engine.evaluator.PolicyGroupEvaluator;
import org.apache.eagle.alert.engine.evaluator.PolicyHandlerContext;
import org.apache.eagle.alert.engine.evaluator.PolicyStreamHandler;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PolicyGroupEvaluatorImpl implements PolicyGroupEvaluator {
private static final long serialVersionUID = -5499413193675985288L;
private static final Logger LOG = LoggerFactory.getLogger(PolicyGroupEvaluatorImpl.class);
private AlertStreamCollector collector;
// mapping from policy name to PolicyDefinition
private volatile Map<String, PolicyDefinition> policyDefinitionMap = new HashMap<>();
// mapping from policy name to PolicyStreamHandler
private volatile Map<String, CompositePolicyHandler> policyStreamHandlerMap = new HashMap<>();
private String policyEvaluatorId;
private StreamContext context;
public PolicyGroupEvaluatorImpl(String policyEvaluatorId) {
this.policyEvaluatorId = policyEvaluatorId;
}
public void init(StreamContext context, AlertStreamCollector collector) {
this.collector = collector;
this.policyStreamHandlerMap = new HashMap<>();
this.context = context;
Thread.currentThread().setName(policyEvaluatorId);
}
public void nextEvent(PartitionedEvent event) {
this.context.counter().incr("receive_count");
dispatch(event);
}
@Override
public String getName() {
return this.policyEvaluatorId;
}
public void close() {
for (PolicyStreamHandler handler : policyStreamHandlerMap.values()) {
try {
handler.close();
} catch (Exception e) {
LOG.error("Failed to close handler {}", handler.toString(), e);
}
}
}
/**
* fixme make selection of PolicyStreamHandler to be more efficient.
*
* @param partitionedEvent PartitionedEvent
*/
private void dispatch(PartitionedEvent partitionedEvent) {
boolean handled = false;
for (Map.Entry<String, CompositePolicyHandler> policyStreamHandler : policyStreamHandlerMap.entrySet()) {
if (isAcceptedByPolicy(partitionedEvent, policyDefinitionMap.get(policyStreamHandler.getKey()))) {
try {
handled = true;
this.context.counter().incr("eval_count");
policyStreamHandler.getValue().send(partitionedEvent.getEvent());
} catch (Exception e) {
this.context.counter().incr("fail_count");
LOG.error("{} failed to handle {}", policyStreamHandler.getValue(), partitionedEvent.getEvent(), e);
}
}
}
if (!handled) {
this.context.counter().incr("drop_count");
LOG.warn("Drop stream non-matched event {}, which should not be sent to evaluator", partitionedEvent);
} else {
this.context.counter().incr("accept_count");
}
}
private static boolean isAcceptedByPolicy(PartitionedEvent event, PolicyDefinition policy) {
return policy.getPartitionSpec().contains(event.getPartition())
&& (policy.getInputStreams().contains(event.getEvent().getStreamId())
|| policy.getDefinition().getInputStreams().contains(event.getEvent().getStreamId()));
}
@Override
public void onPolicyChange(String version, List<PolicyDefinition> added, List<PolicyDefinition> removed, List<PolicyDefinition> modified, Map<String, StreamDefinition> sds) {
Map<String, PolicyDefinition> copyPolicies = new HashMap<>(policyDefinitionMap);
Map<String, CompositePolicyHandler> copyHandlers = new HashMap<>(policyStreamHandlerMap);
for (PolicyDefinition pd : added) {
inplaceAdd(copyPolicies, copyHandlers, pd, sds);
}
for (PolicyDefinition pd : removed) {
inplaceRemove(copyPolicies, copyHandlers, pd);
}
for (PolicyDefinition pd : modified) {
inplaceRemove(copyPolicies, copyHandlers, pd);
inplaceAdd(copyPolicies, copyHandlers, pd, sds);
}
// logging
LOG.info("{} with {} Policy metadata updated with added={}, removed={}, modified={}", policyEvaluatorId, version, added, removed, modified);
// switch reference
this.policyDefinitionMap = copyPolicies;
this.policyStreamHandlerMap = copyHandlers;
}
private void inplaceAdd(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy, Map<String, StreamDefinition> sds) {
if (handlers.containsKey(policy.getName())) {
LOG.error("metadata calculation error, try to add existing PolicyDefinition " + policy);
} else {
policies.put(policy.getName(), policy);
CompositePolicyHandler handler = new CompositePolicyHandler(sds);
try {
PolicyHandlerContext handlerContext = new PolicyHandlerContext();
handlerContext.setPolicyCounter(this.context.counter());
handlerContext.setPolicyDefinition(policy);
handlerContext.setPolicyEvaluator(this);
handlerContext.setPolicyEvaluatorId(policyEvaluatorId);
handlerContext.setConfig(this.context.config());
handler.prepare(collector, handlerContext);
handlers.put(policy.getName(), handler);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
policies.remove(policy.getName());
handlers.remove(policy.getName());
}
}
}
private void inplaceRemove(Map<String, PolicyDefinition> policies, Map<String, CompositePolicyHandler> handlers, PolicyDefinition policy) {
if (handlers.containsKey(policy.getName())) {
PolicyStreamHandler handler = handlers.get(policy.getName());
try {
handler.close();
} catch (Exception e) {
LOG.error("Failed to close handler {}", handler, e);
} finally {
policies.remove(policy.getName());
handlers.remove(policy.getName());
LOG.info("Removed policy: {}", policy);
}
} else {
LOG.error("metadata calculation error, try to remove nonexisting PolicyDefinition: " + policy);
}
}
public CompositePolicyHandler getPolicyHandler(String policy) {
return policyStreamHandlerMap.get(policy);
}
}