/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.streams.smoketest;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TumblingWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;

import java.io.File;
import java.util.Properties;

/**
 * Client side of the streams smoke test.
 *
 * <p>Builds a topology that reads the {@code "data"} topic, echoes it to {@code "echo"},
 * and writes several per-key aggregates (min, max, sum, count, max-min difference, average,
 * tumbling-window count and a re-keyed aggregate of the counts) to their own output topics.
 * The topology is run by a {@link KafkaStreams} instance that is started from a helper thread
 * in {@link #start()} and shut down in {@link #close()}.
 */
public class SmokeTestClient extends SmokeTestUtil {

    private final String kafka;
    private final String zookeeper;
    private final File stateDir;
    // Both fields stay null until start() has been called.
    private KafkaStreams streams;
    private Thread thread;

    /**
     * @param stateDir  directory handed to the streams instance as its state directory
     * @param kafka     value used for the bootstrap servers setting
     * @param zookeeper value used for the ZooKeeper connect setting
     */
    public SmokeTestClient(File stateDir, String kafka, String zookeeper) {
        super();
        this.stateDir = stateDir;
        this.kafka = kafka;
        this.zookeeper = zookeeper;
    }

    /**
     * Creates the streams instance, installs an uncaught-exception handler that prints the
     * stack trace, and invokes {@code streams.start()} from a newly created thread.
     */
    public void start() {
        streams = createKafkaStreams(stateDir, kafka, zookeeper);
        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                e.printStackTrace();
            }
        });

        thread = new Thread() {
            @Override
            public void run() {
                streams.start();
            }
        };
        thread.start();
    }

    /**
     * Closes the streams instance and waits for the starter thread to finish.
     *
     * <p>Does nothing if {@link #start()} was never called. If the calling thread is
     * interrupted while waiting, the wait is abandoned and the interrupt status is restored
     * so the caller can still observe it.
     */
    public void close() {
        if (streams != null) {
            streams.close();
        }
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException ex) {
                // Do not swallow the interrupt: re-assert it for whoever called close().
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Assembles the configuration and the processing topology of the smoke test.
     *
     * @param stateDir  directory used as the streams state directory
     * @param kafka     bootstrap servers setting
     * @param zookeeper ZooKeeper connect setting
     * @return a {@link KafkaStreams} instance that has been configured but not started
     */
    private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
        props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class.getName());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");

        // echo: forward every input record unchanged
        source.to(stringSerde, intSerde, "echo");

        // Drop records whose value equals END; records with a null value are let through.
        // NOTE(review): END is not declared in this class -- presumably an end-of-input marker
        // inherited from SmokeTestUtil; confirm there.
        KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
            @Override
            public boolean test(String key, Integer value) {
                return value == null || value != END;
            }
        });

        data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));

        // min: per-key minimum, seeded with Integer.MAX_VALUE
        data.aggregateByKey(
                new Initializer<Integer>() {
                    @Override
                    public Integer apply() {
                        return Integer.MAX_VALUE;
                    }
                },
                new Aggregator<String, Integer, Integer>() {
                    @Override
                    public Integer apply(String aggKey, Integer value, Integer aggregate) {
                        return (value < aggregate) ? value : aggregate;
                    }
                },
                UnlimitedWindows.of("uwin-min"),
                stringSerde,
                intSerde
        ).toStream().map(
                new Unwindow<String, Integer>()
        ).to(stringSerde, intSerde, "min");

        // Read the result topic back as a table so it can be joined further down.
        KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min");
        minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));

        // max: per-key maximum, seeded with Integer.MIN_VALUE
        data.aggregateByKey(
                new Initializer<Integer>() {
                    @Override
                    public Integer apply() {
                        return Integer.MIN_VALUE;
                    }
                },
                new Aggregator<String, Integer, Integer>() {
                    @Override
                    public Integer apply(String aggKey, Integer value, Integer aggregate) {
                        return (value > aggregate) ? value : aggregate;
                    }
                },
                UnlimitedWindows.of("uwin-max"),
                stringSerde,
                intSerde
        ).toStream().map(
                new Unwindow<String, Integer>()
        ).to(stringSerde, intSerde, "max");

        KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max");
        maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));

        // sum: per-key sum, accumulated as a long
        data.aggregateByKey(
                new Initializer<Long>() {
                    @Override
                    public Long apply() {
                        return 0L;
                    }
                },
                new Aggregator<String, Integer, Long>() {
                    @Override
                    public Long apply(String aggKey, Integer value, Long aggregate) {
                        // widen to long before adding
                        return (long) value + aggregate;
                    }
                },
                UnlimitedWindows.of("win-sum"),
                stringSerde,
                longSerde
        ).toStream().map(
                new Unwindow<String, Long>()
        ).to(stringSerde, longSerde, "sum");


        KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum");
        sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));

        // cnt: per-key record count
        data.countByKey(
                UnlimitedWindows.of("uwin-cnt"),
                stringSerde
        ).toStream().map(
                new Unwindow<String, Long>()
        ).to(stringSerde, longSerde, "cnt");

        KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt");
        cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));

        // dif: max - min per key
        maxTable.join(minTable,
                new ValueJoiner<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer value1, Integer value2) {
                        return value1 - value2;
                    }
                }
        ).to(stringSerde, intSerde, "dif");

        // avg: sum / cnt per key, computed in floating point
        sumTable.join(
                cntTable,
                new ValueJoiner<Long, Long, Double>() {
                    @Override
                    public Double apply(Long value1, Long value2) {
                        return (double) value1 / (double) value2;
                    }
                }
        ).to(stringSerde, doubleSerde, "avg");

        // windowed count: output key is "<original key>@<window start>"
        data.countByKey(
                TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
                stringSerde
        ).toStream().map(
                new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
                    @Override
                    public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
                        return new KeyValue<>(key.value() + "@" + key.window().start(), value);
                    }
                }
        ).to(stringSerde, longSerde, "wcnt");

        // test repartition: aggregate the count table under the key chosen by agg.selector()
        // NOTE(review): Agg is declared outside this class; its init/adder/remover/selector
        // semantics are not visible here.
        Agg agg = new Agg();
        cntTable.aggregate(
                agg.init(),
                agg.adder(),
                agg.remover(),
                agg.selector(),
                stringSerde,
                longSerde,
                longSerde,
                "cntByCnt"
        ).to(stringSerde, longSerde, "tagg");

        return new KafkaStreams(builder, props);
    }

}
