| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.eagle.alert.engine.siddhi; |
| |
| import org.junit.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import io.siddhi.core.SiddhiAppRuntime; |
| import io.siddhi.core.SiddhiManager; |
| import io.siddhi.core.event.Event; |
| import io.siddhi.core.stream.input.InputHandler; |
| import io.siddhi.core.stream.output.StreamCallback; |
| import io.siddhi.core.util.EventPrinter; |
| |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| /** |
| * @since Jun 21, 2016 |
| */ |
| public class SiddhiPolicyTest { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyTest.class); |
| |
| private String streams = " define stream syslog_stream(" |
| + "dims_facility string, " |
| + "dims_severity string, " |
| + "dims_hostname string, " |
| + "dims_msgid string, " |
| + "timestamp string, " |
| + "conn string, " |
| + "op string, " |
| + "msgId string, " |
| + "command string, " |
| + "name string, " |
| + "namespace string, " |
| + "epochMillis long); "; |
| private SiddhiManager sm; |
| |
| @Before |
| public void setup() { |
| sm = new SiddhiManager(); |
| } |
| |
| @After |
| public void shutdown() { |
| sm.shutdown(); |
| } |
| |
| @Test |
| public void testPolicy_grpby() { |
| String ql = " from syslog_stream#window.time(1min) select name, namespace, timestamp, dims_hostname, count() as abortCount group by dims_hostname insert into syslog_severity_check_output; "; |
| StreamCallback sc = new StreamCallback() { |
| @Override |
| public void receive(Event[] arg0) { |
| |
| } |
| |
| ; |
| }; |
| |
| String executionPlan = streams + ql; |
| SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(executionPlan); |
| runtime.addCallback("syslog_severity_check_output", sc); |
| runtime.start(); |
| } |
| |
| @Ignore |
| @Test |
| public void testPolicy_agg() throws Exception { |
| String sql = " from syslog_stream#window.time(1min) select " |
| + "name, " |
| + "namespace, " |
| + "timestamp, " |
| + "dims_hostname, " |
| + "count() as abortCount " |
| + "group by dims_hostname " |
| + "having abortCount > 3 insert into syslog_severity_check_output; "; |
| |
| final AtomicBoolean checked = new AtomicBoolean(false); |
| StreamCallback sc = new StreamCallback() { |
| @Override |
| public void receive(Event[] arg0) { |
| checked.set(true); |
| LOG.info("event array size: " + arg0.length); |
| Set<String> hosts = new HashSet<String>(); |
| for (Event e : arg0) { |
| hosts.add((String) e.getData()[3]); |
| } |
| |
| LOG.info(" grouped hosts : " + hosts); |
| Assert.assertTrue(hosts.contains("HOSTNAME-" + 0)); |
| Assert.assertTrue(hosts.contains("HOSTNAME-" + 1)); |
| Assert.assertTrue(hosts.contains("HOSTNAME-" + 2)); |
| Assert.assertFalse(hosts.contains("HOSTNAME-" + 3)); |
| } |
| |
| ; |
| }; |
| |
| String executionPlan = streams + sql; |
| SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(executionPlan); |
| runtime.addCallback("syslog_severity_check_output", sc); |
| runtime.start(); |
| InputHandler handler = runtime.getInputHandler("syslog_stream"); |
| |
| sendInput(handler); |
| |
| Thread.sleep(1000); |
| |
| Assert.assertTrue(checked.get()); |
| |
| runtime.shutdown(); |
| } |
| |
| /* |
| + "dims_facility string, " |
| + "dims_severity string, " |
| + "dims_hostname string, " |
| + "dims_msgid string, " |
| + "timestamp string, " |
| + "conn string, " |
| + "op string, " |
| + "msgId string, " |
| + "command string, " |
| + "name string, " |
| + "namespace string, " |
| + "epochMillis long) |
| */ |
| private void sendInput(InputHandler handler) throws Exception { |
| int length = 15; |
| Event[] events = new Event[length]; |
| for (int i = 0; i < length; i++) { |
| Event e = new Event(12); |
| e.setTimestamp(System.currentTimeMillis()); |
| e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i % 4, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); |
| |
| events[i] = e; |
| } |
| |
| handler.send(events); |
| |
| Thread.sleep(61 * 1000); |
| |
| Event e = new Event(12); |
| e.setTimestamp(System.currentTimeMillis()); |
| e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); |
| handler.send(e); |
| } |
| |
| @Ignore |
| @Test |
| public void testPolicy_regex() throws Exception { |
| String sql = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count() as abortCount group by dims_hostname insert into syslog_severity_check_output; "; |
| |
| AtomicBoolean checked = new AtomicBoolean(); |
| StreamCallback sc = new StreamCallback() { |
| @Override |
| public void receive(Event[] arg0) { |
| checked.set(true); |
| } |
| |
| ; |
| }; |
| |
| String executionPlan = streams + sql; |
| SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(executionPlan); |
| runtime.addCallback("syslog_severity_check_output", sc); |
| runtime.start(); |
| |
| InputHandler handler = runtime.getInputHandler("syslog_stream"); |
| |
| sendInput(handler); |
| |
| Thread.sleep(1000); |
| |
| Assert.assertTrue(checked.get()); |
| |
| runtime.shutdown(); |
| } |
| |
| @Ignore |
| @Test |
| public void testPolicy_seq() throws Exception { |
| String sql = "" |
| + " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> " |
| + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min " |
| + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op " |
| + " insert into syslog_severity_check_output; "; |
| |
| AtomicBoolean checked = new AtomicBoolean(); |
| StreamCallback sc = new StreamCallback() { |
| @Override |
| public void receive(Event[] arg0) { |
| checked.set(true); |
| } |
| |
| ; |
| }; |
| |
| String executionPlan = streams + sql; |
| SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(executionPlan); |
| runtime.addCallback("syslog_severity_check_output", sc); |
| runtime.start(); |
| InputHandler handler = runtime.getInputHandler("syslog_stream"); |
| |
| sendPatternInput(handler); |
| |
| Thread.sleep(1000); |
| Assert.assertTrue(checked.get()); |
| |
| runtime.shutdown(); |
| } |
| |
| private void sendPatternInput(InputHandler handler) throws Exception { |
| // validate one |
| Event e = new Event(12); |
| e.setTimestamp(System.currentTimeMillis()); |
| e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); |
| |
| e = new Event(12); |
| e.setTimestamp(System.currentTimeMillis()); |
| e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); |
| |
| e = new Event(12); |
| e.setTimestamp(System.currentTimeMillis()); |
| e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0, "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); |
| |
| Thread.sleep(61 * 1000); |
| |
| e = new Event(12); |
| e.setTimestamp(System.currentTimeMillis()); |
| e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11, "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()}); |
| handler.send(e); |
| } |
| |
| |
| @Test |
| public void testStrConcat() throws Exception { |
| String ql = " define stream log(timestamp long, switchLabel string, port string, message string); " + |
| " from log select timestamp, str:concat(switchLabel, '===', port) as alertKey, message insert into output; "; |
| SiddhiManager manager = new SiddhiManager(); |
| SiddhiAppRuntime runtime = manager.createSiddhiAppRuntime(ql); |
| runtime.addCallback("output", new StreamCallback() { |
| @Override |
| public void receive(Event[] events) { |
| EventPrinter.print(events); |
| } |
| }); |
| |
| runtime.start(); |
| |
| InputHandler logInput = runtime.getInputHandler("log"); |
| |
| Event e = new Event(); |
| e.setTimestamp(System.currentTimeMillis()); |
| e.setData(new Object[] {System.currentTimeMillis(), "switch-ra-slc-01", "port01", "log-message...."}); |
| logInput.send(e); |
| |
| Thread.sleep(1000); |
| runtime.shutdown(); |
| |
| } |
| |
| } |