/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.fnexecution.wire;

import java.util.Map.Entry;
import java.util.function.Predicate;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;

/** Utilities for replacing or wrapping unknown coders with {@link LengthPrefixCoder}. */
public class LengthPrefixUnknownCoders {
  /**
   * Recursively traverses the coder tree and wraps the first unknown coder in every branch with a
   * {@link LengthPrefixCoder} unless an ancestor coder is itself a {@link LengthPrefixCoder}. If
   * {@code replaceWithByteArrayCoder} is set, then replaces that unknown coder with a {@link
   * ByteArrayCoder}. Registers the new coders in the given {@link Components.Builder}. Note that no
   * ids that are generated will collide with the ids supplied within the {@link
   * Components#getCodersMap() coder map} key space.
   *
   * @param coderId The root coder contained within {@code coders} to start the recursive descent
   *     from.
   * @param components Components builder that initially contains the root coder and all component
   *     coders, and will be modified to contain all the necessary additional coders (including the
   *     resulting coder).
   * @param replaceWithByteArrayCoder whether to replace an unknown coder with a {@link
   *     ByteArrayCoder}.
   * @return Id of the original coder (if unchanged) or the newly generated length-prefixed coder.
   */
  public static String addLengthPrefixedCoder(
      String coderId, RunnerApi.Components.Builder components, boolean replaceWithByteArrayCoder) {
    String lengthPrefixedByteArrayCoderId = addLengthPrefixByteArrayCoder(components);
    String urn = components.getCodersOrThrow(coderId).getSpec().getSpec().getUrn();

    // We handle three cases:
    //  1) the requested coder is already a length prefix coder. In this case we just honor the
    //     request to replace the coder with a byte array coder.
    //  2) the requested coder is a known coder but not a length prefix coder. In this case we
    //     rebuild the coder by recursively length prefixing any unknown component coders.
    //  3) the requested coder is an unknown coder. In this case we either wrap the requested coder
    //     with a length prefix coder or replace it with a length prefix byte array coder.
    if (ModelCoders.LENGTH_PREFIX_CODER_URN.equals(urn)) {
      return replaceWithByteArrayCoder ? lengthPrefixedByteArrayCoderId : coderId;
    } else if (ModelCoders.urns().contains(urn)) {
      return addForModelCoder(coderId, components, replaceWithByteArrayCoder);
    } else {
      return replaceWithByteArrayCoder
          ? lengthPrefixedByteArrayCoderId
          : addWrappedWithLengthPrefixCoder(coderId, components);
    }
  }

  private static String addForModelCoder(
      String coderId, RunnerApi.Components.Builder components, boolean replaceWithByteArrayCoder) {
    Coder coder = components.getCodersOrThrow(coderId);
    RunnerApi.Coder.Builder builder = coder.toBuilder().clearComponentCoderIds();
    for (String componentCoderId : coder.getComponentCoderIdsList()) {
      builder.addComponentCoderIds(
          addLengthPrefixedCoder(componentCoderId, components, replaceWithByteArrayCoder));
    }
    return addCoder(builder.build(), components, coderId + "-length_prefix");
  }

  // If we are handling an unknown URN then we need to wrap it with a length prefix coder.
  // If requested we also replace the unknown coder with a byte array coder.
  private static String addWrappedWithLengthPrefixCoder(
      String coderId, RunnerApi.Components.Builder components) {
    Coder.Builder lengthPrefixed = Coder.newBuilder().addComponentCoderIds(coderId);
    lengthPrefixed
        .getSpecBuilder()
        .getSpecBuilder()
        .setUrn(ModelCoders.LENGTH_PREFIX_CODER_URN)
        .build();
    return addCoder(lengthPrefixed.build(), components, coderId + "-length_prefix");
  }

  /** Adds the (singleton) length-prefixed byte array coder. */
  private static String addLengthPrefixByteArrayCoder(RunnerApi.Components.Builder components) {
    // Add byte array coder
    Coder.Builder byteArrayCoder = Coder.newBuilder();
    byteArrayCoder.getSpecBuilder().getSpecBuilder().setUrn(ModelCoders.BYTES_CODER_URN);
    String byteArrayCoderId = addCoder(byteArrayCoder.build(), components, "byte_array");

    // Wrap it into length-prefixed coder
    Coder.Builder lengthPrefixByteArrayCoder = Coder.newBuilder();
    lengthPrefixByteArrayCoder
        .addComponentCoderIds(byteArrayCoderId)
        .getSpecBuilder()
        .getSpecBuilder()
        .setUrn(ModelCoders.LENGTH_PREFIX_CODER_URN);

    return addCoder(lengthPrefixByteArrayCoder.build(), components, "length_prefix_byte_array");
  }

  private static String addCoder(
      RunnerApi.Coder coder, RunnerApi.Components.Builder components, String uniqueIdPrefix) {
    for (Entry<String, Coder> entry : components.getCodersMap().entrySet()) {
      if (entry.getValue().equals(coder)) {
        return entry.getKey();
      }
    }
    String id = generateUniqueId(uniqueIdPrefix, components::containsCoders);
    components.putCoders(id, coder);
    return id;
  }

  /** Generates a unique id given a prefix and the set of existing ids. */
  static String generateUniqueId(String prefix, Predicate<String> isExistingId) {
    int i = 0;
    while (isExistingId.test(prefix + i)) {
      i += 1;
    }
    return prefix + i;
  }
}
