blob: 655b79b100389ad78d7669022518e5e89a5f200d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.common.agg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.query.output.callback.QueryCallback;
import io.siddhi.core.stream.input.InputHandler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Since 8/3/16.
*/
public class SiddhiAggregator {
private static final Logger LOG = LoggerFactory.getLogger(SiddhiAggregator.class);
private TimeBatchWindowSpec spec;
private StreamDefinition sd;
private InputHandler input;
public SiddhiAggregator(TimeBatchWindowSpec spec, StreamDefinition sd, final AggregateHandler handler) {
this.spec = spec;
this.sd = sd;
Map<String, Integer> colIndices = new HashMap<>();
List<String> colNames = new ArrayList<>();
int i = 0;
for (String col : spec.groupby.cols) {
colIndices.put(col, i++);
colNames.add(col);
}
for (Agg agg : spec.aggs) {
colIndices.put(agg.alias, i++);
colNames.add(agg.alias);
}
String query = buildSiddhiAggQuery();
SiddhiManager sm = new SiddhiManager();
SiddhiAppRuntime runtime = sm.createSiddhiAppRuntime(query);
input = runtime.getInputHandler("s");
runtime.addCallback("query", new QueryCallback() {
@Override
public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
List<AggregateResult> rows = new ArrayList<AggregateResult>();
for (Event e : inEvents) {
AggregateResult result = new AggregateResult(e.getData(), colIndices, colNames);
rows.add(result);
}
handler.onAggregate(rows);
}
});
runtime.start();
}
public void add(Object[] data) throws Exception {
input.send(data);
}
/**
* example siddhi query
* String ql = "define stream s (host string, timestamp long, metric string, site string, value double);" +
* "@info(name='query') " +
* " from s[metric == \"missingblocks\"]#window.externalTimeBatch(timestamp, 1 min, 0) select host, count(value) as avg group by host insert into tmp; ";
*
* @return
*/
private String buildSiddhiAggQuery() {
StringBuilder sb = new StringBuilder();
sb.append("define stream s(");
if (sd.columns == null || sd.columns.size() == 0) {
throw new IllegalStateException("input stream should contains at least one column");
}
for (Column col : sd.columns) {
appendColumnDef(sb, col);
sb.append(",");
}
sb.deleteCharAt(sb.length() - 1);
sb.append(");");
sb.append(" @info(name='query') ");
sb.append("from s[");
sb.append(spec.filter);
sb.append("]#window.externalTimeBatch(");
sb.append(spec.timestampColumn);
sb.append(",");
sb.append(spec.windowDuration);
sb.append(",");
sb.append(spec.start);
sb.append(")");
sb.append(" select ");
for (String gbField : spec.groupby.cols) {
sb.append(gbField);
sb.append(",");
}
if (spec.aggs == null) {
throw new IllegalStateException("at least one aggregate function should be present");
}
for (Agg agg : spec.aggs) {
sb.append(agg.function);
sb.append("(");
sb.append(agg.field);
sb.append(") as ");
sb.append(agg.alias);
sb.append(",");
}
sb.deleteCharAt(sb.length() - 1);
sb.append(" group by ");
for (String gbField : spec.groupby.cols) {
sb.append(gbField);
sb.append(",");
}
sb.deleteCharAt(sb.length() - 1);
sb.append(" insert into tmp;");
LOG.info("query : " + sb.toString());
return sb.toString();
}
private void appendColumnDef(StringBuilder sb, Column col) {
sb.append(col.name);
sb.append(" ");
sb.append(col.type);
}
}