/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.stream.sample.complete;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;

import com.google.common.base.Throwables;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyValPair;

import static org.apache.apex.malhar.stream.api.Option.Options.name;

/**
 * An example that computes the most popular hash tags
 * for every prefix, which can be used for auto-completion.
 * This application is identical to TwitterAutoComplete, except that it
 * reads its input from a file. It is mainly intended for local
 * testing.
 *
 * <p>This will update the datastore every 10 seconds based on the last
 * 30 minutes of data received.
 *
 * @since 3.5.0
 */
@ApplicationAnnotation(name = "AutoComplete")
public class AutoComplete implements StreamingApplication
{

  /**
   * A dummy Twitter input operator. It reads a text file of sample tweets from the classpath and
   * emits at most one line per {@link #emitTuples()} call, pausing 50 ms after every call.
   * Once the file is exhausted the operator stays alive but emits nothing further.
   */
  public static class TweetsInput extends BaseOperator implements InputOperator
  {
    /** Classpath location of the sample tweets. */
    private static final String TWEETS_RESOURCE = "/sampletweets.txt";
    /** Pause after every emitTuples() call, in milliseconds. */
    private static final long EMIT_PAUSE_MILLIS = 50;
    /** Extra pause taken once when the end of the file is reached, in milliseconds. */
    private static final long EOF_PAUSE_MILLIS = 1000;

    // isDone() is public and static, so the flag may be polled from a thread other than the one
    // running emitTuples(); volatile makes the update visible to such readers.
    private static volatile boolean done = false;
    public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

    // Null before setup() and again after the input has been fully consumed.
    private transient BufferedReader reader;

    /**
     * @return true once the end of the sample file has been reached since the last setup() call
     */
    public static boolean isDone()
    {
      return done;
    }

    /**
     * Resets the done flag and opens the sample file.
     *
     * @param context operator context; not read by this method
     */
    @Override
    public void setup(OperatorContext context)
    {
      done = false;
      initReader();
    }

    /**
     * Opens the sample tweets resource for line-by-line reading.
     *
     * @throws IllegalStateException if the resource is not on the classpath
     */
    private void initReader()
    {
      try {
        InputStream resourceStream = this.getClass().getResourceAsStream(TWEETS_RESOURCE);
        if (resourceStream == null) {
          // getResourceAsStream signals a missing resource with null; fail with a clear message
          // instead of a bare NullPointerException from the reader constructor.
          throw new IllegalStateException("Resource " + TWEETS_RESOURCE + " not found on the classpath");
        }
        reader = new BufferedReader(new InputStreamReader(resourceStream));
      } catch (Exception ex) {
        throw Throwables.propagate(ex);
      }
    }

    /**
     * Closes the reader if it is still open.
     */
    @Override
    public void teardown()
    {
      IOUtils.closeQuietly(reader);
      reader = null;
    }

