blob: fbe7754d5b8a6df4a3384dd86e6ff1c4d4507bb4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.smoketest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import java.io.File;
import java.util.Properties;
public class SmokeTestClient extends SmokeTestUtil {
private final String kafka;
private final String zookeeper;
private final File stateDir;
private KafkaStreams streams;
private Thread thread;
public SmokeTestClient(File stateDir, String kafka, String zookeeper) {
super();
this.stateDir = stateDir;
this.kafka = kafka;
this.zookeeper = zookeeper;
}
public void start() {
streams = createKafkaStreams(stateDir, kafka, zookeeper);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
e.printStackTrace();
}
});
thread = new Thread() {
public void run() {
streams.start();
}
};
thread.start();
}
public void close() {
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
}
private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
source.to(stringSerde, intSerde, "echo");
KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return value == null || value != END;
}
});
data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
// min
data.aggregateByKey(
new Initializer<Integer>() {
public Integer apply() {
return Integer.MAX_VALUE;
}
},
new Aggregator<String, Integer, Integer>() {
@Override
public Integer apply(String aggKey, Integer value, Integer aggregate) {
return (value < aggregate) ? value : aggregate;
}
},
UnlimitedWindows.of("uwin-min"),
stringSerde,
intSerde
).toStream().map(
new Unwindow<String, Integer>()
).to(stringSerde, intSerde, "min");
KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min");
minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
// max
data.aggregateByKey(
new Initializer<Integer>() {
public Integer apply() {
return Integer.MIN_VALUE;
}
},
new Aggregator<String, Integer, Integer>() {
@Override
public Integer apply(String aggKey, Integer value, Integer aggregate) {
return (value > aggregate) ? value : aggregate;
}
},
UnlimitedWindows.of("uwin-max"),
stringSerde,
intSerde
).toStream().map(
new Unwindow<String, Integer>()
).to(stringSerde, intSerde, "max");
KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max");
maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
// sum
data.aggregateByKey(
new Initializer<Long>() {
public Long apply() {
return 0L;
}
},
new Aggregator<String, Integer, Long>() {
@Override
public Long apply(String aggKey, Integer value, Long aggregate) {
return (long) value + aggregate;
}
},
UnlimitedWindows.of("win-sum"),
stringSerde,
longSerde
).toStream().map(
new Unwindow<String, Long>()
).to(stringSerde, longSerde, "sum");
KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum");
sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
// cnt
data.countByKey(
UnlimitedWindows.of("uwin-cnt"),
stringSerde
).toStream().map(
new Unwindow<String, Long>()
).to(stringSerde, longSerde, "cnt");
KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt");
cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
// dif
maxTable.join(minTable,
new ValueJoiner<Integer, Integer, Integer>() {
public Integer apply(Integer value1, Integer value2) {
return value1 - value2;
}
}
).to(stringSerde, intSerde, "dif");
// avg
sumTable.join(
cntTable,
new ValueJoiner<Long, Long, Double>() {
public Double apply(Long value1, Long value2) {
return (double) value1 / (double) value2;
}
}
).to(stringSerde, doubleSerde, "avg");
// windowed count
data.countByKey(
TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE),
stringSerde
).toStream().map(
new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
@Override
public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
return new KeyValue<>(key.key() + "@" + key.window().start(), value);
}
}
).to(stringSerde, longSerde, "wcnt");
// test repartition
Agg agg = new Agg();
cntTable.groupBy(agg.selector(),
stringSerde,
longSerde
).aggregate(agg.init(),
agg.adder(),
agg.remover(),
longSerde,
"cntByCnt"
).to(stringSerde, longSerde, "tagg");
return new KafkaStreams(builder, props);
}
}