/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.dataflow.worker;

import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Factory to create SdkHarnessRegistry */
public class SdkHarnessRegistries {

  /** Create a registry which does not require sdkHarness registration for non fnapi worker. */
  public static SdkHarnessRegistry emptySdkHarnessRegistry() {
    return new EmptySdkHarnessRegistry();
  }

  /** Create a registry for fnapi worker. */
  public static SdkHarnessRegistry createFnApiSdkHarnessRegistry(
      ApiServiceDescriptor stateApiServiceDescriptor,
      GrpcStateService beamFnStateService,
      BeamFnDataGrpcService beamFnDataGrpcService) {
    return new WorkBalancingSdkHarnessRegistry(
        stateApiServiceDescriptor, beamFnStateService, beamFnDataGrpcService);
  }

  /** Registry used to manage all the connections (Control, Data, State) from SdkHarness */
  public static class WorkBalancingSdkHarnessRegistry implements SdkHarnessRegistry {
    private static final Logger LOG =
        LoggerFactory.getLogger(WorkBalancingSdkHarnessRegistry.class);
    private final ApiServiceDescriptor stateApiServiceDescriptor;
    private final GrpcStateService beamFnStateService;
    private final BeamFnDataGrpcService beamFnDataGrpcService;
    private final ConcurrentHashMap<FnApiControlClient, WorkCountingSdkWorkerHarness> workerMap =
        new ConcurrentHashMap<>();

    private final PriorityBlockingQueue<WorkCountingSdkWorkerHarness> workers =
        new PriorityBlockingQueue<>(
            1,
            /* Prioritize the worker with least work */
            Comparator.comparingInt(o -> o.assignedWorkCount.get()));

    /** Create a registry for fnapi worker. */
    private WorkBalancingSdkHarnessRegistry(
        ApiServiceDescriptor stateApiServiceDescriptor,
        GrpcStateService beamFnStateService,
        BeamFnDataGrpcService beamFnDataGrpcService) {
      Preconditions.checkNotNull(beamFnStateService, "StateService can not be null.");
      Preconditions.checkNotNull(beamFnDataGrpcService, "DataService can not be null.");
      this.stateApiServiceDescriptor = stateApiServiceDescriptor;
      this.beamFnStateService = beamFnStateService;
      this.beamFnDataGrpcService = beamFnDataGrpcService;
    }

    private boolean validateAndCleanWorker(WorkCountingSdkWorkerHarness worker) {
      // Recheck and remove worker as it could have been closed by the unregister.
      if (worker.closed.get()) {
        workers.remove(worker);
        return false;
      }
      return true;
    }

    @Override
    public void registerWorkerClient(FnApiControlClient controlClient) {
      Preconditions.checkNotNull(controlClient, "Control client can not be null.");
      WorkCountingSdkWorkerHarness sdkWorkerHarness =
          new WorkCountingSdkWorkerHarness(controlClient);
      workerMap.put(controlClient, sdkWorkerHarness);
      workers.add(sdkWorkerHarness);
      LOG.info("Registered Control client {}", sdkWorkerHarness.getWorkerId());
    }

    @Override
    public void unregisterWorkerClient(FnApiControlClient controlClient) {
      // Find the worker, mark the worker closed and remove it.
      WorkCountingSdkWorkerHarness worker = workerMap.remove(controlClient);

      if (worker != null) {
        worker.closed.set(true);
        workers.remove(worker);
      }
      LOG.info("Unregistered Control client {}", worker != null ? worker.getWorkerId() : null);
    }

    /* Any modification to workers has race condition with unregisterWorkerClient. To resolve this
    we recheck worker state after picking a worker and clean the worker if needed.*/
    @Override
    public WorkCountingSdkWorkerHarness getAvailableWorkerAndAssignWork() {
      // Pick the client with least pending workItems.
      try {
        // Remove and re add the worker after incrementing the assignedWorkCount.
        // If the worker is closed then remove the worker and try to get next available worker.
        WorkCountingSdkWorkerHarness worker;
        do {
          worker = workers.take();
          worker.assignedWorkCount.incrementAndGet();
          // Put back the worker in the queue.
          workers.add(worker);
        } while (!validateAndCleanWorker(worker));
        return worker;
      } catch (InterruptedException e) {
        LOG.error("Interrupted while waiting to get an available worker.");
        return null;
      }
    }

