blob: 76946237c2019af94d42c6616ecba8920c861b6d [file] [log] [blame]
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.eagle.alert.engine.siddhi.extension;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
/**
* @since Apr 1, 2016
*/
public class AttributeCollectAggregatorTest {
private static final Logger logger = LoggerFactory.getLogger(AttributeCollectAggregatorTest.class);
@Test
public void test() throws Exception {
String ql = "define stream s1(timestamp long, host string, type string);";
ql += " from s1#window.externalTime(timestamp, 1 sec)";
ql += " select eagle:collect(timestamp) as timestamps, eagle:collect(host) as hosts, type group by type insert into output;";
SiddhiManager sm = new SiddhiManager();
ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
InputHandler input = runtime.getInputHandler("s1");
runtime.addCallback("output", new StreamCallback() {
@Override
public void receive(Event[] arg0) {
logger.info("output event length:" + arg0.length);
for (Event e : arg0) {
StringBuilder sb = new StringBuilder("\t - [").append(e.getData().length).append("]");
for (Object o : e.getData()) {
sb.append("," + o);
}
logger.info(sb.toString());
}
logger.info("===end===");
}
});
// StreamDefinition definition = (StreamDefinition) runtime.getStreamDefinitionMap().get("output");
runtime.start();
Event[] events = generateEvents();
for (Event e : events) {
input.send(e);
}
Thread.sleep(1000);
}
private Event[] generateEvents() {
List<Event> events = new LinkedList<Event>();
Random r = new Random();
Event e = null;
long base = System.currentTimeMillis();
{
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
base += 100;
events.add(e);
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
base += 100;
events.add(e);
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova"});
base += 100;
events.add(e);
}
{
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
base += 100;
events.add(e);
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
base += 100;
events.add(e);
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron"});
base += 100;
events.add(e);
}
base += 10000;
{
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
base += 100;
events.add(e);
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
base += 100;
events.add(e);
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "nova1"});
base += 100;
events.add(e);
}
{
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
base += 100;
events.add(e);
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
base += 100;
events.add(e);
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "neutron2"});
base += 100;
events.add(e);
}
base += 10000;
e = new Event(base, new Object[] {base, "host" + r.nextInt(), "mq"});
return events.toArray(new Event[0]);
}
@Test
public void testQuery() {
String ql = "define stream perfmon_input_stream_cpu ( host string,timestamp long,metric string,pool string,value double,colo string );";
ql += "from perfmon_input_stream_cpu#window.length(3) select host, min(value) as min group by host having min>91.0 insert into perfmon_output_stream_cpu;";
SiddhiManager sm = new SiddhiManager();
sm.createExecutionPlanRuntime(ql);
}
}