/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.internal.cache.execute;

import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.Logger;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.internal.ProxyCache;
import org.apache.geode.cache.client.internal.ServerRegionProxy;
import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXStateProxyImpl;
import org.apache.geode.internal.cache.execute.metrics.FunctionStats;
import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager;
import org.apache.geode.internal.cache.execute.util.SynchronizedResultCollector;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
 * Executes Function with FunctionService#onRegion(Region region) in client server mode.
 *
 * @see FunctionService#onRegion(Region) *
 * @since GemFire 5.8 LA
 */
public class ServerRegionFunctionExecutor extends AbstractExecution {
  private static final Logger logger = LogService.getLogger();

  private final LocalRegion region;
  private boolean executeOnBucketSet = false;

  ServerRegionFunctionExecutor(Region r, ProxyCache proxyCache) {
    if (r == null) {
      throw new IllegalArgumentException(
          String.format("The input %s for the execute function request is null",
              "Region"));
    }
    region = (LocalRegion) r;
    this.proxyCache = proxyCache;
  }

  private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
      Object args) {
    super(serverRegionFunctionExecutor);

    region = serverRegionFunctionExecutor.region;
    filter.clear();
    filter.addAll(serverRegionFunctionExecutor.filter);
    this.args = args;
    executeOnBucketSet = serverRegionFunctionExecutor.executeOnBucketSet;
  }

  private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
      MemberMappedArgument memberMapargs) {
    super(serverRegionFunctionExecutor);

    region = serverRegionFunctionExecutor.region;
    filter.clear();
    filter.addAll(serverRegionFunctionExecutor.filter);
    memberMappedArg = memberMapargs;
    executeOnBucketSet = serverRegionFunctionExecutor.executeOnBucketSet;
  }


  private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
      ResultCollector rc) {
    super(serverRegionFunctionExecutor);

    region = serverRegionFunctionExecutor.region;
    filter.clear();
    filter.addAll(serverRegionFunctionExecutor.filter);
    this.rc = rc != null ? new SynchronizedResultCollector(rc) : null;
    executeOnBucketSet = serverRegionFunctionExecutor.executeOnBucketSet;
  }

  private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
      Set filter2) {

    super(serverRegionFunctionExecutor);

    region = serverRegionFunctionExecutor.region;
    filter.clear();
    filter.addAll(filter2);
    executeOnBucketSet = serverRegionFunctionExecutor.executeOnBucketSet;
  }

  private ServerRegionFunctionExecutor(ServerRegionFunctionExecutor serverRegionFunctionExecutor,
      Set<Integer> bucketsAsFilter, boolean executeOnBucketSet) {

    super(serverRegionFunctionExecutor);

    region = serverRegionFunctionExecutor.region;
    filter.clear();
    filter.addAll(bucketsAsFilter);
    this.executeOnBucketSet = executeOnBucketSet;
  }

  @Override
  public Execution withFilter(Set fltr) {
    if (fltr == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "filter"));
    }
    executeOnBucketSet = false;
    return new ServerRegionFunctionExecutor(this, fltr);
  }

  @Override
  public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
    if (bucketIDs == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "buckets as filter"));
    }
    return new ServerRegionFunctionExecutor(this, bucketIDs, true /* execute on bucketset */);
  }

  @Override
  protected ResultCollector executeFunction(final Function function, long timeout, TimeUnit unit) {
    byte hasResult = 0;
    try {
      if (proxyCache != null) {
        if (proxyCache.isClosed()) {
          throw proxyCache.getCacheClosedException("Cache is closed for this user.");
        }
        UserAttributes.userAttributes.set(proxyCache.getUserAttributes());
      }

      if (function.hasResult()) { // have Results
        final int timeoutMs = TimeoutHelper.toMillis(timeout, unit);
        hasResult = 1;
        if (rc == null) { // Default Result Collector
          ResultCollector defaultCollector = new DefaultResultCollector();
          return executeOnServer(function, defaultCollector, hasResult, timeoutMs);
        } else { // Custome Result COllector
          return executeOnServer(function, rc, hasResult, timeoutMs);
        }
      } else { // No results
        executeOnServerNoAck(function, hasResult);
        return new NoResult();
      }
    } finally {
      UserAttributes.userAttributes.set(null);
    }
  }

  protected ResultCollector executeFunction(final String functionId, boolean resultReq,
      boolean isHA, boolean optimizeForWrite, long timeout, TimeUnit unit) {
    try {
      if (proxyCache != null) {
        if (proxyCache.isClosed()) {
          throw proxyCache.getCacheClosedException("Cache is closed for this user.");
        }
        UserAttributes.userAttributes.set(proxyCache.getUserAttributes());
      }
      byte hasResult = 0;
      if (resultReq) { // have Results
        hasResult = 1;
        final int timeoutMs = TimeoutHelper.toMillis(timeout, unit);
        if (rc == null) { // Default Result Collector
          ResultCollector defaultCollector = new DefaultResultCollector();
          return executeOnServer(functionId, defaultCollector, hasResult, isHA, optimizeForWrite,
              timeoutMs);
        } else { // Custome Result COllector
          return executeOnServer(functionId, rc, hasResult, isHA, optimizeForWrite, timeoutMs);
        }
      } else { // No results
        executeOnServerNoAck(functionId, hasResult, isHA, optimizeForWrite);
        return new NoResult();
      }
    } finally {
      UserAttributes.userAttributes.set(null);
    }
  }

  private ResultCollector executeOnServer(Function function, ResultCollector collector,
      byte hasResult, int timeoutMs) throws FunctionException {
    ServerRegionProxy srp = getServerRegionProxy();
    FunctionStats stats =
        FunctionStatsManager.getFunctionStats(function.getId(), region.getSystem());
    long start = stats.startFunctionExecution(true);
    try {
      validateExecution(function, null);
      srp.executeFunction(function, this, collector, hasResult,
          timeoutMs);
      stats.endFunctionExecution(start, true);
      return collector;
    } catch (FunctionException functionException) {
      stats.endFunctionExecutionWithException(start, true);
      throw functionException;
    } catch (Exception exception) {
      stats.endFunctionExecutionWithException(start, true);
      throw new FunctionException(exception);
    }
  }

  private ResultCollector executeOnServer(String functionId, ResultCollector collector,
      byte hasResult, boolean isHA, boolean optimizeForWrite, int timeoutMs)
      throws FunctionException {

    ServerRegionProxy srp = getServerRegionProxy();
    FunctionStats stats = FunctionStatsManager.getFunctionStats(functionId, region.getSystem());
    long start = stats.startFunctionExecution(true);
    try {
      validateExecution(null, null);
      srp.executeFunction(functionId, this, collector, hasResult, isHA,
          optimizeForWrite, timeoutMs);
      stats.endFunctionExecution(start, true);
      return collector;
    } catch (FunctionException functionException) {
      stats.endFunctionExecutionWithException(start, true);
      throw functionException;
    } catch (Exception exception) {
      stats.endFunctionExecutionWithException(start, true);
      throw new FunctionException(exception);
    }
  }


  private void executeOnServerNoAck(Function function, byte hasResult) throws FunctionException {
    ServerRegionProxy srp = getServerRegionProxy();
    FunctionStats stats =
        FunctionStatsManager.getFunctionStats(function.getId(), region.getSystem());
    long start = stats.startFunctionExecution(false);
    try {
      validateExecution(function, null);
      srp.executeFunctionNoAck(region.getFullPath(), function, this, hasResult, false);
      stats.endFunctionExecution(start, false);
    } catch (FunctionException functionException) {
      stats.endFunctionExecutionWithException(start, false);
      throw functionException;
    } catch (Exception exception) {
      stats.endFunctionExecutionWithException(start, false);
      throw new FunctionException(exception);
    }
  }

  private void executeOnServerNoAck(String functionId, byte hasResult, boolean isHA,
      boolean optimizeForWrite) throws FunctionException {
    ServerRegionProxy srp = getServerRegionProxy();
    FunctionStats stats = FunctionStatsManager.getFunctionStats(functionId, region.getSystem());
    long start = stats.startFunctionExecution(false);
    try {
      validateExecution(null, null);
      srp.executeFunctionNoAck(region.getFullPath(), functionId, this, hasResult, isHA,
          optimizeForWrite, false);
      stats.endFunctionExecution(start, false);
    } catch (FunctionException functionException) {
      stats.endFunctionExecutionWithException(start, false);
      throw functionException;
    } catch (Exception exception) {
      stats.endFunctionExecutionWithException(start, false);
      throw new FunctionException(exception);
    }
  }

  private ServerRegionProxy getServerRegionProxy() throws FunctionException {
    ServerRegionProxy srp = region.getServerProxy();
    if (srp != null) {
      if (logger.isDebugEnabled()) {
        logger.debug("Found server region proxy on region. RegionName: {}", region.getName());
      }
      return srp;
    } else {
      String message = srp + ": "
          + "No available connection was found. Server Region Proxy is not available for this region "
          + region.getName();
      throw new FunctionException(message);
    }
  }

  public LocalRegion getRegion() {
    return region;
  }

  @Override
  public String toString() {
    return "[ ServerRegionExecutor:" + "args=" + args + " ;filter=" + filter + " ;region="
        + region.getName() + "]";
  }

  @Override
  public Execution setArguments(Object args) {
    if (args == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "args"));
    }
    return new ServerRegionFunctionExecutor(this, args);
  }

  @Override
  public Execution withArgs(Object args) {
    return setArguments(args);
  }

  @Override
  public Execution withCollector(ResultCollector rs) {
    if (rs == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "Result Collector"));
    }
    return new ServerRegionFunctionExecutor(this, rs);
  }

  @Override
  public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) {
    if (argument == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "MemberMappedArgument"));
    }
    return new ServerRegionFunctionExecutor(this, argument);
  }

  @Override
  public void validateExecution(Function function, Set targetMembers) {
    InternalCache cache = GemFireCacheImpl.getInstance();
    if (cache != null && cache.getTxManager().getTXState() != null) {
      TXStateProxyImpl tx = (TXStateProxyImpl) cache.getTxManager().getTXState();
      tx.getRealDeal(null, region);
      tx.incOperationCount();
    }
  }

  @Override
  public ResultCollector execute(final String functionName) {
    return execute(functionName, getTimeoutMs(), TimeUnit.MILLISECONDS);
  }

  @Override
  public ResultCollector execute(final String functionName, long timeout, TimeUnit unit) {
    if (functionName == null) {
      throw new FunctionException(
          "The input function for the execute function request is null");
    }
    int timeoutInMs = (int) TimeUnit.MILLISECONDS.convert(timeout, unit);
    isFnSerializationReqd = false;
    Function functionObject = FunctionService.getFunction(functionName);
    if (functionObject == null) {
      byte[] functionAttributes = getFunctionAttributes(functionName);

      if (functionAttributes == null) {
        // GEODE-5618: Set authentication properties before executing the internal function.
        try {
          if (proxyCache != null) {
            if (proxyCache.isClosed()) {
              throw proxyCache.getCacheClosedException("Cache is closed for this user.");
            }
            UserAttributes.userAttributes.set(proxyCache.getUserAttributes());
          }

          ServerRegionProxy srp = getServerRegionProxy();
          Object obj = srp.getFunctionAttributes(functionName);
          functionAttributes = (byte[]) obj;
          addFunctionAttributes(functionName, functionAttributes);
        } finally {
          UserAttributes.userAttributes.set(null);
        }
      }

      boolean isHA = functionAttributes[1] == 1;
      boolean hasResult = functionAttributes[0] == 1;
      boolean optimizeForWrite = functionAttributes[2] == 1;
      return executeFunction(functionName, hasResult, isHA, optimizeForWrite, timeout, unit);
    } else {
      return executeFunction(functionObject, timeout, unit);
    }
  }

  public boolean getExecuteOnBucketSetFlag() {
    return executeOnBucketSet;
  }
}