    /**
     * Emits the next line of the sample file, if any, then pauses.
     *
     * @throws RuntimeException wrapping any IOException raised while reading or closing the file
     */
    @Override
    public void emitTuples()
    {
      try {
        if (reader != null) {
          String line = reader.readLine();
          if (line == null) {
            // End of input: drop the reader before closing it so later calls skip the read.
            // Calling readLine() on a closed BufferedReader throws IOException.
            BufferedReader exhausted = reader;
            reader = null;
            done = true;
            exhausted.close();
            Thread.sleep(EOF_PAUSE_MILLIS);
          } else {
            this.output.emit(line);
          }
        }
        Thread.sleep(EMIT_PAUSE_MILLIS);
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller instead of silently discarding it.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Terminal operator that stores the value of every received pair in a static map under the
   * pair's key, replacing any list previously stored for that key.
   */
  public static class Collector extends BaseOperator
  {
    // Static: shared by all Collector instances loaded by the same class loader.
    private static Map<String, List<CompletionCandidate>> result = new HashMap<>();

    public final transient DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> input = new DefaultInputPort<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>()
    {
      @Override
      public void process(Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>> windowed)
      {
        final KeyValPair<String, List<CompletionCandidate>> entry = windowed.getValue();
        result.put(entry.getKey(), entry.getValue());
      }
    };

    /**
     * @return the live map of collected results (not a copy)
     */
    public static Map<String, List<CompletionCandidate>> getResult()
    {
      return result;
    }
  }

  /**
   * FlatMap function that extracts all hashtags from the text of a tweet.
   */
  private static class ExtractHashtags implements Function.FlatMapFunction<String, String>
  {
    /**
     * Matches a '#' followed by one or more non-whitespace characters. Compiled once and shared,
     * since a compiled Pattern is immutable; recompiling it for every tweet is wasted work.
     */
    private static final Pattern HASHTAG = Pattern.compile("#\\S+");

    /**
     * @param input text of one tweet
     * @return every hashtag found in the text, in order of appearance, without its leading '#'
     */
    @Override
    public Iterable<String> f(String input)
    {
      List<String> result = new LinkedList<>();
      Matcher m = HASHTAG.matcher(input);
      while (m.find()) {
        // Strip the leading '#'.
        result.add(m.group().substring(1));
      }
      return result;
    }
  }

  /**
   * Lower latency, but more expensive.
   *
   * <p>Expands every candidate into (prefix, candidate) pairs via {@code AllPrefixes} and
   * accumulates them per prefix with {@code TopNByKey}. Note that {@code candidatesPerPrefix} is
   * stored by the constructor but is not consulted by {@link #compose}.
   */
  private static class ComputeTopFlat
      extends CompositeStreamTransform<WindowedStream<CompletionCandidate>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
  {
    private final int candidatesPerPrefix;
    private final int minPrefix;

    /**
     * @param candidatesPerPrefix requested number of candidates per prefix (stored only)
     * @param minPrefix length of the shortest prefix to generate
     */
    public ComputeTopFlat(int candidatesPerPrefix, int minPrefix)
    {
      this.minPrefix = minPrefix;
      this.candidatesPerPrefix = candidatesPerPrefix;
    }

    /**
     * @param input stream of completion candidates
     * @return stream of windowed (prefix, candidate list) pairs
     */
    @Override
    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(
        WindowedStream<CompletionCandidate> input)
    {
      WindowedStream<KeyValPair<String, CompletionCandidate>> prefixed = input
          .<KeyValPair<String, CompletionCandidate>, WindowedStream<KeyValPair<String, CompletionCandidate>>>flatMap(new AllPrefixes(minPrefix));

      // Wraps each pair in a tuple tagged with the global window.
      // TODO: Should be removed after Auto-wrapping is supported.
      Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate> wrapInGlobalWindow =
          new Function.ToKeyValue<KeyValPair<String, CompletionCandidate>, String, CompletionCandidate>()
          {
            @Override
            public Tuple<KeyValPair<String, CompletionCandidate>> f(KeyValPair<String, CompletionCandidate> pair)
            {
              return new Tuple.WindowedTuple<>(Window.GLOBAL_WINDOW, pair);
            }
          };

      return prefixed.accumulateByKey(new TopNByKey(), wrapInGlobalWindow);
    }
  }

  /**
   * FlatMap function that extracts all prefixes of the hashtag in the input CompletionCandidate and
   * outputs KeyValPairs of the lower-cased prefix and the CompletionCandidate.
   */
  private static class AllPrefixes implements Function.FlatMapFunction<CompletionCandidate, KeyValPair<String, CompletionCandidate>>
  {
    private final int minPrefix;
    private final int maxPrefix;

    /**
     * Generates every prefix, starting with the empty one.
     */
    public AllPrefixes()
    {
      this(0, Integer.MAX_VALUE);
    }

    /**
     * @param minPrefix length of the shortest prefix to generate
     */
    public AllPrefixes(int minPrefix)
    {
      this(minPrefix, Integer.MAX_VALUE);
    }

    /**
     * @param minPrefix length of the shortest prefix to generate
     * @param maxPrefix length of the longest prefix to generate; capped at the word length
     */
    public AllPrefixes(int minPrefix, int maxPrefix)
    {
      this.minPrefix = minPrefix;
      this.maxPrefix = maxPrefix;
    }

    /**
     * @param input candidate whose value supplies the word to split into prefixes
     * @return one (prefix, input) pair per prefix length from minPrefix up to
     *         min(word length, maxPrefix), inclusive; empty when minPrefix exceeds that bound
     */
    @Override
    public Iterable<KeyValPair<String, CompletionCandidate>> f(CompletionCandidate input)
    {
      List<KeyValPair<String, CompletionCandidate>> result = new LinkedList<>();
      String word = input.getValue();
      // Loop-invariant upper bound: never slice past the end of the word.
      int longestPrefix = Math.min(word.length(), maxPrefix);
      for (int i = minPrefix; i <= longestPrefix; i++) {
        // Locale.ROOT keeps the generated keys identical whatever the JVM default locale is;
        // the no-arg toLowerCase() uses locale-specific case mappings (e.g. Turkish dotless i).
        result.add(new KeyValPair<>(word.substring(0, i).toLowerCase(Locale.ROOT), input));
      }
      return result;
    }
  }

  /**
   * A Composite stream transform that takes as input a stream of tokens and returns
   * the most common tokens per prefix.
   *
   * <p>Tokens are counted per key, turned into {@code CompletionCandidate}s and handed to
   * {@code ComputeTopFlat}. The {@code recursive} flag is stored but not consulted by
   * {@link #compose}.
   */
  public static class ComputeTopCompletions
      extends CompositeStreamTransform<WindowedStream<String>, WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>>>
  {
    /** Shortest prefix length handed to ComputeTopFlat. */
    private static final int MIN_PREFIX = 1;

    private final int candidatesPerPrefix;
    private final boolean recursive;

    /**
     * @param candidatesPerPrefix number of candidates requested per prefix
     * @param recursive stored only; compose() does not read it
     */
    protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive)
    {
      this.candidatesPerPrefix = candidatesPerPrefix;
      this.recursive = recursive;
    }

    /**
     * Static factory for a ComputeTopCompletions transform.
     *
     * @param candidatesPerPrefix number of candidates requested per prefix
     * @param recursive stored only; compose() does not read it
     * @return a new transform configured with the given values
     */
    public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive)
    {
      return new ComputeTopCompletions(candidatesPerPrefix, recursive);
    }

    /**
     * @param inputStream windowed stream of tokens
     * @return stream of windowed (prefix, candidate list) pairs
     */
    @Override
    @SuppressWarnings("unchecked")
    public WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<CompletionCandidate>>>> compose(WindowedStream<String> inputStream)
    {
      ApexStream<CompletionCandidate> candidates = inputStream
          .countByKey(new Function.ToKeyValue<String, String, Long>()
          {
            @Override
            public Tuple<KeyValPair<String, Long>> f(String input)
            {
              // Every occurrence of a token contributes a count of one.
              return new Tuple.PlainTuple<>(new KeyValPair<>(input, 1L));
            }
          }, name("countByKey"))
          .map(new Function.MapFunction<Tuple.WindowedTuple<KeyValPair<String,Long>>, CompletionCandidate>()
          {
            @Override
            public CompletionCandidate f(Tuple.WindowedTuple<KeyValPair<String, Long>> input)
            {
              return new CompletionCandidate(input.getValue().getKey(), input.getValue().getValue());
            }
          }, name("ToCompletionCandidate"));

      // Forward the configured limit instead of a hard-coded 10 so the value given to top() is
      // the one passed on to ComputeTopFlat.
      return candidates.addCompositeStreams(new ComputeTopFlat(candidatesPerPrefix, MIN_PREFIX));

    }
  }

  /**
   * Populate the dag with High-Level API: tweets are read by a TweetsInput, split into hashtags,
   * windowed globally, run through ComputeTopCompletions and delivered to a Collector.
   *
   * @param dag the DAG that the assembled stream is populated into
   * @param conf configuration passed in by the caller; not read by this method
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    TweetsInput tweetSource = new TweetsInput();
    Collector sink = new Collector();

    WindowOption globalWindow = new WindowOption.GlobalWindow();

    ApexStream<String> hashtags = StreamFactory.fromInput(tweetSource, tweetSource.output, name("tweetSampler"))
        .flatMap(new ExtractHashtags());

    TriggerOption trigger = new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1);

    hashtags.window(globalWindow, trigger)
        .addCompositeStreams(ComputeTopCompletions.top(10, true))
        .endWith(sink, sink.input, name("collector"))
        .populateDag(dag);
  }
}
