/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.core.construction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.AfterAll;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Utilities for working with {@link TriggerTranslation Triggers}. */
@Experimental(Experimental.Kind.TRIGGER)
public class TriggerTranslation implements Serializable {

  @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();

  public static RunnerApi.Trigger toProto(Trigger trigger) {
    return CONVERTER.convertTrigger(trigger);
  }

  @VisibleForTesting
  static class ProtoConverter {

    public RunnerApi.Trigger convertTrigger(Trigger trigger) {
      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
      return tryConvert(evaluationMethod, trigger);
    }

    private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) {
      try {
        return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
      } catch (InvocationTargetException exc) {
        if (exc.getCause() instanceof RuntimeException) {
          throw (RuntimeException) exc.getCause();
        } else {
          throw new RuntimeException(exc.getCause());
        }
      } catch (IllegalAccessException exc) {
        throw new IllegalStateException(
            String.format("Internal error: could not invoke %s", evaluationMethod));
      }
    }

    private Method getEvaluationMethod(Class<?> clazz) {
      try {
        return getClass().getDeclaredMethod("convertSpecific", clazz);
      } catch (NoSuchMethodException exc) {
        throw new IllegalArgumentException(
            String.format(
                "Cannot translate trigger class %s to a runner-API proto.",
                clazz.getCanonicalName()),
            exc);
      }
    }

