/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.geode.internal.cache.execute;

import java.util.Set;
import java.util.function.Supplier;

import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.internal.ExecuteFunctionNoAckOp;
import org.apache.geode.cache.client.internal.ExecuteFunctionOp;
import org.apache.geode.cache.client.internal.ExecuteFunctionOp.ExecuteFunctionOpImpl;
import org.apache.geode.cache.client.internal.GetFunctionAttributeOp;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.ProxyCache;
import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.execute.util.SynchronizedResultCollector;

public class ServerFunctionExecutor extends AbstractExecution {

  private PoolImpl pool;

  private final boolean allServers;

  private String[] groups;


  ServerFunctionExecutor(Pool pool, boolean allServers, String... groups) {
    this.pool = (PoolImpl) pool;
    this.allServers = allServers;
    this.groups = groups;
  }

  ServerFunctionExecutor(Pool pool, boolean allServers, ProxyCache proxyCache, String... groups) {
    this.pool = (PoolImpl) pool;
    this.allServers = allServers;
    this.proxyCache = proxyCache;
    this.groups = groups;
  }

  private ServerFunctionExecutor(ServerFunctionExecutor sfe) {
    super(sfe);
    if (sfe.pool != null) {
      pool = sfe.pool;
    }
    allServers = sfe.allServers;
    groups = sfe.groups;
  }

  private ServerFunctionExecutor(ServerFunctionExecutor sfe, Object args) {
    this(sfe);
    this.args = args;
  }

  private ServerFunctionExecutor(ServerFunctionExecutor sfe, ResultCollector collector) {
    this(sfe);
    rc = collector != null ? new SynchronizedResultCollector(collector) : null;
  }

  private ServerFunctionExecutor(ServerFunctionExecutor sfe, MemberMappedArgument argument) {
    this(sfe);
    memberMappedArg = argument;
    isMemberMappedArgument = true;
  }

  protected ResultCollector executeFunction(final String functionId, boolean result, boolean isHA,
      boolean optimizeForWrite) {
    try {
      if (proxyCache != null) {
        if (proxyCache.isClosed()) {
          throw proxyCache.getCacheClosedException("Cache is closed for this user.");
        }
        UserAttributes.userAttributes.set(proxyCache.getUserAttributes());
      }

      byte hasResult = 0;
      if (result) {
        hasResult = 1;
        if (rc == null) {
          ResultCollector defaultCollector = new DefaultResultCollector();
          return executeOnServer(functionId, defaultCollector, hasResult, isHA, optimizeForWrite);
        } else {
          return executeOnServer(functionId, rc, hasResult, isHA, optimizeForWrite);
        }
      } else {
        executeOnServerNoAck(functionId, hasResult, isHA, optimizeForWrite);
        return new NoResult();
      }
    } finally {
      UserAttributes.userAttributes.set(null);
    }
  }

  @Override
  protected ResultCollector executeFunction(final Function function) {
    byte hasResult = 0;
    try {
      if (proxyCache != null) {
        if (proxyCache.isClosed()) {
          throw proxyCache.getCacheClosedException("Cache is closed for this user.");
        }
        UserAttributes.userAttributes.set(proxyCache.getUserAttributes());
      }

      if (function.hasResult()) {
        hasResult = 1;
        if (rc == null) {
          ResultCollector defaultCollector = new DefaultResultCollector();
          return executeOnServer(function, defaultCollector, hasResult);
        } else {
          return executeOnServer(function, rc, hasResult);
        }
      } else {
        executeOnServerNoAck(function, hasResult);
        return new NoResult();
      }
    } finally {
      UserAttributes.userAttributes.set(null);
    }

  }

