/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow.worker.graph;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.json.GenericJson;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.JsonGenerator;
import com.google.api.client.util.Charsets;
import com.google.api.services.dataflow.model.InstructionOutput;
import com.google.api.services.dataflow.model.ParallelInstruction;
import com.google.api.services.dataflow.model.SideInputInfo;
import com.google.auto.value.AutoValue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Operation;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;

/** Container class for different types of network nodes. All nodes only have reference equality. */
public class Nodes {
  /**
   * Common supertype of every network node.
   *
   * <p>Both {@link #equals(Object)} and {@link #hashCode()} are identity based and declared {@code
   * final}, so two nodes are equal only when they are the very same object.
   */
  public abstract static class Node {
    /** Returns the identity hash of this node, consistent with reference equality. */
    @Override
    public final int hashCode() {
      return System.identityHashCode(this);
    }

    /** Returns {@code true} only when {@code obj} is this exact instance. */
    @Override
    public final boolean equals(Object obj) {
      return obj == this;
    }
  }

  /**
   * Pretty-prints {@code json}, shortening every string literal longer than 100 characters to its
   * leading characters followed by {@code "..."}.
   *
   * <p>Serialization goes through a wrapping {@link JsonGenerator} that forwards every call to a
   * generator obtained from the JSON object's own factory, or from {@link
   * Transport#getJsonFactory()} when the object has none. Only {@code writeString} is altered.
   *
   * @param json the JSON object to render
   * @return the pretty-printed JSON text with long string literals trimmed
   * @throws RuntimeException wrapping any {@link IOException} raised while serializing
   */
  private static String toStringWithTrimmedLiterals(GenericJson json) {
    final int maxLiteralLength = 100;
    try {
      ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
      final JsonGenerator baseGenerator =
          MoreObjects.firstNonNull(json.getFactory(), Transport.getJsonFactory())
              .createJsonGenerator(byteStream, Charsets.UTF_8);
      JsonGenerator generator =
          new JsonGenerator() {
            @Override
            public void writeString(String value) throws IOException {
              if (value.length() > maxLiteralLength) {
                // Do not cut a surrogate pair in half: a trailing unpaired high surrogate cannot
                // be encoded as valid UTF-8, so drop it along with the rest of the literal.
                int end =
                    Character.isHighSurrogate(value.charAt(maxLiteralLength - 1))
                        ? maxLiteralLength - 1
                        : maxLiteralLength;
                baseGenerator.writeString(value.substring(0, end) + "...");
              } else {
                baseGenerator.writeString(value);
              }
            }

            @Override
            public JsonFactory getFactory() {
              return baseGenerator.getFactory();
            }

            @Override
            public void flush() throws IOException {
              baseGenerator.flush();
            }

            @Override
            public void close() throws IOException {
              baseGenerator.close();
            }

            @Override
            public void writeStartArray() throws IOException {
              baseGenerator.writeStartArray();
            }

            @Override
            public void writeEndArray() throws IOException {
              baseGenerator.writeEndArray();
            }

            @Override
            public void writeStartObject() throws IOException {
              baseGenerator.writeStartObject();
            }

            @Override
            public void writeEndObject() throws IOException {
              baseGenerator.writeEndObject();
            }

            @Override
            public void writeFieldName(String name) throws IOException {
              baseGenerator.writeFieldName(name);
            }

            @Override
            public void writeNull() throws IOException {
              baseGenerator.writeNull();
            }

            @Override
            public void writeBoolean(boolean state) throws IOException {
              baseGenerator.writeBoolean(state);
            }

            @Override
            public void writeNumber(int v) throws IOException {
              baseGenerator.writeNumber(v);
            }

            @Override
            public void writeNumber(long v) throws IOException {
              baseGenerator.writeNumber(v);
            }

            @Override
            public void writeNumber(BigInteger v) throws IOException {
              baseGenerator.writeNumber(v);
            }

            @Override
            public void writeNumber(float v) throws IOException {
              baseGenerator.writeNumber(v);
            }

            @Override
            public void writeNumber(double v) throws IOException {
              baseGenerator.writeNumber(v);
            }

            @Override
            public void writeNumber(BigDecimal v) throws IOException {
              baseGenerator.writeNumber(v);
            }

            @Override
            public void writeNumber(String encodedValue) throws IOException {
              baseGenerator.writeNumber(encodedValue);
            }

            @Override
            public void enablePrettyPrint() throws IOException {
              baseGenerator.enablePrettyPrint();
            }
          };
      generator.enablePrettyPrint();
      generator.serialize(json);
      generator.flush();
      // The bytes were produced as UTF-8 above, so decode them with the same charset instead of
      // whatever the platform default happens to be.
      return new String(byteStream.toByteArray(), Charsets.UTF_8);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /** Identifies the harness on which a {@code ParallelInstructionNode} will execute. */
  public enum ExecutionLocation {
    /** Indicates the location has not yet been decided. */
    UNKNOWN,
    SDK_HARNESS,
    RUNNER_HARNESS,
    /** The node can execute in either or both harnesses. */
    AMBIGUOUS
  }

  /**
   * A node wrapping a {@link ParallelInstruction} together with the {@link ExecutionLocation}
   * assigned to it.
   */
  @AutoValue
  public abstract static class ParallelInstructionNode extends Node {
    /**
     * Builds a node from the given instruction and location.
     *
     * @throws NullPointerException if either argument is {@code null}
     */
    public static ParallelInstructionNode create(
        ParallelInstruction parallelInstruction, ExecutionLocation executionLocation) {
      return new AutoValue_Nodes_ParallelInstructionNode(
          checkNotNull(parallelInstruction), checkNotNull(executionLocation));
    }

    /** Returns the wrapped instruction. */
    public abstract ParallelInstruction getParallelInstruction();

    /** Returns the harness location recorded for this node. */
    public abstract ExecutionLocation getExecutionLocation();

    /** Renders the instruction as JSON with long string literals trimmed, plus the location. */
    @Override
    public String toString() {
      String trimmedInstruction = toStringWithTrimmedLiterals(getParallelInstruction());
      String location = getExecutionLocation().toString();
      return MoreObjects.toStringHelper(this)
          .add("parallelInstruction", trimmedInstruction)
          .add("executionLocation", location)
          .toString();
    }
  }

  /** A node that stores an {@link InstructionOutput} together with its PCollection id. */
  @AutoValue
  public abstract static class InstructionOutputNode extends Node {
    /**
     * Builds a node from the given output and PCollection id.
     *
     * @throws NullPointerException if either argument is {@code null}
     */
    public static InstructionOutputNode create(
        InstructionOutput instructionOutput, String pcollectionId) {
      return new AutoValue_Nodes_InstructionOutputNode(
          checkNotNull(instructionOutput), checkNotNull(pcollectionId));
    }

    /** Returns the wrapped instruction output. */
    public abstract InstructionOutput getInstructionOutput();

    /** Returns the PCollection id stored alongside the output. */
    public abstract String getPcollectionId();

    /** Renders the output as JSON with long string literals trimmed, plus the PCollection id. */
    @Override
    public String toString() {
      String trimmedOutput = toStringWithTrimmedLiterals(getInstructionOutput());
      return MoreObjects.toStringHelper(this)
          .add("instructionOutput", trimmedOutput)
          .add("pcollectionId", getPcollectionId())
          .toString();
    }
  }

  /** A node holding an {@link OutputReceiver} along with a coder and a PCollection id. */
  @AutoValue
  public abstract static class OutputReceiverNode extends Node {
    /**
     * Builds a node from the given receiver, coder and PCollection id.
     *
     * <p>The receiver and the PCollection id are explicitly null-checked here; the coder is handed
     * to the generated constructor as is.
     */
    public static OutputReceiverNode create(
        OutputReceiver outputReceiver, Coder<?> coder, String pcollectionId) {
      return new AutoValue_Nodes_OutputReceiverNode(
          checkNotNull(outputReceiver), coder, checkNotNull(pcollectionId));
    }

    /** Returns the wrapped receiver. */
    public abstract OutputReceiver getOutputReceiver();

    /** Returns the coder supplied at creation. */
    public abstract Coder<?> getCoder();

    /** Returns the PCollection id supplied at creation. */
    public abstract String getPcollectionId();
  }

  /** A node wrapping a single {@link Operation}. */
  @AutoValue
  public abstract static class OperationNode extends Node {
    /**
     * Builds a node around {@code operation}.
     *
     * @throws NullPointerException if {@code operation} is {@code null}
     */
    public static OperationNode create(Operation operation) {
      return new AutoValue_Nodes_OperationNode(checkNotNull(operation));
    }

    /** Returns the wrapped operation. */
    public abstract Operation getOperation();
  }

  /**
   * A node holding a {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort} and the
   * id of a primitive transform.
   */
  @AutoValue
  public abstract static class RemoteGrpcPortNode extends Node {
    /**
     * Builds a node from the given port and primitive transform id.
     *
     * <p>Only {@code port} is explicitly null-checked here; the transform id is handed to the
     * generated constructor as is.
     */
    public static RemoteGrpcPortNode create(
        BeamFnApi.RemoteGrpcPort port, String primitiveTransformId) {
      return new AutoValue_Nodes_RemoteGrpcPortNode(checkNotNull(port), primitiveTransformId);
    }

    /** Returns the wrapped port. */
    public abstract BeamFnApi.RemoteGrpcPort getRemoteGrpcPort();

    /** Returns the primitive transform id supplied at creation. */
    public abstract String getPrimitiveTransformId();
  }

  /**
   * A node holding a {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.RegisterRequest} plus
   * the lookup maps, keyed by PTransform id or PCollection, that were supplied with it.
   */
  @AutoValue
  public abstract static class RegisterRequestNode extends Node {
    /**
     * Builds a node from the given request and maps.
     *
     * <p>The request and {@code ptransformIdToPartialNameContextMap} are explicitly null-checked
     * here; the remaining maps are handed to the generated constructor as is.
     */
    public static RegisterRequestNode create(
        BeamFnApi.RegisterRequest request,
        Map<String, NameContext> ptransformIdToPartialNameContextMap,
        Map<String, Iterable<SideInputInfo>> ptransformIdToSideInputInfoMap,
        Map<String, Iterable<PCollectionView<?>>> ptransformIdToPCollectionViewMap,
        Map<String, NameContext> pcollectionToPartialNameContextMap) {
      return new AutoValue_Nodes_RegisterRequestNode(
          checkNotNull(request),
          checkNotNull(ptransformIdToPartialNameContextMap),
          ptransformIdToSideInputInfoMap,
          ptransformIdToPCollectionViewMap,
          pcollectionToPartialNameContextMap);
    }

    /** Returns the wrapped register request. */
    public abstract BeamFnApi.RegisterRequest getRegisterRequest();

    /** Returns the map from PTransform id to partial name context. */
    public abstract Map<String, NameContext> getPTransformIdToPartialNameContextMap();

    /** Returns the map from PTransform id to side input infos. */
    public abstract Map<String, Iterable<SideInputInfo>> getPTransformIdToSideInputInfoMap();

    /** Returns the map from PTransform id to PCollection views. */
    public abstract Map<String, Iterable<PCollectionView<?>>> getPTransformIdToPCollectionViewMap();

    /** Returns the map from PCollection to partial name context. */
    public abstract Map<String, NameContext> getPCollectionToPartialNameContextMap();

    /** Returns a fixed label instead of dumping the node's contents. */
    @Override
    public String toString() {
      // Kept to a constant label on purpose: the request may be very large.
      return "RegisterRequestNode";
    }
  }

  /**
   * A node holding an {@link org.apache.beam.runners.core.construction.graph.ExecutableStage} plus
   * the lookup maps, keyed by PTransform id, that were supplied with it.
   */
  @AutoValue
  public abstract static class ExecutableStageNode extends Node {
    /**
     * Builds a node from the given stage and maps.
     *
     * <p>The stage and {@code ptransformIdToPartialNameContextMap} are explicitly null-checked
     * here; the remaining maps are handed to the generated constructor as is.
     */
    public static ExecutableStageNode create(
        ExecutableStage executableStage,
        Map<String, NameContext> ptransformIdToPartialNameContextMap,
        Map<String, Iterable<SideInputInfo>> ptransformIdToSideInputInfoMap,
        Map<String, Iterable<PCollectionView<?>>> pTransformIdToPCollectionViewMap) {
      return new AutoValue_Nodes_ExecutableStageNode(
          checkNotNull(executableStage),
          checkNotNull(ptransformIdToPartialNameContextMap),
          ptransformIdToSideInputInfoMap,
          pTransformIdToPCollectionViewMap);
    }

    /** Returns the wrapped executable stage. */
    public abstract ExecutableStage getExecutableStage();

    /** Returns the map from PTransform id to partial name context. */
    public abstract Map<String, NameContext> getPTransformIdToPartialNameContextMap();

    /** Returns the map from PTransform id to side input infos. */
    public abstract Map<String, Iterable<SideInputInfo>> getPTransformIdToSideInputInfoMap();

    /** Returns the map from PTransform id to PCollection views. */
    public abstract Map<String, Iterable<PCollectionView<?>>> getPTransformIdToPCollectionViewMap();

    /** Returns a fixed label instead of dumping the node's contents. */
    @Override
    public String toString() {
      // Kept to a constant label on purpose; presumably the stage, like a register request, may
      // be very large.
      return "ExecutableStageNode";
    }
  }

  /**
   * Graph node that fetches the side inputs which are ready and filters out the elements which are
   * blocked, after asking the SDK harness to perform any window mapping.
   *
   * <p>Intended for streaming pipelines only.
   */
  @AutoValue
  public abstract static class FetchAndFilterStreamingSideInputsNode extends Node {
    /** Returns the windowing strategy supplied at creation. */
    public abstract WindowingStrategy<?, ?> getWindowingStrategy();

    /** Returns the map from each PCollection view to its window mapping function spec. */
    public abstract Map<PCollectionView<?>, RunnerApi.SdkFunctionSpec>
        getPCollectionViewsToWindowMappingFns();

    /** Returns the name context supplied at creation. */
    public abstract NameContext getNameContext();

    /** Builds a node from the given windowing strategy, window mapping functions and name context. */
    public static FetchAndFilterStreamingSideInputsNode create(
        WindowingStrategy<?, ?> windowingStrategy,
        Map<PCollectionView<?>, RunnerApi.SdkFunctionSpec> pCollectionViewsToWindowMappingFns,
        NameContext nameContext) {
      return new AutoValue_Nodes_FetchAndFilterStreamingSideInputsNode(
          windowingStrategy, pCollectionViewsToWindowMappingFns, nameContext);
    }
  }

  // Private no-op constructor: replaces the implicit public one so that this container class
  // cannot be instantiated from outside.
  private Nodes() {}
}
