/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache.execute;

import java.util.Set;

import org.apache.geode.cache.LowMemoryException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.TransactionDataNotColocatedException;
import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.MemoryThresholdInfo;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.control.MemoryThresholds;

/**
 * Executes Function on Distributed Regions.
 *
 * For DistributedRegions with DataPolicy.NORMAL, it throws UnsupportedOperationException. <br>
 * For DistributedRegions with DataPolicy.EMPTY, execute the function on any random member which has
 * DataPolicy.REPLICATE <br>
 * For DistributedRegions with DataPolicy.REPLICATE, execute the function locally.
 *
 *
 * @since GemFire 5.8 LA
 *
 */
public class DistributedRegionFunctionExecutor extends AbstractExecution {

  private final LocalRegion region;

  private ServerToClientFunctionResultSender sender;

  public DistributedRegionFunctionExecutor(Region r) {
    if (r == null) {
      throw new IllegalArgumentException(
          String.format("The input %s for the execute function request is null",
              "region"));
    }
    this.region = (LocalRegion) r;
  }

  private DistributedRegionFunctionExecutor(DistributedRegionFunctionExecutor drfe) {
    super(drfe);
    this.region = drfe.region;
    if (drfe.filter != null) {
      this.filter.clear();
      this.filter.addAll(drfe.filter);
    }
    this.sender = drfe.sender;
  }