  private ResultCollector executeOnServer(Function function, ResultCollector rc, byte hasResult) {
    FunctionStats stats = FunctionStats.getFunctionStats(function.getId());
    try {
      validateExecution(function, null);
      long start = stats.startTime();
      stats.startFunctionExecution(true);

      final ExecuteFunctionOpImpl executeFunctionOp =
          new ExecuteFunctionOpImpl(function, args, memberMappedArg,
              rc, isFnSerializationReqd, (byte) 0, groups, allServers, isIgnoreDepartedMembers(),
              getTimeoutMs());

      final Supplier<ExecuteFunctionOpImpl> executeFunctionOpSupplier =
          () -> new ExecuteFunctionOpImpl(function, args, memberMappedArg,
              rc, isFnSerializationReqd, (byte) 0,
              null/* onGroups does not use single-hop for now */,
              false, false, getTimeoutMs());

      final Supplier<ExecuteFunctionOpImpl> reExecuteFunctionOpSupplier =
          () -> new ExecuteFunctionOpImpl(function, this.getArguments(),
              this.getMemberMappedArgument(), rc,
              isFnSerializationReqd, (byte) 1, groups, allServers,
              this.isIgnoreDepartedMembers(), getTimeoutMs());

      ExecuteFunctionOp.execute(pool, allServers,
          rc, function.isHA(), UserAttributes.userAttributes.get(), groups,
          executeFunctionOp,
          executeFunctionOpSupplier,
          reExecuteFunctionOpSupplier);

      stats.endFunctionExecution(start, true);
      rc.endResults();
      return rc;
    } catch (FunctionException functionException) {
      stats.endFunctionExecutionWithException(true);
      throw functionException;
    } catch (ServerConnectivityException exception) {
      throw exception;
    } catch (Exception exception) {
      stats.endFunctionExecutionWithException(true);
      throw new FunctionException(exception);
    }
  }

  private ResultCollector executeOnServer(String functionId, ResultCollector rc, byte hasResult,
      boolean isHA, boolean optimizeForWrite) {
    FunctionStats stats = FunctionStats.getFunctionStats(functionId);
    try {
      validateExecution(null, null);
      long start = stats.startTime();
      stats.startFunctionExecution(true);

      final ExecuteFunctionOpImpl executeFunctionOp =
          new ExecuteFunctionOpImpl(functionId, args, memberMappedArg, hasResult,
              rc, isFnSerializationReqd, isHA, optimizeForWrite, (byte) 0, groups, allServers,
              this.isIgnoreDepartedMembers(), getTimeoutMs());

      final Supplier<ExecuteFunctionOpImpl> executeFunctionOpSupplier =
          () -> new ExecuteFunctionOpImpl(functionId, args, memberMappedArg,
              hasResult,
              rc, isFnSerializationReqd, isHA, optimizeForWrite, (byte) 0,
              null/* onGroups does not use single-hop for now */, false, false, getTimeoutMs());

      final Supplier<ExecuteFunctionOpImpl> reExecuteFunctionOpSupplier =
          () -> new ExecuteFunctionOpImpl(functionId, args,
              this.getMemberMappedArgument(),
              hasResult, rc, isFnSerializationReqd, isHA, optimizeForWrite, (byte) 1,
              groups, allServers, this.isIgnoreDepartedMembers(), getTimeoutMs());

      ExecuteFunctionOp.execute(pool, allServers,
          rc, isHA,
          UserAttributes.userAttributes.get(), groups,
          executeFunctionOp,
          executeFunctionOpSupplier,
          reExecuteFunctionOpSupplier);

      stats.endFunctionExecution(start, true);
      rc.endResults();
      return rc;
    } catch (FunctionException functionException) {
      stats.endFunctionExecutionWithException(true);
      throw functionException;
    } catch (ServerConnectivityException exception) {
      throw exception;
    } catch (Exception exception) {
      stats.endFunctionExecutionWithException(true);
      throw new FunctionException(exception);
    }
  }

