blob: a1aa55214c828be2a48f423559ee2c04de1600cd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.storm.sql;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.streams.Pair;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
public class TestUtils {
public static final class MockInsertBoltExtension implements BeforeEachCallback {
@Override
public void beforeEach(ExtensionContext ctx) throws Exception {
MockInsertBolt.getCollectedValues().clear();
}
}
public static final class MockBoltExtension implements BeforeEachCallback {
@Override
public void beforeEach(ExtensionContext arg0) throws Exception {
MockBolt.getCollectedValues().clear();
}
}
public static class MyPlus {
public static Integer evaluate(Integer x, Integer y) {
return x + y;
}
}
public static class MyConcat {
public static String init() {
return "";
}
public static String add(String accumulator, String val) {
return accumulator + val;
}
public static String result(String accumulator) {
return accumulator;
}
}
public static class TopN {
public static PriorityQueue<Integer> init() {
return new PriorityQueue<>();
}
public static PriorityQueue<Integer> add(PriorityQueue<Integer> accumulator, Integer n, Integer val) {
if (n <= 0) {
return accumulator;
}
if (accumulator.size() >= n) {
if (val > accumulator.peek()) {
accumulator.remove();
accumulator.add(val);
}
} else {
accumulator.add(val);
}
return accumulator;
}
public static List<Integer> result(PriorityQueue<Integer> accumulator) {
List<Integer> res = new ArrayList<>(accumulator);
Collections.reverse(res);
return res;
}
}
public static class MockSpout extends BaseRichSpout {
private final List<Values> records;
private final Fields outputFields;
private boolean emitted = false;
private SpoutOutputCollector collector;
public MockSpout(List<Values> records, Fields outputFields) {
this.records = records;
this.outputFields = outputFields;
}
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
if (emitted) {
return;
}
for (Values r : records) {
collector.emit(r);
}
emitted = true;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(outputFields);
}
}
public static class MockBolt extends BaseRichBolt {
/**
* Collect all values in a static variable as the instance will go through serialization and deserialization.
* NOTE: This should be cleared before or after running each test.
*/
private transient static final List<Values> VALUES = new ArrayList<>();
public static List<Values> getCollectedValues() {
return VALUES;
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
VALUES.add((Values) input.getValue(0));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static class MockInsertBolt extends BaseRichBolt {
/**
* Collect all values in a static variable as the instance will go through serialization and deserialization.
* NOTE: This should be cleared before or after running each test.
*/
private transient static final List<Pair<Object, Values>> VALUES = new ArrayList<>();
public static List<Pair<Object, Values>> getCollectedValues() {
return VALUES;
}
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
VALUES.add(Pair.of(input.getValue(0), (Values) input.getValue(1)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static class MockSqlExprDataSource implements ISqlStreamsDataSource {
@Override
public IRichSpout getProducer() {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public IRichBolt getConsumer() {
return new MockBolt();
}
}
public static class MockSqlStreamsOutputDataSource implements ISqlStreamsDataSource {
@Override
public IRichSpout getProducer() {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public IRichBolt getConsumer() {
return new MockInsertBolt();
}
}
public static class MockSqlStreamsDataSource implements ISqlStreamsDataSource {
@Override
public IRichSpout getProducer() {
List<Values> records = new ArrayList<>();
records.add(new Values(0, "a", "y"));
records.add(new Values(1, "ab", "y"));
records.add(new Values(2, "abc", "y"));
records.add(new Values(3, "abcd", "y"));
records.add(new Values(4, "abcde", "y"));
Fields outputFields = new Fields("ID", "NAME", "ADDR");
return new MockSpout(records, outputFields);
}
@Override
public IRichBolt getConsumer() {
return new MockBolt();
}
}
public static class MockSqlStreamsInsertDataSource extends MockSqlStreamsNestedDataSource {
@Override
public IRichBolt getConsumer() {
return new MockInsertBolt();
}
}
public static class MockSqlStreamsGroupedDataSource implements ISqlStreamsDataSource {
@Override
public IRichSpout getProducer() {
List<Values> records = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
records.add(new Values(i, 0, "x", "y", 5 - i, i * 10));
}
Fields outputFields = new Fields("ID", "GRPID", "NAME", "ADDR", "AGE", "SCORE");
return new MockSpout(records, outputFields);
}
@Override
public IRichBolt getConsumer() {
return new MockBolt();
}
}
public static class MockSqlStreamsInsertGroupedDataSource extends MockSqlStreamsGroupedDataSource {
@Override
public IRichBolt getConsumer() {
return new MockInsertBolt();
}
}
public static class MockSqlStreamsJoinDataSourceEmp implements ISqlStreamsDataSource {
@Override
public IRichSpout getProducer() {
List<Values> records = new ArrayList<>();
Fields outputFields = new Fields("EMPID", "EMPNAME", "DEPTID");
for (int i = 0; i < 5; ++i) {
records.add(new Values(i, "emp-" + i, i % 2));
}
for (int i = 10; i < 15; ++i) {
records.add(new Values(i, "emp-" + i, i));
}
return new MockSpout(records, outputFields);
}
@Override
public IRichBolt getConsumer() {
return new MockBolt();
}
}
public static class MockSqlStreamsInsertJoinDataSourceEmp extends MockSqlStreamsJoinDataSourceEmp {
@Override
public IRichBolt getConsumer() {
return new MockInsertBolt();
}
}
public static class MockSqlStreamsJoinDataSourceDept implements ISqlStreamsDataSource {
@Override
public IRichSpout getProducer() {
List<Values> records = new ArrayList<>();
Fields outputFields = new Fields("DEPTID", "DEPTNAME");
for (int i = 0; i < 5; ++i) {
records.add(new Values(i, "dept-" + i));
}
return new MockSpout(records, outputFields);
}
@Override
public IRichBolt getConsumer() {
return new MockBolt();
}
}
public static class MockSqlStreamsInsertJoinDataSourceDept extends MockSqlStreamsJoinDataSourceDept {
@Override
public IRichBolt getConsumer() {
return new MockInsertBolt();
}
}
public static class MockSqlStreamsNestedDataSource implements ISqlStreamsDataSource {
@Override
public IRichSpout getProducer() {
List<Values> records = new ArrayList<>();
Fields outputFields = new Fields("ID", "MAPFIELD", "NESTEDMAPFIELD", "ARRAYFIELD");
List<Integer> ints = Arrays.asList(100, 200, 300);
for (int i = 0; i < 5; ++i) {
Map<String, Integer> map = new HashMap<>();
map.put("b", i);
map.put("c", i*i);
Map<String, Map<String, Integer>> mm = new HashMap<>();
mm.put("a", map);
records.add(new Values(i, map, mm, ints));
}
return new MockSpout(records, outputFields);
}
@Override
public IRichBolt getConsumer() {
return new MockBolt();
}
}
public static class MockSqlStreamsInsertNestedDataSource extends MockSqlStreamsNestedDataSource {
@Override
public IRichBolt getConsumer() {
return new MockInsertBolt();
}
}
public static long monotonicNow() {
final long NANOSECONDS_PER_MILLISECOND = 1000000;
return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
}
}