    @Override
    public void completeWork(SdkWorkerHarness worker) {
      // Remove worker -> decrement the workCount -> put the worker back -> Remove the worker if
      // worker is closed.
      if (!(worker instanceof WorkCountingSdkWorkerHarness)) {
        throw new IllegalArgumentException(
            String.format(
                "Worker should be of type %s. Found worker type %s",
                WorkCountingSdkWorkerHarness.class, worker.getClass()));
      }
      WorkCountingSdkWorkerHarness actualWorker = (WorkCountingSdkWorkerHarness) worker;
      if (workers.remove(actualWorker)) {
        actualWorker.assignedWorkCount.decrementAndGet();
        workers.add(actualWorker);
        // Recheck and remove worker as it could have been closed by the unregister.
        validateAndCleanWorker(actualWorker);
      }
    }

    @Override
    @Nullable
    public ApiServiceDescriptor beamFnStateApiServiceDescriptor() {
      return stateApiServiceDescriptor;
    }

    @Override
    @Nullable
    public ApiServiceDescriptor beamFnDataApiServiceDescriptor() {
      return beamFnDataGrpcService.getApiServiceDescriptor();
    }

    /** Class to keep client and associated data */
    public class WorkCountingSdkWorkerHarness implements SdkWorkerHarness {
      private final FnApiControlClient controlClientHandler;
      private final AtomicInteger assignedWorkCount;
      private final AtomicBoolean closed;

      private WorkCountingSdkWorkerHarness(FnApiControlClient controlClientHandler) {
        this.controlClientHandler = controlClientHandler;
        this.assignedWorkCount = new AtomicInteger(0);
        this.closed = new AtomicBoolean(false);
      }

      @Override
      @Nullable
      public FnApiControlClient getControlClientHandler() {
        return controlClientHandler;
      }

      @Override
      @Nullable
      public String getWorkerId() {
        return controlClientHandler.getWorkerId();
      }

      @Override
      @Nullable
      public GrpcFnServer<GrpcDataService> getGrpcDataFnServer() {
        return GrpcFnServer.create(
            beamFnDataGrpcService.getDataService(getWorkerId()), beamFnDataApiServiceDescriptor());
      }

      @Override
      @Nullable
      public GrpcFnServer<GrpcStateService> getGrpcStateFnServer() {
        return GrpcFnServer.create(beamFnStateService, beamFnDataApiServiceDescriptor());
      }
    }
  }

  /**
   * SdkHarnessRegistry which does not maintain any state and does not support {@link
   * EmptySdkHarnessRegistry#registerWorkerClient(FnApiControlClient)} and {@link
   * EmptySdkHarnessRegistry#unregisterWorkerClient(FnApiControlClient)}.
   *
   * <p>EmptySdkHarnessRegistry should be removed when we untangle the fnapi and non fnapi code from
   * {@link StreamingDataflowWorker}, {@link BatchDataflowWorker} and {@link DataflowRunnerHarness}.
   * The current implementation has to return null at multiple places in order to support the
   * tangled code for fnapi and non fnapi.
   */
  @Deprecated
  public static class EmptySdkHarnessRegistry implements SdkHarnessRegistry {

    private final SdkWorkerHarness sdkWorkerHarness =
        new SdkWorkerHarness() {
          @Nullable
          @Override
          public FnApiControlClient getControlClientHandler() {
            return null;
          }

          @Nullable
          @Override
          public String getWorkerId() {
            return null;
          }

          @Nullable
          @Override
          public GrpcFnServer<GrpcDataService> getGrpcDataFnServer() {
            return null;
          }

          @Nullable
          @Override
          public GrpcFnServer<GrpcStateService> getGrpcStateFnServer() {
            return null;
          }
        };

    @Override
    public void registerWorkerClient(@Nullable FnApiControlClient controlClient) {
      throw new UnsupportedOperationException(
          "EmptySdkHarnessRegistry does not support this operation");
    }

    @Override
    public void unregisterWorkerClient(FnApiControlClient controlClient) {
      throw new UnsupportedOperationException(
          "EmptySdkHarnessRegistry does not support this operation");
    }

    @Override
    public SdkWorkerHarness getAvailableWorkerAndAssignWork() {
      return sdkWorkerHarness;
    }

    @Override
    public void completeWork(SdkWorkerHarness worker) {}

    @Nullable
    @Override
    public ApiServiceDescriptor beamFnStateApiServiceDescriptor() {
      return null;
    }

    @Nullable
    @Override
    public ApiServiceDescriptor beamFnDataApiServiceDescriptor() {
      return null;
    }
  }
}