  private void executeOnServerNoAck(Function function, byte hasResult) {
    FunctionStats stats = FunctionStats.getFunctionStats(function.getId());
    try {
      validateExecution(function, null);
      long start = stats.startTime();
      stats.startFunctionExecution(false);
      ExecuteFunctionNoAckOp.execute(pool, function, args, memberMappedArg, allServers,
          hasResult, isFnSerializationReqd, groups);
      stats.endFunctionExecution(start, false);
    } catch (FunctionException functionException) {
      stats.endFunctionExecutionWithException(false);
      throw functionException;
    } catch (ServerConnectivityException exception) {
      throw exception;
    } catch (Exception exception) {
      stats.endFunctionExecutionWithException(false);
      throw new FunctionException(exception);
    }
  }

  private void executeOnServerNoAck(String functionId, byte hasResult, boolean isHA,
      boolean optimizeForWrite) {
    FunctionStats stats = FunctionStats.getFunctionStats(functionId);
    try {
      validateExecution(null, null);
      long start = stats.startTime();
      stats.startFunctionExecution(false);
      ExecuteFunctionNoAckOp.execute(pool, functionId, args, memberMappedArg, allServers,
          hasResult, isFnSerializationReqd, isHA, optimizeForWrite, groups);
      stats.endFunctionExecution(start, false);
    } catch (FunctionException functionException) {
      stats.endFunctionExecutionWithException(false);
      throw functionException;
    } catch (ServerConnectivityException exception) {
      throw exception;
    } catch (Exception exception) {
      stats.endFunctionExecutionWithException(false);
      throw new FunctionException(exception);
    }
  }

  public Pool getPool() {
    return pool;
  }

  @Override
  public Execution withFilter(Set filter) {
    throw new FunctionException(
        String.format("Cannot specify %s for data independent functions",
            "filter"));
  }

  @Override
  public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
    throw new FunctionException(
        String.format("Cannot specify %s for data independent functions",
            "buckets as filter"));
  }

  @Override
  public Execution setArguments(Object args) {
    if (args == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "args"));
    }
    return new ServerFunctionExecutor(this, args);
  }

  @Override
  public Execution withArgs(Object args) {
    return setArguments(args);
  }

  @Override
  public Execution withCollector(ResultCollector rs) {
    if (rs == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "Result Collector"));
    }
    return new ServerFunctionExecutor(this, rs);
  }

  @Override
  public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) {
    if (argument == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "MemberMapped Args"));
    }
    return new ServerFunctionExecutor(this, argument);
  }

  @Override
  public void validateExecution(Function function, Set targetMembers) {
    if (TXManagerImpl.getCurrentTXUniqueId() != TXManagerImpl.NOTX) {
      throw new UnsupportedOperationException();
    }
  }

  @Override
  public ResultCollector execute(final String functionName) {
    if (functionName == null) {
      throw new FunctionException(
          "The input function for the execute function request is null");
    }
    isFnSerializationReqd = false;
    Function functionObject = FunctionService.getFunction(functionName);
    if (functionObject == null) {
      byte[] functionAttributes = getFunctionAttributes(functionName);

      if (functionAttributes == null) {
        // Set authentication properties before executing the internal function.
        try {
          if (proxyCache != null) {
            if (proxyCache.isClosed()) {
              throw proxyCache.getCacheClosedException("Cache is closed for this user.");
            }
            UserAttributes.userAttributes.set(proxyCache.getUserAttributes());
          }

          Object obj = GetFunctionAttributeOp.execute(pool, functionName);
          functionAttributes = (byte[]) obj;
          addFunctionAttributes(functionName, functionAttributes);
        } finally {
          UserAttributes.userAttributes.set(null);
        }
      }

      boolean isHA = functionAttributes[1] == 1;
      boolean hasResult = functionAttributes[0] == 1;
      boolean optimizeForWrite = functionAttributes[2] == 1;
      return executeFunction(functionName, hasResult, isHA, optimizeForWrite);
    } else {
      return executeFunction(functionObject);
    }
  }
}
