/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.core.construction;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi.CombinePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.DockerPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExternalPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.ProcessPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.ReadPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/**
 * Utilities for interacting with portability {@link Environment environments}.
 *
 * <p>This class offers factory methods that build {@link Environment} protos for the supported
 * environment types, and lookup methods that resolve the environment referenced by a {@link
 * PTransform}'s payload.
 */
public class Environments {
  /**
   * Maps the URN of every transform whose payload can reference an environment to the function
   * that reads the environment id out of that payload.
   */
  private static final ImmutableMap<String, EnvironmentIdExtractor> KNOWN_URN_SPEC_EXTRACTORS =
      ImmutableMap.<String, EnvironmentIdExtractor>builder()
          .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, Environments::combineExtractor)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
              Environments::combineExtractor)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
              Environments::combineExtractor)
          .put(
              PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
              Environments::combineExtractor)
          .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, Environments::parDoExtractor)
          .put(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN, Environments::parDoExtractor)
          .put(
              PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
              Environments::parDoExtractor)
          .put(
              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
              Environments::parDoExtractor)
          .put(
              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
              Environments::parDoExtractor)
          .put(PTransformTranslation.READ_TRANSFORM_URN, Environments::readExtractor)
          .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, Environments::windowExtractor)
          .build();

  /** Extractor used for every URN not listed in {@link #KNOWN_URN_SPEC_EXTRACTORS}. */
  private static final EnvironmentIdExtractor DEFAULT_SPEC_EXTRACTOR = transform -> null;

  /** JSON mapper used to parse the configuration of process environments. */
  private static final ObjectMapper MAPPER =
      new ObjectMapper()
          .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));

  /** Environment type selecting a Docker container environment. */
  public static final String ENVIRONMENT_DOCKER = "DOCKER";
  /** Environment type selecting a process environment configured through JSON. */
  public static final String ENVIRONMENT_PROCESS = "PROCESS";
  /** Environment type selecting an external environment reached through an endpoint URL. */
  public static final String ENVIRONMENT_EXTERNAL = "EXTERNAL";

  public static final String ENVIRONMENT_EMBEDDED = "EMBEDDED"; // Non Public urn for testing
  public static final String ENVIRONMENT_LOOPBACK = "LOOPBACK"; // Non Public urn for testing

  /* For development, use the container built by the current user to ensure that the SDK harness
   * and the SDK agree on how they should interact. This should be changed to a version-specific
   * container during a release.
   *
   * See https://beam.apache.org/contribute/docker-images/ for more information on how to build a
   * container.
   */
  private static final String JAVA_SDK_HARNESS_CONTAINER_URL = "apachebeam/java_sdk";

  /** The Docker environment used when no environment type or no Docker image is configured. */
  public static final Environment JAVA_SDK_HARNESS_ENVIRONMENT =
      createDockerEnvironment(JAVA_SDK_HARNESS_CONTAINER_URL);

  private Environments() {}

  /**
   * Builds the {@link Environment} described by an environment type and its configuration string.
   *
   * <p>A null or empty {@code type} yields {@link #JAVA_SDK_HARNESS_ENVIRONMENT}. Any type that is
   * not {@link #ENVIRONMENT_EMBEDDED}, {@link #ENVIRONMENT_EXTERNAL}, {@link
   * #ENVIRONMENT_LOOPBACK} or {@link #ENVIRONMENT_PROCESS} is handled as {@link
   * #ENVIRONMENT_DOCKER}. For Docker, a null or empty {@code config} also yields {@link
   * #JAVA_SDK_HARNESS_ENVIRONMENT} rather than failing on a missing image name.
   *
   * @param type the environment type, normally one of the {@code ENVIRONMENT_*} constants; may be
   *     null or empty
   * @param config the type-specific configuration: the container image for Docker, the endpoint
   *     URL for external and loopback, a JSON document for process, and the raw payload for
   *     embedded
   * @return the environment for the given type and configuration
   * @throws RuntimeException if the JSON configuration of a process environment cannot be parsed
   */
  public static Environment createOrGetDefaultEnvironment(String type, String config) {
    if (Strings.isNullOrEmpty(type)) {
      return JAVA_SDK_HARNESS_ENVIRONMENT;
    }

    switch (type) {
      case ENVIRONMENT_EMBEDDED:
        return createEmbeddedEnvironment(config);
      case ENVIRONMENT_EXTERNAL:
      case ENVIRONMENT_LOOPBACK:
        return createExternalEnvironment(config);
      case ENVIRONMENT_PROCESS:
        return createProcessEnvironment(config);
      case ENVIRONMENT_DOCKER:
      default:
        // Without an image name there is nothing to build a Docker payload from, so use the
        // default SDK harness container instead of passing null/empty on to the proto builder.
        return Strings.isNullOrEmpty(config)
            ? JAVA_SDK_HARNESS_ENVIRONMENT
            : createDockerEnvironment(config);
    }
  }

  /**
   * Creates a Docker environment that runs the given container image.
   *
   * @param dockerImageUrl the container image to store in the {@link DockerPayload}; expected to
   *     be non-null
   * @return an environment with the standard Docker URN and a serialized {@link DockerPayload}
   */
  public static Environment createDockerEnvironment(String dockerImageUrl) {
    return Environment.newBuilder()
        .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
        .setPayload(
            DockerPayload.newBuilder().setContainerImage(dockerImageUrl).build().toByteString())
        .build();
  }

  /**
   * Creates an external environment whose endpoint URL is {@code config}.
   *
   * @param config the URL stored in the payload's {@link ApiServiceDescriptor}
   */
  private static Environment createExternalEnvironment(String config) {
    return Environment.newBuilder()
        .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL))
        .setPayload(
            ExternalPayload.newBuilder()
                .setEndpoint(ApiServiceDescriptor.newBuilder().setUrl(config).build())
                .build()
                .toByteString())
        .build();
  }

  /**
   * Creates a process environment from a JSON configuration string.
   *
   * @param config JSON document bound to {@link ProcessPayloadReferenceJSON}
   * @throws RuntimeException wrapping the {@link IOException} raised when parsing fails
   */
  private static Environment createProcessEnvironment(String config) {
    try {
      ProcessPayloadReferenceJSON payloadReferenceJSON =
          MAPPER.readValue(config, ProcessPayloadReferenceJSON.class);
      return createProcessEnvironment(
          payloadReferenceJSON.getOs(),
          payloadReferenceJSON.getArch(),
          payloadReferenceJSON.getCommand(),
          payloadReferenceJSON.getEnv());
    } catch (IOException e) {
      throw new RuntimeException(
          String.format("Unable to parse process environment config: %s", config), e);
    }
  }

  /**
   * Creates an embedded environment carrying {@code config} as its UTF-8 payload.
   *
   * @param config the payload text; null is treated as the empty string
   */
  private static Environment createEmbeddedEnvironment(String config) {
    return Environment.newBuilder()
        .setUrn(ENVIRONMENT_EMBEDDED)
        .setPayload(ByteString.copyFromUtf8(MoreObjects.firstNonNull(config, "")))
        .build();
  }

  /**
   * Creates a process environment from its individual settings.
   *
   * <p>Null or empty string arguments and a null {@code env} map are skipped, leaving the
   * corresponding {@link ProcessPayload} field unset.
   *
   * @param os value for the payload's {@code os} field, may be null or empty
   * @param arch value for the payload's {@code arch} field, may be null or empty
   * @param command value for the payload's {@code command} field, may be null or empty
   * @param env entries added to the payload's {@code env} map, may be null
   * @return an environment with the standard process URN and a serialized {@link ProcessPayload}
   */
  public static Environment createProcessEnvironment(
      String os, String arch, String command, Map<String, String> env) {
    ProcessPayload.Builder builder = ProcessPayload.newBuilder();
    if (!Strings.isNullOrEmpty(os)) {
      builder.setOs(os);
    }
    if (!Strings.isNullOrEmpty(arch)) {
      builder.setArch(arch);
    }
    if (!Strings.isNullOrEmpty(command)) {
      builder.setCommand(command);
    }
    if (env != null) {
      builder.putAllEnv(env);
    }
    return Environment.newBuilder()
        .setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS))
        .setPayload(builder.build().toByteString())
        .build();
  }

  /**
   * Resolves the environment referenced by the transform with the given id.
   *
   * @param ptransformId id of the transform, looked up with {@code getTransformsOrThrow}
   * @param components the components holding both the transform and its environment
   * @return the referenced environment, or {@link Optional#empty()} when the transform's payload
   *     names no environment
   * @throws RuntimeException wrapping any {@link IOException} raised while reading the payload
   */
  public static Optional<Environment> getEnvironment(String ptransformId, Components components) {
    try {
      PTransform ptransform = components.getTransformsOrThrow(ptransformId);
      String envId = extractEnvironmentId(ptransform);
      if (Strings.isNullOrEmpty(envId)) {
        // Some PTransform payloads may have an unspecified (empty) Environment ID, for example a
        // WindowIntoPayload with a known WindowFn. Others will never have an Environment ID, such
        // as a GroupByKeyPayload, and the Default extractor returns null in this case.
        return Optional.empty();
      }
      return Optional.of(components.getEnvironmentsOrThrow(envId));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Resolves the environment referenced by the given transform.
   *
   * @param ptransform the transform whose payload is inspected
   * @param components the rehydrated components used to look up the environment by id
   * @return the referenced environment, or {@link Optional#empty()} when the transform's payload
   *     names no environment
   * @throws RuntimeException wrapping any {@link IOException} raised while reading the payload or
   *     looking up the environment
   */
  public static Optional<Environment> getEnvironment(
      PTransform ptransform, RehydratedComponents components) {
    try {
      String envId = extractEnvironmentId(ptransform);
      if (Strings.isNullOrEmpty(envId)) {
        // Some PTransform payloads may have an empty (default) Environment ID, for example a
        // WindowIntoPayload with a known WindowFn. Others will never have an Environment ID, such
        // as a GroupByKeyPayload, and the Default extractor returns null in this case.
        return Optional.empty();
      }
      return Optional.of(components.getEnvironment(envId));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns the environment id stored in the transform's payload, using the extractor registered
   * for the transform's URN or {@link #DEFAULT_SPEC_EXTRACTOR} when none is registered.
   *
   * @return the environment id, which may be null or empty
   */
  @Nullable
  private static String extractEnvironmentId(PTransform ptransform) throws IOException {
    return KNOWN_URN_SPEC_EXTRACTORS
        .getOrDefault(ptransform.getSpec().getUrn(), DEFAULT_SPEC_EXTRACTOR)
        .getEnvironmentId(ptransform);
  }

  /** Reads the environment id, if any, out of a transform's payload. */
  @FunctionalInterface
  private interface EnvironmentIdExtractor {
    @Nullable
    String getEnvironmentId(PTransform transform) throws IOException;
  }

  /** Returns the environment id of the DoFn in a {@link ParDoPayload}. */
  private static String parDoExtractor(PTransform pTransform)
      throws InvalidProtocolBufferException {
    return ParDoPayload.parseFrom(pTransform.getSpec().getPayload()).getDoFn().getEnvironmentId();
  }

  /** Returns the environment id of the CombineFn in a {@link CombinePayload}. */
  private static String combineExtractor(PTransform pTransform)
      throws InvalidProtocolBufferException {
    return CombinePayload.parseFrom(pTransform.getSpec().getPayload())
        .getCombineFn()
        .getEnvironmentId();
  }

  /** Returns the environment id of the source in a {@link ReadPayload}. */
  private static String readExtractor(PTransform transform) throws InvalidProtocolBufferException {
    return ReadPayload.parseFrom(transform.getSpec().getPayload()).getSource().getEnvironmentId();
  }

  /** Returns the environment id of the WindowFn in a {@link WindowIntoPayload}. */
  private static String windowExtractor(PTransform transform)
      throws InvalidProtocolBufferException {
    return WindowIntoPayload.parseFrom(transform.getSpec().getPayload())
        .getWindowFn()
        .getEnvironmentId();
  }

  /**
   * Binding target for the JSON configuration of a process environment. Instances are created by
   * {@link #MAPPER}; every property is optional.
   */
  private static class ProcessPayloadReferenceJSON {
    @Nullable private String os;
    @Nullable private String arch;
    @Nullable private String command;
    @Nullable private Map<String, String> env;

    @Nullable
    public String getOs() {
      return os;
    }

    @Nullable
    public String getArch() {
      return arch;
    }

    @Nullable
    public String getCommand() {
      return command;
    }

    @Nullable
    public Map<String, String> getEnv() {
      return env;
    }
  }
}