    private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
      return RunnerApi.Trigger.newBuilder()
          .setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
          .build();
    }

    private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) {
      return RunnerApi.Trigger.newBuilder()
          .setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder())
          .build();
    }

    private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
      return RunnerApi.Trigger.newBuilder()
          .setNever(RunnerApi.Trigger.Never.getDefaultInstance())
          .build();
    }

    private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
      return RunnerApi.Trigger.newBuilder()
          .setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
          .build();
    }

    private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
      return RunnerApi.Trigger.newBuilder()
          .setAfterSynchronizedProcessingTime(
              RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance())
          .build();
    }

    private RunnerApi.TimeDomain.Enum convertTimeDomain(TimeDomain timeDomain) {
      switch (timeDomain) {
        case EVENT_TIME:
          return RunnerApi.TimeDomain.Enum.EVENT_TIME;
        case PROCESSING_TIME:
          return RunnerApi.TimeDomain.Enum.PROCESSING_TIME;
        case SYNCHRONIZED_PROCESSING_TIME:
          return RunnerApi.TimeDomain.Enum.SYNCHRONIZED_PROCESSING_TIME;
        default:
          throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
      }
    }

    private RunnerApi.Trigger convertSpecific(AfterFirst v) {
      RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder();

      for (Trigger subtrigger : v.subTriggers()) {
        builder.addSubtriggers(toProto(subtrigger));
      }

      return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build();
    }

    private RunnerApi.Trigger convertSpecific(AfterAll v) {
      RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder();

      for (Trigger subtrigger : v.subTriggers()) {
        builder.addSubtriggers(toProto(subtrigger));
      }

      return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build();
    }

    private RunnerApi.Trigger convertSpecific(AfterPane v) {
      return RunnerApi.Trigger.newBuilder()
          .setElementCount(
              RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount()))
          .build();
    }

    private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) {
      RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
          RunnerApi.Trigger.AfterEndOfWindow.newBuilder();

      builder.setEarlyFirings(toProto(v.getEarlyTrigger()));
      if (v.getLateTrigger() != null) {
        builder.setLateFirings(toProto(v.getLateTrigger()));
      }

      return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
    }

    private RunnerApi.Trigger convertSpecific(AfterEach v) {
      RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder();

      for (Trigger subtrigger : v.subTriggers()) {
        builder.addSubtriggers(toProto(subtrigger));
      }

      return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build();
    }

    private RunnerApi.Trigger convertSpecific(Repeatedly v) {
      return RunnerApi.Trigger.newBuilder()
          .setRepeat(
              RunnerApi.Trigger.Repeat.newBuilder()
                  .setSubtrigger(toProto(v.getRepeatedTrigger())))
          .build();
    }

    private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
      return RunnerApi.Trigger.newBuilder()
          .setOrFinally(
              RunnerApi.Trigger.OrFinally.newBuilder()
                  .setMain(toProto(v.getMainTrigger()))
                  .setFinally(toProto(v.getUntilTrigger())))
          .build();
    }

    private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
      RunnerApi.Trigger.AfterProcessingTime.Builder builder =
          RunnerApi.Trigger.AfterProcessingTime.newBuilder();

      for (TimestampTransform transform : v.getTimestampTransforms()) {
        builder.addTimestampTransforms(convertTimestampTransform(transform));
      }

      return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
    }

    private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) {
      if (transform instanceof TimestampTransform.Delay) {
        return RunnerApi.TimestampTransform.newBuilder()
            .setDelay(
                RunnerApi.TimestampTransform.Delay.newBuilder()
                    .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis()))
            .build();
      } else if (transform instanceof TimestampTransform.AlignTo) {
        TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform;
        return RunnerApi.TimestampTransform.newBuilder()
            .setAlignTo(
                RunnerApi.TimestampTransform.AlignTo.newBuilder()
                    .setPeriod(alignTo.getPeriod().getMillis())
                    .setOffset(alignTo.getOffset().getMillis()))
            .build();

      } else {
        throw new IllegalArgumentException(
            String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform));
      }
    }
  }

  public static Trigger fromProto(RunnerApi.Trigger triggerProto) {
    switch (triggerProto.getTriggerCase()) {
      case AFTER_ALL:
        return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList()));
      case AFTER_ANY:
        return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList()));
      case AFTER_EACH:
        return AfterEach.inOrder(
            protosToTriggers(triggerProto.getAfterEach().getSubtriggersList()));
      case AFTER_END_OF_WINDOW:
        RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWindow();

        if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) {
          return AfterWatermark.pastEndOfWindow();
        }

        // It either has early or late firings or both; our typing in Java makes this a smidge
        // annoying
        if (triggerProto.getAfterEndOfWindow().hasEarlyFirings()) {
          AfterWatermarkEarlyAndLate trigger =
              AfterWatermark.pastEndOfWindow()
                  .withEarlyFirings(
                      (OnceTrigger)
                          fromProto(triggerProto.getAfterEndOfWindow().getEarlyFirings()));

          if (triggerProto.getAfterEndOfWindow().hasLateFirings()) {
            trigger =
                trigger.withLateFirings(
                    (OnceTrigger)
                        fromProto(triggerProto.getAfterEndOfWindow().getLateFirings()));
          }
          return trigger;
        } else {
          // only late firings, so return directly
          return AfterWatermark.pastEndOfWindow()
              .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings()));
        }
      case AFTER_PROCESSING_TIME:
        AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane();
        for (RunnerApi.TimestampTransform transform :
            triggerProto.getAfterProcessingTime().getTimestampTransformsList()) {
          switch (transform.getTimestampTransformCase()) {
            case ALIGN_TO:
              trigger =
                  trigger.alignedTo(
                      Duration.millis(transform.getAlignTo().getPeriod()),
                      new Instant(transform.getAlignTo().getOffset()));
              break;
            case DELAY:
              trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis()));
              break;
            case TIMESTAMPTRANSFORM_NOT_SET:
              throw new IllegalArgumentException(
                  String.format(
                      "Required field 'timestamp_transform' not set in %s", transform));
            default:
              throw new IllegalArgumentException(
                  String.format(
                      "Unknown timestamp transform case: %s",
                      transform.getTimestampTransformCase()));
          }
        }
        return trigger;
      case AFTER_SYNCHRONIZED_PROCESSING_TIME:
        return AfterSynchronizedProcessingTime.ofFirstElement();
      case ALWAYS:
        return new ReshuffleTrigger();
      case ELEMENT_COUNT:
        return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount());
      case NEVER:
        return Never.ever();
      case OR_FINALLY:
        return fromProto(triggerProto.getOrFinally().getMain())
            .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally()));
      case REPEAT:
        return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger()));
      case DEFAULT:
        return DefaultTrigger.of();
      case TRIGGER_NOT_SET:
        throw new IllegalArgumentException(
            String.format("Required field 'trigger' not set in %s", triggerProto));
      default:
        throw new IllegalArgumentException(
            String.format("Unknown trigger case: %s", triggerProto.getTriggerCase()));
    }
  }

  private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) {
    List<Trigger> result = Lists.newArrayList();
    for (RunnerApi.Trigger trigger : triggers) {
      result.add(fromProto(trigger));
    }
    return result;
  }

  // Do not instantiate
  private TriggerTranslation() {}
}
