blob: d4fe01abc5d3dadee24300c52a02e2d3b527587a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.engine.statecheck;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
import org.apache.eagle.alert.engine.coordinator.PublishPartition;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
import org.apache.eagle.alert.engine.evaluator.impl.PolicyGroupEvaluatorImpl;
import org.apache.eagle.alert.engine.model.AlertStreamEvent;
import org.apache.eagle.alert.engine.model.PartitionedEvent;
import org.apache.eagle.alert.engine.model.StreamEvent;
import org.apache.eagle.alert.engine.router.TestAlertBolt;
import org.apache.eagle.alert.engine.runner.AlertBolt;
import org.apache.eagle.alert.utils.AlertConstants;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import org.jetbrains.annotations.NotNull;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* Created on 8/4/16.
*/
public class TestStateCheckPolicy {
@Test
public void testStateCheck() throws Exception {
PolicyGroupEvaluatorImpl impl = new PolicyGroupEvaluatorImpl("test-statecheck-poicyevaluator");
AtomicBoolean verified = new AtomicBoolean(false);
OutputCollector collector = new OutputCollector(new IOutputCollector() {
@Override
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
verified.set(true);
Assert.assertEquals("perfmon_latency_check_output2", ((PublishPartition) tuple.get(0)).getStreamId());
AlertStreamEvent event = (AlertStreamEvent) tuple.get(1);
System.out.println(String.format("collector received: [streamId=[%s], tuple=[%s] ", ((PublishPartition) tuple.get(0)).getStreamId(), tuple));
return null;
}
@Override
public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
}
@Override
public void ack(Tuple input) {
}
@Override
public void fail(Tuple input) {
}
@Override
public void reportError(Throwable error) {
}
});
AlertBolt alertBolt = TestAlertBolt.createAlertBolt(collector);
AlertBoltSpec spec = createAlertSpec();
Map<String, StreamDefinition> definitionMap = createStreamMap();
List<PolicyDefinition> policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"),
new TypeReference<List<PolicyDefinition>>() {
});
List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
new TypeReference<List<StreamDefinition>>() {
});
spec.addPublishPartition("perfmon_latency_check_output2", policies.get(0).getName(), "testPublishBolt", null);
alertBolt.onAlertBoltSpecChange(spec, definitionMap);
// send data now
sendData(alertBolt, definitionMap, spec.getBoltPoliciesMap().values().iterator().next().get(0));
Thread.sleep(3000);
Assert.assertTrue(verified.get());
}
private void sendData(AlertBolt alertBolt, Map<String, StreamDefinition> definitionMap, PolicyDefinition policyDefinition) {
StreamDefinition definition = definitionMap.get("perfmon_latency_stream");
long base = System.currentTimeMillis();
for (int i = 0; i < 2; i++) {
long time = base + i * 1000;
Map<String, Object> mapdata = new HashMap<>();
mapdata.put("host", "host-1");
mapdata.put("timestamp", time);
mapdata.put("metric", "perfmon_latency");
mapdata.put("pool", "raptor");
mapdata.put("value", 1000.0 + i * 1000.0);
mapdata.put("colo", "phx");
StreamEvent event = StreamEvent.builder().timestamep(time).attributes(mapdata, definition).build();
PartitionedEvent pEvent = new PartitionedEvent(event, policyDefinition.getPartitionSpec().get(0), 1);
GeneralTopologyContext mock = Mockito.mock(GeneralTopologyContext.class);
Mockito.when(mock.getComponentId(1)).thenReturn("taskId");
Mockito.when(mock.getComponentOutputFields("taskId", "test-stream-id")).thenReturn(new Fields(AlertConstants.FIELD_0));
TupleImpl ti = new TupleImpl(mock, Collections.singletonList(pEvent), 1, "test-stream-id");
alertBolt.execute(ti);
}
}
@NotNull
private Map<String, StreamDefinition> createStreamMap() throws Exception {
List<StreamDefinition> streams = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/streamdefinitions.json"),
new TypeReference<List<StreamDefinition>>() {
});
return streams.stream().collect(Collectors.toMap(StreamDefinition::getStreamId, item -> item));
}
private static ObjectMapper mapper = new ObjectMapper();
static {
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
private AlertBoltSpec createAlertSpec() throws Exception {
AlertBoltSpec spec = new AlertBoltSpec();
spec.setVersion("version1");
spec.setTopologyName("testTopology");
List<PolicyDefinition> policies = mapper.readValue(TestStateCheckPolicy.class.getResourceAsStream("/statecheck/policies.json"),
new TypeReference<List<PolicyDefinition>>() {
});
Assert.assertTrue(policies.size() > 0);
spec.addBoltPolicy("alertBolt1", policies.get(0).getName());
spec.getBoltPoliciesMap().put("alertBolt1", new ArrayList<>(policies));
return spec;
}
}