/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.transforms.reflect;

import static org.apache.beam.sdk.util.common.ReflectHelpers.findClassLoader;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.OnTimerMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.FinishBundleContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OnTimerContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.OutputReceiverParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.PaneInfoParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.ProcessContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SchemaElementParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.SideInputParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StartBundleContextParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TaggedOutputReceiverParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomainParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.ByteBuddy;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.field.FieldDescription;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.method.MethodDescription;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.modifier.Visibility;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.type.TypeDescription;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.type.TypeDescription.ForLoadedType;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.description.type.TypeList;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.DynamicType;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.scaffold.InstrumentedType;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.ExceptionMethod;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.FixedValue;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.Implementation;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.Implementation.Context;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.MethodDelegation;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.ByteCodeAppender;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.StackManipulation;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.Throw;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.Assigner;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.Assigner.Typing;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.assign.TypeCasting;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.constant.TextConstant;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.FieldAccess;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodReturn;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Label;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.MethodVisitor;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Opcodes;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.jar.asm.Type;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.matcher.ElementMatchers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Primitives;

/** Dynamically generates a {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {

  public static final String START_BUNDLE_CONTEXT_PARAMETER_METHOD = "startBundleContext";
  public static final String FINISH_BUNDLE_CONTEXT_PARAMETER_METHOD = "finishBundleContext";
  public static final String PROCESS_CONTEXT_PARAMETER_METHOD = "processContext";
  public static final String ELEMENT_PARAMETER_METHOD = "element";
  public static final String SCHEMA_ELEMENT_PARAMETER_METHOD = "schemaElement";
  public static final String TIMESTAMP_PARAMETER_METHOD = "timestamp";
  public static final String OUTPUT_ROW_RECEIVER_METHOD = "outputRowReceiver";
  public static final String TIME_DOMAIN_PARAMETER_METHOD = "timeDomain";
  public static final String OUTPUT_PARAMETER_METHOD = "outputReceiver";
  public static final String TAGGED_OUTPUT_PARAMETER_METHOD = "taggedOutputReceiver";
  public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext";
  public static final String WINDOW_PARAMETER_METHOD = "window";
  public static final String PANE_INFO_PARAMETER_METHOD = "paneInfo";
  public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions";
  public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker";
  public static final String STATE_PARAMETER_METHOD = "state";
  public static final String TIMER_PARAMETER_METHOD = "timer";
  public static final String SIDE_INPUT_PARAMETER_METHOD = "sideInput";

  /**
   * Returns a {@link ByteBuddyDoFnInvokerFactory} shared with all other invocations, so that its
   * cache of generated classes is global.
   */
  public static ByteBuddyDoFnInvokerFactory only() {
    return INSTANCE;
  }

  /**
   * Creates a {@link DoFnInvoker} for the given {@link DoFn} by generating bytecode that directly
   * invokes its methods with arguments extracted from the {@link DoFnInvoker.ArgumentProvider}.
   */
  @Override
  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(DoFn<InputT, OutputT> fn) {
    return newByteBuddyInvoker(fn);
  }

  private static final ByteBuddyDoFnInvokerFactory INSTANCE = new ByteBuddyDoFnInvokerFactory();

  private static final String FN_DELEGATE_FIELD_NAME = "delegate";

  /**
   * A cache of constructors of generated {@link DoFnInvoker} classes, keyed by {@link DoFn} class.
   * Needed because generating an invoker class is expensive, and to avoid generating an excessive
   * number of classes consuming PermGen memory.
   */
  private final Map<Class<?>, Constructor<?>> byteBuddyInvokerConstructorCache =
      new LinkedHashMap<>();

  private ByteBuddyDoFnInvokerFactory() {}

  /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
  @SuppressWarnings({"unchecked", "rawtypes"})
  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
      DoFn<InputT, OutputT> fn) {
    return newByteBuddyInvoker(DoFnSignatures.getSignature((Class) fn.getClass()), fn);
  }

  /**
   * Internal base class for generated {@link DoFnInvoker} instances.
   *
   * <p>This class should <i>not</i> be extended directly, or by Beam users. It must be public for
   * generated instances to have adequate access, as they are generated "inside" the invoked {@link
   * DoFn} class.
   */
  public abstract static class DoFnInvokerBase<InputT, OutputT, DoFnT extends DoFn<InputT, OutputT>>
      implements DoFnInvoker<InputT, OutputT> {
    protected DoFnT delegate;

    private Map<String, OnTimerInvoker> onTimerInvokers = new HashMap<>();

    public DoFnInvokerBase(DoFnT delegate) {
      this.delegate = delegate;
    }

    /**
     * Associates the given timer ID with the given {@link OnTimerInvoker}.
     *
     * <p>ByteBuddy does not like to generate conditional code, so we use a map + lookup of the
     * timer ID rather than a generated conditional branch to choose which OnTimerInvoker to invoke.
     *
     * <p>This method has package level access as it is intended only for assembly of the {@link
     * DoFnInvokerBase} not by any subclass.
     */
    void addOnTimerInvoker(String timerId, OnTimerInvoker onTimerInvoker) {
      this.onTimerInvokers.put(timerId, onTimerInvoker);
    }

    @Override
    public void invokeOnTimer(
        String timerId, DoFnInvoker.ArgumentProvider<InputT, OutputT> arguments) {
      @Nullable OnTimerInvoker onTimerInvoker = onTimerInvokers.get(timerId);

      if (onTimerInvoker != null) {
        onTimerInvoker.invokeOnTimer(arguments);
      } else {
        throw new IllegalArgumentException(
            String.format(
                "Attempted to invoke timer %s on %s, but that timer is not registered."
                    + " This is the responsibility of the runner, which must only deliver"
                    + " registered timers.",
                timerId, delegate.getClass().getName()));
      }
    }

    @Override
    public DoFn<InputT, OutputT> getFn() {
      return delegate;
    }
  }

  /** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
  public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
      DoFnSignature signature, DoFn<InputT, OutputT> fn) {
    checkArgument(
        signature.fnClass().equals(fn.getClass()),
        "Signature is for class %s, but fn is of class %s",
        signature.fnClass(),
        fn.getClass());

    try {
      @SuppressWarnings("unchecked")
      DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>> invoker =
          (DoFnInvokerBase<InputT, OutputT, DoFn<InputT, OutputT>>)
              getByteBuddyInvokerConstructor(signature).newInstance(fn);

      for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) {
        invoker.addOnTimerInvoker(
            onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id()));
      }

      return invoker;
    } catch (InstantiationException
        | IllegalAccessException
        | IllegalArgumentException
        | InvocationTargetException
        | SecurityException e) {
      throw new RuntimeException("Unable to bind invoker for " + fn.getClass(), e);
    }
  }

  /**
   * Returns a generated constructor for a {@link DoFnInvoker} for the given {@link DoFn} class.
   *
   * <p>These are cached such that at most one {@link DoFnInvoker} class exists for a given {@link
   * DoFn} class.
   */
  private synchronized Constructor<?> getByteBuddyInvokerConstructor(DoFnSignature signature) {
    Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
    Constructor<?> constructor = byteBuddyInvokerConstructorCache.get(fnClass);
    if (constructor == null) {
      Class<? extends DoFnInvoker<?, ?>> invokerClass = generateInvokerClass(signature);
      try {
        constructor = invokerClass.getConstructor(fnClass);
      } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
        throw new RuntimeException(e);
      }
      byteBuddyInvokerConstructorCache.put(fnClass, constructor);
    }
    return constructor;
  }

  /** Default implementation of {@link DoFn.SplitRestriction}, for delegation by bytebuddy. */
  public static class DefaultSplitRestriction {
    /** Doesn't split the restriction. */
    @SuppressWarnings("unused")
    public static <InputT, RestrictionT> void invokeSplitRestriction(
        InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
      receiver.output(restriction);
    }
  }

  /** Default implementation of {@link DoFn.GetRestrictionCoder}, for delegation by bytebuddy. */
  public static class DefaultRestrictionCoder {
    private final TypeDescriptor<?> restrictionType;

    DefaultRestrictionCoder(TypeDescriptor<?> restrictionType) {
      this.restrictionType = restrictionType;
    }

    /** Doesn't split the restriction. */
    @SuppressWarnings({"unused", "unchecked"})
    public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry registry)
        throws CannotProvideCoderException {
      return (Coder) registry.getCoder(restrictionType);
    }
  }

  /** Default implementation of {@link DoFn.NewTracker}, for delegation by bytebuddy. */
  public static class DefaultNewTracker {
    /** Uses {@link HasDefaultTracker} to produce the tracker. */
    @SuppressWarnings("unused")
    public static RestrictionTracker invokeNewTracker(Object restriction) {
      return ((HasDefaultTracker) restriction).newTracker();
    }
  }

  /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
  private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
    Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();

    final TypeDescription clazzDescription = new TypeDescription.ForLoadedType(fnClass);

    DynamicType.Builder<?> builder =
        new ByteBuddy()
            // Create subclasses inside the target class, to have access to
            // private and package-private bits
            .with(
                StableInvokerNamingStrategy.forDoFnClass(fnClass)
                    .withSuffix(DoFnInvoker.class.getSimpleName()))

            // class <invoker class> extends DoFnInvokerBase {
            .subclass(DoFnInvokerBase.class, ConstructorStrategy.Default.NO_CONSTRUCTORS)

            //   public <invoker class>(<fn class> delegate) { this.delegate = delegate; }
            .defineConstructor(Visibility.PUBLIC)
            .withParameter(fnClass)
            .intercept(new InvokerConstructor())

            //   public invokeProcessElement(ProcessContext, ExtraContextFactory) {
            //     delegate.<@ProcessElement>(... pass just the right args ...);
            //   }
            .method(ElementMatchers.named("invokeProcessElement"))
            .intercept(new ProcessElementDelegation(clazzDescription, signature.processElement()))

            //   public invokeStartBundle(Context c) { delegate.<@StartBundle>(c); }
            //   ... etc ...
            .method(ElementMatchers.named("invokeStartBundle"))
            .intercept(delegateOrNoop(clazzDescription, signature.startBundle()))
            .method(ElementMatchers.named("invokeFinishBundle"))
            .intercept(delegateOrNoop(clazzDescription, signature.finishBundle()))
            .method(ElementMatchers.named("invokeSetup"))
            .intercept(delegateOrNoop(clazzDescription, signature.setup()))
            .method(ElementMatchers.named("invokeTeardown"))
            .intercept(delegateOrNoop(clazzDescription, signature.teardown()))
            .method(ElementMatchers.named("invokeOnWindowExpiration"))
            .intercept(
                delegateMethodWithExtraParametersOrNoop(
                    clazzDescription, signature.onWindowExpiration()))
            .method(ElementMatchers.named("invokeGetInitialRestriction"))
            .intercept(
                delegateWithDowncastOrThrow(clazzDescription, signature.getInitialRestriction()))
            .method(ElementMatchers.named("invokeSplitRestriction"))
            .intercept(splitRestrictionDelegation(clazzDescription, signature))
            .method(ElementMatchers.named("invokeGetRestrictionCoder"))
            .intercept(getRestrictionCoderDelegation(clazzDescription, signature))
            .method(ElementMatchers.named("invokeNewTracker"))
            .intercept(newTrackerDelegation(clazzDescription, signature.newTracker()));

    DynamicType.Unloaded<?> unloaded = builder.make();

    @SuppressWarnings("unchecked")
    Class<? extends DoFnInvoker<?, ?>> res =
        (Class<? extends DoFnInvoker<?, ?>>)
            unloaded
                .load(
                    findClassLoader(fnClass.getClassLoader()),
                    ClassLoadingStrategy.Default.INJECTION)
                .getLoaded();
    return res;
  }

  private static Implementation getRestrictionCoderDelegation(
      TypeDescription doFnType, DoFnSignature signature) {
    if (signature.processElement().isSplittable()) {
      if (signature.getRestrictionCoder() == null) {
        return MethodDelegation.to(
            new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
      } else {
        return new DowncastingParametersMethodDelegation(
            doFnType, signature.getRestrictionCoder().targetMethod());
      }
    } else {
      return ExceptionMethod.throwing(UnsupportedOperationException.class);
    }
  }

  private static Implementation splitRestrictionDelegation(
      TypeDescription doFnType, DoFnSignature signature) {
    if (signature.splitRestriction() == null) {
      return MethodDelegation.to(DefaultSplitRestriction.class);
    } else {
      return new DowncastingParametersMethodDelegation(
          doFnType, signature.splitRestriction().targetMethod());
    }
  }

  private static Implementation newTrackerDelegation(
      TypeDescription doFnType, @Nullable DoFnSignature.NewTrackerMethod signature) {
    if (signature == null) {
      // We must have already verified that in this case the restriction type
      // is a subtype of HasDefaultTracker.
      return MethodDelegation.to(DefaultNewTracker.class);
    } else {
      return delegateWithDowncastOrThrow(doFnType, signature);
    }
  }

  /** Delegates to the given method if available, or does nothing. */
  private static Implementation delegateOrNoop(
      TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
    return (method == null)
        ? FixedValue.originType()
        : new DoFnMethodDelegation(doFnType, method.targetMethod());
  }

  /** Delegates method with extra parameters to the given method if available, or does nothing. */
  private static Implementation delegateMethodWithExtraParametersOrNoop(
      TypeDescription doFnType, DoFnSignature.MethodWithExtraParameters method) {
    return (method == null)
        ? FixedValue.originType()
        : new DoFnMethodWithExtraParametersDelegation(doFnType, method);
  }

  /** Delegates to the given method if available, or throws UnsupportedOperationException. */
  private static Implementation delegateWithDowncastOrThrow(
      TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
    return (method == null)
        ? ExceptionMethod.throwing(UnsupportedOperationException.class)
        : new DowncastingParametersMethodDelegation(doFnType, method.targetMethod());
  }

  /**
   * Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a
   * "target method" of the wrapped {@link DoFn}.
   */
  static class DoFnMethodDelegation implements Implementation {
    /** The {@link MethodDescription} of the wrapped {@link DoFn}'s method. */
    protected final MethodDescription targetMethod;
    /** Whether the target method returns non-void. */
    private final boolean targetHasReturn;

    /** Starts {@code null}, initialized by {@link #prepare(InstrumentedType)}. */
    @Nullable protected FieldDescription delegateField;

    private final TypeDescription doFnType;

    public DoFnMethodDelegation(TypeDescription doFnType, Method targetMethod) {
      this.doFnType = doFnType;
      this.targetMethod = new MethodDescription.ForLoadedMethod(targetMethod);
      this.targetHasReturn =
          !TypeDescription.VOID.equals(this.targetMethod.getReturnType().asErasure());
    }

    @Override
    public InstrumentedType prepare(InstrumentedType instrumentedType) {
      // Remember the field description of the instrumented type.
      delegateField =
          instrumentedType
              .getSuperClass() // always DoFnInvokerBase
              .getDeclaredFields()
              .filter(ElementMatchers.named(FN_DELEGATE_FIELD_NAME))
              .getOnly();
      // Delegating the method call doesn't require any changes to the instrumented type.
      return instrumentedType;
    }

    @Override
    public ByteCodeAppender appender(final Target implementationTarget) {
      return new ByteCodeAppender() {
        /**
         * @param instrumentedMethod The {@link DoFnInvoker} method for which we're generating code.
         */
        @Override
        public Size apply(
            MethodVisitor methodVisitor,
            Context implementationContext,
            MethodDescription instrumentedMethod) {
          // Figure out how many locals we'll need. This corresponds to "this", the parameters
          // of the instrumented method, and an argument to hold the return value if the target
          // method has a return value.
          int numLocals = 1 + instrumentedMethod.getParameters().size() + (targetHasReturn ? 1 : 0);

          Integer returnVarIndex = null;
          if (targetHasReturn) {
            // Local comes after formal parameters, so figure out where that is.
            returnVarIndex = 1; // "this"
            for (Type param : Type.getArgumentTypes(instrumentedMethod.getDescriptor())) {
              returnVarIndex += param.getSize();
            }
          }

          StackManipulation manipulation =
              new StackManipulation.Compound(
                  // Push "this" (DoFnInvoker on top of the stack)
                  MethodVariableAccess.REFERENCE.loadFrom(0),
                  // Access this.delegate (DoFn on top of the stack)
                  FieldAccess.forField(delegateField).read(),
                  // Cast it to the more precise type
                  TypeCasting.to(doFnType),
                  // Run the beforeDelegation manipulations.
                  // The arguments necessary to invoke the target are on top of the stack.
                  beforeDelegation(instrumentedMethod),
                  // Perform the method delegation.
                  // This will consume the arguments on top of the stack
                  // Either the stack is now empty (because the targetMethod returns void) or the
                  // stack contains the return value.
                  new UserCodeMethodInvocation(returnVarIndex, targetMethod, instrumentedMethod),
                  // Run the afterDelegation manipulations.
                  // Either the stack is now empty (because the instrumentedMethod returns void)
                  // or the stack contains the return value.
                  afterDelegation(instrumentedMethod));

          StackManipulation.Size size = manipulation.apply(methodVisitor, implementationContext);
          return new Size(size.getMaximalSize(), numLocals);
        }
      };
    }

    /**
     * Return the code to the prepare the operand stack for the method delegation.
     *
     * <p>Before this method is called, the stack delegate will be the only thing on the stack.
     *
     * <p>After this method is called, the stack contents should contain exactly the arguments
     * necessary to invoke the target method.
     */
    protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
      return MethodVariableAccess.allArgumentsOf(targetMethod);
    }

    /**
     * Return the code to execute after the method delegation.
     *
     * <p>Before this method is called, the stack will either be empty (if the target method returns
     * void) or contain the method return value.
     *
     * <p>After this method is called, the stack should either be empty (if the instrumented method
     * returns void) or contain the value for the instrumented method to return).
     */
    protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
      return new StackManipulation.Compound(
          Assigner.DEFAULT.assign(
              targetMethod.getReturnType(), instrumentedMethod.getReturnType(), Typing.STATIC),
          MethodReturn.of(instrumentedMethod.getReturnType()));
    }
  }

  /**
   * Implements {@link DoFnInvoker}'s method with extra parameters by delegating to a "target
   * method" of the wrapped {@link DoFn}.
   */
  static class DoFnMethodWithExtraParametersDelegation extends DoFnMethodDelegation {
    private final DoFnSignature.MethodWithExtraParameters signature;

    public DoFnMethodWithExtraParametersDelegation(
        TypeDescription clazzDescription, DoFnSignature.MethodWithExtraParameters signature) {
      super(clazzDescription, signature.targetMethod());
      this.signature = signature;
    }

    @Override
    protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
      // Parameters of the wrapper invoker method:
      //   DoFn.ArgumentProvider
      // Parameters of the wrapped DoFn method:
      //   a dynamic set of allowed "extra" parameters in any order subject to
      //   validation prior to getting the DoFnSignature
      ArrayList<StackManipulation> parameters = new ArrayList<>();

      // To load the delegate, push `this` and then access the field
      StackManipulation pushDelegate =
          new StackManipulation.Compound(
              MethodVariableAccess.REFERENCE.loadFrom(0),
              FieldAccess.forField(delegateField).read());

      StackManipulation pushExtraContextFactory = MethodVariableAccess.REFERENCE.loadFrom(1);

      // Push the extra arguments in their actual order.
      for (DoFnSignature.Parameter param : signature.extraParameters()) {
        parameters.add(
            new StackManipulation.Compound(
                pushExtraContextFactory,
                ByteBuddyDoFnInvokerFactory.getExtraContextParameter(param, pushDelegate)));
      }
      return new StackManipulation.Compound(parameters);
    }
  }

  /**
   * Passes parameters to the delegated method by downcasting each parameter of non-primitive type
   * to its expected type.
   */
  private static class DowncastingParametersMethodDelegation extends DoFnMethodDelegation {
    DowncastingParametersMethodDelegation(TypeDescription doFnType, Method method) {
      super(doFnType, method);
    }

    @Override
    protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
      List<StackManipulation> pushParameters = new ArrayList<>();
      TypeList.Generic paramTypes = targetMethod.getParameters().asTypeList();
      for (int i = 0; i < paramTypes.size(); i++) {
        TypeDescription.Generic paramT = paramTypes.get(i);
        pushParameters.add(MethodVariableAccess.of(paramT).loadFrom(i + 1));
        if (!paramT.isPrimitive()) {
          pushParameters.add(TypeCasting.to(paramT));
        }
      }
      return new StackManipulation.Compound(pushParameters);
    }
  }

  /**
   * This wrapper exists to convert checked exceptions to unchecked exceptions, since if this fails
   * the library itself is malformed.
   */
  private static MethodDescription getExtraContextFactoryMethodDescription(
      String methodName, Class<?>... parameterTypes) {
    try {
      return new MethodDescription.ForLoadedMethod(
          DoFnInvoker.ArgumentProvider.class.getMethod(methodName, parameterTypes));
    } catch (Exception e) {
      throw new IllegalStateException(
          String.format(
              "Failed to locate required method %s.%s",
              DoFnInvoker.ArgumentProvider.class.getSimpleName(), methodName),
          e);
    }
  }

  /**
   * Calls a zero-parameter getter on the {@link DoFnInvoker.ArgumentProvider}, which must be on top
   * of the stack.
   */
  private static StackManipulation simpleExtraContextParameter(String methodName) {
    return new StackManipulation.Compound(
        MethodInvocation.invoke(getExtraContextFactoryMethodDescription(methodName)));
  }

  static StackManipulation getExtraContextParameter(
      DoFnSignature.Parameter parameter, final StackManipulation pushDelegate) {

    return parameter.match(
        new Cases<StackManipulation>() {

          @Override
          public StackManipulation dispatch(StartBundleContextParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        START_BUNDLE_CONTEXT_PARAMETER_METHOD, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(FinishBundleContextParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        FINISH_BUNDLE_CONTEXT_PARAMETER_METHOD, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(ProcessContextParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        PROCESS_CONTEXT_PARAMETER_METHOD, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(ElementParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(ELEMENT_PARAMETER_METHOD, DoFn.class)),
                TypeCasting.to(new TypeDescription.ForLoadedType(p.elementT().getRawType())));
          }

          @Override
          public StackManipulation dispatch(SchemaElementParameter p) {
            ForLoadedType elementType = new ForLoadedType(p.elementT().getRawType());
            ForLoadedType castType =
                elementType.isPrimitive()
                    ? new ForLoadedType(Primitives.wrap(p.elementT().getRawType()))
                    : elementType;

            StackManipulation stackManipulation =
                new StackManipulation.Compound(
                    IntegerConstant.forValue(p.index()),
                    MethodInvocation.invoke(
                        getExtraContextFactoryMethodDescription(
                            SCHEMA_ELEMENT_PARAMETER_METHOD, int.class)),
                    TypeCasting.to(castType));
            if (elementType.isPrimitive()) {
              stackManipulation =
                  new Compound(
                      stackManipulation,
                      Assigner.DEFAULT.assign(
                          elementType.asBoxed().asGenericType(),
                          elementType.asUnboxed().asGenericType(),
                          Typing.STATIC));
            }
            return stackManipulation;
          }

          @Override
          public StackManipulation dispatch(TimestampParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        TIMESTAMP_PARAMETER_METHOD, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(TimeDomainParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        TIME_DOMAIN_PARAMETER_METHOD, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(OutputReceiverParameter p) {
            String method =
                p.isRowReceiver() ? OUTPUT_ROW_RECEIVER_METHOD : OUTPUT_PARAMETER_METHOD;
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(method, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(TaggedOutputReceiverParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        TAGGED_OUTPUT_PARAMETER_METHOD, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(OnTimerContextParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        ON_TIMER_CONTEXT_PARAMETER_METHOD, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(WindowParameter p) {
            return new StackManipulation.Compound(
                simpleExtraContextParameter(WINDOW_PARAMETER_METHOD),
                TypeCasting.to(new TypeDescription.ForLoadedType(p.windowT().getRawType())));
          }

          @Override
          public StackManipulation dispatch(PaneInfoParameter p) {
            return new StackManipulation.Compound(
                pushDelegate,
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        PANE_INFO_PARAMETER_METHOD, DoFn.class)));
          }

          @Override
          public StackManipulation dispatch(RestrictionTrackerParameter p) {
            // DoFnInvoker.ArgumentProvider.restrictionTracker() returns a RestrictionTracker,
            // but the @ProcessElement method expects a concrete subtype of it.
            // Insert a downcast.
            return new StackManipulation.Compound(
                simpleExtraContextParameter(RESTRICTION_TRACKER_PARAMETER_METHOD),
                TypeCasting.to(new TypeDescription.ForLoadedType(p.trackerT().getRawType())));
          }

          @Override
          public StackManipulation dispatch(StateParameter p) {
            return new StackManipulation.Compound(
                new TextConstant(p.referent().id()),
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(STATE_PARAMETER_METHOD, String.class)),
                TypeCasting.to(
                    new TypeDescription.ForLoadedType(p.referent().stateType().getRawType())));
          }

          @Override
          public StackManipulation dispatch(TimerParameter p) {
            return new StackManipulation.Compound(
                new TextConstant(p.referent().id()),
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(TIMER_PARAMETER_METHOD, String.class)),
                TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class)));
          }

          @Override
          public StackManipulation dispatch(DoFnSignature.Parameter.PipelineOptionsParameter p) {
            return simpleExtraContextParameter(PIPELINE_OPTIONS_PARAMETER_METHOD);
          }

          @Override
          public StackManipulation dispatch(SideInputParameter p) {
            return new StackManipulation.Compound(
                new TextConstant(p.sideInputId()),
                MethodInvocation.invoke(
                    getExtraContextFactoryMethodDescription(
                        SIDE_INPUT_PARAMETER_METHOD, String.class)),
                TypeCasting.to(new TypeDescription.ForLoadedType(p.elementT().getRawType())));
          }
        });
  }

  /**
   * Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
   * {@link ProcessElement} method.
   */
  private static final class ProcessElementDelegation
      extends DoFnMethodWithExtraParametersDelegation {
    private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;

    static {
      try {
        PROCESS_CONTINUATION_STOP_METHOD =
            new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
      } catch (NoSuchMethodException e) {
        throw new RuntimeException("Failed to locate ProcessContinuation.stop()");
      }
    }

    private final DoFnSignature.ProcessElementMethod signature;

    /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
    private ProcessElementDelegation(
        TypeDescription doFnType, DoFnSignature.ProcessElementMethod signature) {
      super(doFnType, signature);
      this.signature = signature;
    }

    @Override
    protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
      if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
        return new StackManipulation.Compound(
            MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
      } else {
        return MethodReturn.of(targetMethod.getReturnType().asErasure());
      }
    }
  }

  private static class UserCodeMethodInvocation implements StackManipulation {

    @Nullable private final Integer returnVarIndex;
    private final MethodDescription targetMethod;
    private final MethodDescription instrumentedMethod;
    private final TypeDescription returnType;

    private final Label wrapStart = new Label();
    private final Label wrapEnd = new Label();
    private final Label tryBlockStart = new Label();
    private final Label tryBlockEnd = new Label();
    private final Label catchBlockStart = new Label();
    private final Label catchBlockEnd = new Label();

    private final MethodDescription createUserCodeException;

    UserCodeMethodInvocation(
        @Nullable Integer returnVarIndex,
        MethodDescription targetMethod,
        MethodDescription instrumentedMethod) {
      this.returnVarIndex = returnVarIndex;
      this.targetMethod = targetMethod;
      this.instrumentedMethod = instrumentedMethod;
      this.returnType = targetMethod.getReturnType().asErasure();

      boolean targetMethodReturnsVoid = TypeDescription.VOID.equals(returnType);
      checkArgument(
          (returnVarIndex == null) == targetMethodReturnsVoid,
          "returnVarIndex should be defined if and only if the target method has a return value");

      try {
        createUserCodeException =
            new MethodDescription.ForLoadedMethod(
                UserCodeException.class.getDeclaredMethod("wrap", Throwable.class));
      } catch (NoSuchMethodException | SecurityException e) {
        throw new RuntimeException("Unable to find UserCodeException.wrap", e);
      }
    }

    @Override
    public boolean isValid() {
      return true;
    }

    private Object describeType(Type type) {
      switch (type.getSort()) {
        case Type.OBJECT:
          return type.getInternalName();
        case Type.INT:
        case Type.BYTE:
        case Type.BOOLEAN:
        case Type.SHORT:
          return Opcodes.INTEGER;
        case Type.LONG:
          return Opcodes.LONG;
        case Type.DOUBLE:
          return Opcodes.DOUBLE;
        case Type.FLOAT:
          return Opcodes.FLOAT;
        default:
          throw new IllegalArgumentException("Unhandled type as method argument: " + type);
      }
    }

    private void visitFrame(
        MethodVisitor mv, boolean localsIncludeReturn, @Nullable String stackTop) {
      boolean hasReturnLocal = (returnVarIndex != null) && localsIncludeReturn;

      Type[] localTypes = Type.getArgumentTypes(instrumentedMethod.getDescriptor());
      Object[] locals = new Object[1 + localTypes.length + (hasReturnLocal ? 1 : 0)];
      locals[0] = instrumentedMethod.getReceiverType().asErasure().getInternalName();
      for (int i = 0; i < localTypes.length; i++) {
        locals[i + 1] = describeType(localTypes[i]);
      }
      if (hasReturnLocal) {
        locals[locals.length - 1] = returnType.getInternalName();
      }

      Object[] stack = stackTop == null ? new Object[] {} : new Object[] {stackTop};

      mv.visitFrame(Opcodes.F_NEW, locals.length, locals, stack.length, stack);
    }

    @Override
    public Size apply(MethodVisitor mv, Context context) {
      Size size = new Size(0, 0);

      mv.visitLabel(wrapStart);

      String throwableName = new TypeDescription.ForLoadedType(Throwable.class).getInternalName();
      mv.visitTryCatchBlock(tryBlockStart, tryBlockEnd, catchBlockStart, throwableName);

      // The try block attempts to perform the expected operations, then jumps to success
      mv.visitLabel(tryBlockStart);
      size = size.aggregate(MethodInvocation.invoke(targetMethod).apply(mv, context));

      if (returnVarIndex != null) {
        mv.visitVarInsn(Opcodes.ASTORE, returnVarIndex);
        size = size.aggregate(new Size(-1, 0)); // Reduces the size of the stack
      }
      mv.visitJumpInsn(Opcodes.GOTO, catchBlockEnd);
      mv.visitLabel(tryBlockEnd);

      // The handler wraps the exception, and then throws.
      mv.visitLabel(catchBlockStart);
      // In catch block, should have same locals and {Throwable} on the stack.
      visitFrame(mv, false, throwableName);

      // Create the user code exception and throw
      size =
          size.aggregate(
              new Compound(MethodInvocation.invoke(createUserCodeException), Throw.INSTANCE)
                  .apply(mv, context));

      mv.visitLabel(catchBlockEnd);

      // After the catch block we should have the return in scope, but nothing on the stack.
      visitFrame(mv, true, null);

      // After catch block, should have same locals and will have the return on the stack.
      if (returnVarIndex != null) {
        mv.visitVarInsn(Opcodes.ALOAD, returnVarIndex);
        size = size.aggregate(new Size(1, 0)); // Increases the size of the stack
      }
      mv.visitLabel(wrapEnd);
      if (returnVarIndex != null) {
        // Drop the return type from the locals
        mv.visitLocalVariable(
            "res",
            returnType.getDescriptor(),
            returnType.getGenericSignature(),
            wrapStart,
            wrapEnd,
            returnVarIndex);
      }

      return size;
    }
  }

  /**
   * A constructor {@link Implementation} for a {@link DoFnInvoker class}. Produces the byte code
   * for a constructor that takes a single argument and assigns it to the delegate field.
   */
  private static final class InvokerConstructor implements Implementation {
    @Override
    public InstrumentedType prepare(InstrumentedType instrumentedType) {
      return instrumentedType;
    }

    @Override
    public ByteCodeAppender appender(final Target implementationTarget) {
      return (methodVisitor, implementationContext, instrumentedMethod) -> {
        StackManipulation.Size size =
            new StackManipulation.Compound(
                    // Load the this reference
                    MethodVariableAccess.REFERENCE.loadFrom(0),
                    // Load the delegate argument
                    MethodVariableAccess.REFERENCE.loadFrom(1),
                    // Invoke the super constructor (default constructor of Object)
                    MethodInvocation.invoke(
                        new TypeDescription.ForLoadedType(DoFnInvokerBase.class)
                            .getDeclaredMethods()
                            .filter(
                                ElementMatchers.isConstructor()
                                    .and(ElementMatchers.takesArguments(DoFn.class)))
                            .getOnly()),
                    // Return void.
                    MethodReturn.VOID)
                .apply(methodVisitor, implementationContext);
        return new ByteCodeAppender.Size(size.getMaximalSize(), instrumentedMethod.getStackSize());
      };
    }
  }
}