  public DistributedRegionFunctionExecutor(DistributedRegion region, Set filter2, Object args,
      MemberMappedArgument memberMappedArg, ServerToClientFunctionResultSender resultSender) {
    if (args != null) {
      this.args = args;
    } else if (memberMappedArg != null) {
      this.memberMappedArg = memberMappedArg;
      this.isMemberMappedArgument = true;
    }
    this.sender = resultSender;
    if (filter2 != null) {
      this.filter.clear();
      this.filter.addAll(filter2);
    }
    this.region = region;
    this.isClientServerMode = true;
  }

  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor distributedRegionFunctionExecutor,
      MemberMappedArgument argument) {
    super(distributedRegionFunctionExecutor);

    this.region = distributedRegionFunctionExecutor.getRegion();
    this.filter.clear();
    this.filter.addAll(distributedRegionFunctionExecutor.filter);
    this.sender = distributedRegionFunctionExecutor.getServerResultSender();

    this.memberMappedArg = argument;
    this.isMemberMappedArgument = true;

  }

  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor distributedRegionFunctionExecutor, ResultCollector rs) {
    super(distributedRegionFunctionExecutor);

    this.region = distributedRegionFunctionExecutor.getRegion();
    this.filter.clear();
    this.filter.addAll(distributedRegionFunctionExecutor.filter);
    this.sender = distributedRegionFunctionExecutor.getServerResultSender();

    this.rc = rs;
  }

  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor distributedRegionFunctionExecutor, Object args) {
    super(distributedRegionFunctionExecutor);

    this.region = distributedRegionFunctionExecutor.getRegion();
    this.filter.clear();
    this.filter.addAll(distributedRegionFunctionExecutor.filter);
    this.sender = distributedRegionFunctionExecutor.getServerResultSender();

    this.args = args;
  }

  private DistributedRegionFunctionExecutor(
      DistributedRegionFunctionExecutor distributedRegionFunctionExecutor, Set filter2) {
    super(distributedRegionFunctionExecutor);

    this.region = distributedRegionFunctionExecutor.getRegion();
    this.sender = distributedRegionFunctionExecutor.getServerResultSender();

    this.filter.clear();
    this.filter.addAll(filter2);
  }

  private DistributedRegionFunctionExecutor(DistributedRegionFunctionExecutor drfe,
      boolean isReExecute) {
    super(drfe);
    this.region = drfe.region;
    if (drfe.filter != null) {
      this.filter.clear();
      this.filter.addAll(drfe.filter);
    }
    this.sender = drfe.sender;
    this.isReExecute = isReExecute;
  }

  @Override
  public ResultCollector execute(final String functionName) {
    if (functionName == null) {
      throw new FunctionException(
          "The input function for the execute function request is null");
    }
    this.isFnSerializationReqd = false;
    Function functionObject = FunctionService.getFunction(functionName);
    if (functionObject == null) {
      throw new FunctionException(
          String.format("Function named %s is not registered to FunctionService",
              functionName));
    }
    if (region.getAttributes().getDataPolicy().isNormal()) {
      throw new FunctionException(
          "Function execution on region with DataPolicy.NORMAL is not supported");
    }
    return executeFunction(functionObject);
  }

  @Override
  public ResultCollector execute(final Function function) {
    if (function == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "function instance"));
    }
    if (function.isHA() && !function.hasResult()) {
      throw new FunctionException(
          "For Functions with isHA true, hasResult must also be true.");
    }
    if (region.getAttributes().getDataPolicy().isNormal()) {
      throw new FunctionException(
          "Function execution on region with DataPolicy.NORMAL is not supported");
    }
    String id = function.getId();
    if (id == null) {
      throw new FunctionException(
          "The Function#getID() returned null");
    }
    this.isFnSerializationReqd = true;
    return executeFunction(function);
  }

  @Override
  protected ResultCollector executeFunction(Function function) {
    if (function.hasResult()) { // have Results
      if (this.rc == null) { // Default Result Collector
        ResultCollector defaultCollector = new DefaultResultCollector();
        return this.region.executeFunction(this, function, args, defaultCollector, this.filter,
            this.sender);
      } else { // Custome Result COllector
        return this.region.executeFunction(this, function, args, rc, this.filter, this.sender);
      }
    } else { // No results
      this.region.executeFunction(this, function, args, null, this.filter, this.sender);
      return new NoResult();
    }
  }

  @Override
  public Execution withFilter(Set filter) {
    if (filter == null) {
      throw new FunctionException(
          String.format("The input %s for the execute function request is null",
              "filter"));
    }

    return new DistributedRegionFunctionExecutor(this, filter);
  }

  @Override
  public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
    if (bucketIDs != null && !bucketIDs.isEmpty()) {
      throw new IllegalArgumentException(
          String.format("Buckets as filter cannot be applied to a non partitioned region: %s",
              region.getName()));
    }
    return this;
  }

  public LocalRegion getRegion() {
    return this.region;
  }

  public ServerToClientFunctionResultSender getServerResultSender() {
    return this.sender;
  }



  @Override
  public Execution setArguments(Object args) {
    if (args == null) {
      throw new IllegalArgumentException(
          String.format("The input %s for the execute function request is null",
              "Args"));
    }
    return new DistributedRegionFunctionExecutor(this, args);
  }

  @Override
  public Execution withArgs(Object args) {
    return setArguments(args);
  }

  @Override
  public Execution withCollector(ResultCollector rs) {
    if (rs == null) {
      throw new IllegalArgumentException(
          String.format("The input %s for the execute function request is null",
              "Result Collector"));
    }
    return new DistributedRegionFunctionExecutor(this, rs);
  }

  @Override
  public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) {
    if (argument == null) {
      throw new IllegalArgumentException(
          String.format("The input %s for the execute function request is null",
              "MemberMappedArgument"));
    }
    return new DistributedRegionFunctionExecutor(this, argument);
  }

  @Override
  public AbstractExecution setIsReExecute() {
    return new DistributedRegionFunctionExecutor(this, true);
  }

  @Override
  public String toString() {
    final StringBuffer buf = new StringBuffer();
    buf.append("[ DistributedRegionFunctionExecutor:");
    buf.append("args=");
    buf.append(this.args);
    buf.append(";region=");
    buf.append(this.region.getName());
    buf.append("]");
    return buf.toString();
  }

  /*
   * (non-Javadoc)
   *
   * @see
   * org.apache.geode.internal.cache.execute.AbstractExecution#validateExecution(org.apache.geode.
   * cache.execute.Function, java.util.Set)
   */
  @Override
  public void validateExecution(Function function, Set targetMembers) {
    InternalCache cache = region.getGemFireCache();
    if (cache != null && cache.getTxManager().getTXState() != null) {
      if (targetMembers.size() > 1) {
        throw new TransactionException(
            "Function inside a transaction cannot execute on more than one node");
      } else {
        assert targetMembers.size() == 1;
        DistributedMember funcTarget = (DistributedMember) targetMembers.iterator().next();
        DistributedMember target = cache.getTxManager().getTXState().getTarget();
        if (target == null) {
          cache.getTxManager().getTXState().setTarget(funcTarget);
        } else if (!target.equals(funcTarget)) {
          throw new TransactionDataNotColocatedException(
              String.format(
                  "Function execution is not colocated with transaction. The transactional data is hosted on node %s, but you are trying to target node %s",
                  target, funcTarget));
        }
      }
    }
    if (!MemoryThresholds.isLowMemoryExceptionDisabled() && function.optimizeForWrite()) {
      MemoryThresholdInfo info = region.getAtomicThresholdInfo();
      if (info.isMemoryThresholdReached()) {
        InternalResourceManager.getInternalResourceManager(region.getCache()).getHeapMonitor()
            .updateStateAndSendEvent();
        Set<DistributedMember> criticalMembers = info.getMembersThatReachedThreshold();
        throw new LowMemoryException(
            String.format(
                "Function: %s cannot be executed because the members %s are running low on memory",
                function.getId(), criticalMembers),
            criticalMembers);
      }
    }
  }

}
