/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache.client.internal;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.logging.log4j.Logger;

import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.internal.GetAllOp.GetAllOpImpl;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PutAllPartialResultException;
import org.apache.geode.internal.cache.execute.BucketMovedException;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;

public class SingleHopClientExecutor {

  private static final Logger logger = LogService.getLogger();

  @MakeNotStatic
  static final ExecutorService execService =
      LoggingExecutors.newCachedThreadPool("Function Execution Thread-", true);

  static void submitAll(List callableTasks) {
    if (callableTasks != null && !callableTasks.isEmpty()) {
      List futures = null;
      try {
        futures = execService.invokeAll(callableTasks);
      } catch (InterruptedException e) {
        throw new InternalGemFireException(e.getMessage());
      }
      if (futures != null) {
        Iterator itr = futures.iterator();
        while (itr.hasNext() && !execService.isShutdown() && !execService.isTerminated()) {
          Future fut = (Future) itr.next();
          try {
            fut.get();
          } catch (InterruptedException e) {
            throw new InternalGemFireException(e.getMessage());
          } catch (ExecutionException ee) {
            if (ee.getCause() instanceof FunctionException) {
              throw (FunctionException) ee.getCause();
            } else if (ee.getCause() instanceof ServerOperationException) {
              throw (ServerOperationException) ee.getCause();
            } else if (ee.getCause() instanceof ServerConnectivityException) {
              throw (ServerConnectivityException) ee.getCause();
            } else {
              throw executionThrowable(ee.getCause());
            }
          }
        }
      }
    }
  }


  static int submitAllHA(List callableTasks, LocalRegion region, boolean isHA,
      ResultCollector rc, Set<String> failedNodes,
      final int retryAttemptsArg,
      final PoolImpl pool) {

    ClientMetadataService cms = region.getCache().getClientMetadataService();
    int maxRetryAttempts = 0;

    if (callableTasks != null && !callableTasks.isEmpty()) {
      List futures = null;
      try {
        futures = execService.invokeAll(callableTasks);
      } catch (InterruptedException e) {
        throw new InternalGemFireException(e.getMessage());
      }
      if (futures != null) {
        GemFireException functionExecutionException = null;
        Iterator futureItr = futures.iterator();
        Iterator taskItr = callableTasks.iterator();
        final boolean isDebugEnabled = logger.isDebugEnabled();
        while (futureItr.hasNext() && !execService.isShutdown() && !execService.isTerminated()) {
          Future fut = (Future) futureItr.next();
          SingleHopOperationCallable task = (SingleHopOperationCallable) taskItr.next();
          ServerLocation server = task.getServer();
          try {
            fut.get();
            if (isDebugEnabled) {
              logger.debug("ExecuteRegionFunctionSingleHopOp#got result from {}", server);
            }
          } catch (InterruptedException e) {
            throw new InternalGemFireException(e.getMessage());
          } catch (ExecutionException ee) {

            if (maxRetryAttempts == 0) {
              maxRetryAttempts = retryAttemptsArg;
            }

            if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
              // If the retryAttempt is set to default(-1). Try it on all servers once.
              // Calculating number of servers when function is re-executed as it involves
              // messaging locator.
              maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
            }

            if (ee.getCause() instanceof InternalFunctionInvocationTargetException) {
              if (isDebugEnabled) {
                logger.debug(
                    "ExecuteRegionFunctionSingleHopOp#ExecutionException.InternalFunctionInvocationTargetException : Caused by :{}",
                    ee.getCause());
              }
              try {
                cms = region.getCache().getClientMetadataService();
              } catch (CacheClosedException e) {
                return 0;
              }
              cms.removeBucketServerLocation(server);
              cms.scheduleGetPRMetaData(region, false);

              failedNodes.addAll(
                  ((InternalFunctionInvocationTargetException) ee.getCause()).getFailedNodeSet());
              // Clear the results only if isHA so that partial results can be returned.
              if (isHA && maxRetryAttempts != 0) {
                rc.clearResults();
              } else {
                if (ee.getCause().getCause() != null) {
                  functionExecutionException =
                      new FunctionInvocationTargetException(ee.getCause().getCause());
                } else {
                  functionExecutionException =
                      new FunctionInvocationTargetException(new BucketMovedException(
                          "Bucket migrated to another node. Please retry."));
                }
              }
            } else if (ee.getCause() instanceof FunctionException) {
              if (isDebugEnabled) {
                logger.debug(
                    "ExecuteRegionFunctionSingleHopOp#ExecutionException.FunctionException : Caused by :{}",
                    ee.getCause());
              }
              FunctionException fe = (FunctionException) ee.getCause();
              if (isHA) {
                throw fe;
              } else {
                functionExecutionException = fe;
              }
            } else if (ee.getCause() instanceof ServerOperationException) {
              if (isDebugEnabled) {
                logger.debug(
                    "ExecuteRegionFunctionSingleHopOp#ExecutionException.ServerOperationException : Caused by :{}",
                    ee.getCause());
              }
              ServerOperationException soe = (ServerOperationException) ee.getCause();
              if (isHA) {
                throw soe;
              } else {
                functionExecutionException = soe;
              }
            } else if (ee.getCause() instanceof ServerConnectivityException) {
              if (isDebugEnabled) {
                logger.debug(
                    "ExecuteRegionFunctionSingleHopOp#ExecutionException.ServerConnectivityException : Caused by :{} The failed server is: {}",
                    ee.getCause(), server);
              }
              try {
                cms = region.getCache().getClientMetadataService();
              } catch (CacheClosedException e) {
                return 0;
              }
              cms.removeBucketServerLocation(server);
              cms.scheduleGetPRMetaData(region, false);
              // Clear the results only if isHA so that partial results can be returned.
              if (isHA && maxRetryAttempts != 0) {
                rc.clearResults();
              } else {
                functionExecutionException = (ServerConnectivityException) ee.getCause();
              }
            } else {
              throw executionThrowable(ee.getCause());
            }
          }
        }
        if (functionExecutionException != null) {
          throw functionExecutionException;
        }
      }
    }
    return maxRetryAttempts;
  }

  /**
   * execute bulk op (putAll or removeAll) on multiple PR servers, returning a map of the results.
   * Results are either a VersionedObjectList or a BulkOpPartialResultsException
   *
   * @return the per-server results
   */
  static Map<ServerLocation, Object> submitBulkOp(List callableTasks,
      ClientMetadataService cms,
      LocalRegion region,
      Map<ServerLocation, RuntimeException> failedServers) {
    if (callableTasks != null && !callableTasks.isEmpty()) {
      Map<ServerLocation, Object> resultMap = new HashMap<ServerLocation, Object>();
      boolean anyPartialResults = false;
      List futures = null;
      try {
        futures = execService.invokeAll(callableTasks);
      } catch (InterruptedException e) {
        throw new InternalGemFireException(e.getMessage());
      }
      if (futures != null) {
        Iterator futureItr = futures.iterator();
        Iterator taskItr = callableTasks.iterator();
        RuntimeException rte = null;
        while (futureItr.hasNext() && !execService.isShutdown() && !execService.isTerminated()) {
          Future fut = (Future) futureItr.next();
          SingleHopOperationCallable task = (SingleHopOperationCallable) taskItr.next();
          ServerLocation server = task.getServer();
          try {
            VersionedObjectList versions = (VersionedObjectList) fut.get();
            if (logger.isDebugEnabled()) {
              logger.debug("submitBulkOp#got result from {}:{}", server, versions);
            }
            resultMap.put(server, versions);
          } catch (InterruptedException e) {
            InternalGemFireException ige = new InternalGemFireException(e);
            // only to make this server as failed server, not to throw right now
            failedServers.put(server, ige);
            if (rte == null) {
              rte = ige;
            }
          } catch (ExecutionException ee) {
            if (ee.getCause() instanceof ServerOperationException) {
              if (logger.isDebugEnabled()) {
                logger.debug("submitBulkOp#ExecutionException from server {}", server, ee);
              }
              ServerOperationException soe = (ServerOperationException) ee.getCause();
              // only to make this server as failed server, not to throw right now
              failedServers.put(server, soe);
              if (rte == null) {
                rte = soe;
              }
            } else if (ee.getCause() instanceof ServerConnectivityException) {
              if (logger.isDebugEnabled()) {
                logger.debug("submitBulkOp#ExecutionException for server {}", server, ee);
              }
              cms = region.getCache().getClientMetadataService();
              cms.removeBucketServerLocation(server);
              cms.scheduleGetPRMetaData(region, false);
              failedServers.put(server, (ServerConnectivityException) ee.getCause());
            } else {
              Throwable t = ee.getCause();
              if (t instanceof PutAllPartialResultException) {
                resultMap.put(server, t);
                anyPartialResults = true;
                failedServers.put(server, (PutAllPartialResultException) t);
              } else {
                RuntimeException other_rte = executionThrowable(ee.getCause());
                failedServers.put(server, other_rte);
                if (rte == null) {
                  rte = other_rte;
                }
              }
            }
          } // catch
        } // while
        // if there are any partial results we suppress throwing an exception
        // so the partial results can be processed
        if (rte != null && !anyPartialResults) {
          throw rte;
        }
      }
      return resultMap;
    }
    return null;
  }

  static Map<ServerLocation, Object> submitGetAll(
      Map<ServerLocation, HashSet> serverToFilterMap,
      List callableTasks, ClientMetadataService cms,
      LocalRegion region) {

    if (callableTasks != null && !callableTasks.isEmpty()) {
      Map<ServerLocation, Object> resultMap = new HashMap<ServerLocation, Object>();
      List futures = null;
      try {
        futures = execService.invokeAll(callableTasks);
      } catch (InterruptedException e) {
        throw new InternalGemFireException(e.getMessage());
      }
      if (futures != null) {
        Iterator futureItr = futures.iterator();
        Iterator taskItr = callableTasks.iterator();
        while (futureItr.hasNext() && !execService.isShutdown() && !execService.isTerminated()) {
          Future fut = (Future) futureItr.next();
          SingleHopOperationCallable task = (SingleHopOperationCallable) taskItr.next();
          List keys = ((GetAllOpImpl) task.getOperation()).getKeyList();
          ServerLocation server = task.getServer();
          try {

            VersionedObjectList valuesFromServer = (VersionedObjectList) fut.get();
            valuesFromServer.setKeys(keys);

            for (VersionedObjectList.Iterator it = valuesFromServer.iterator(); it.hasNext();) {
              VersionedObjectList.Entry entry = it.next();
              Object key = entry.getKey();
              Object value = entry.getValue();
              if (!entry.isKeyNotOnServer()) {
                if (value instanceof Throwable) {
                  logger.warn(String.format(
                      "%s: Caught the following exception attempting to get value for key=%s",
                      new Object[] {value, key}),
                      (Throwable) value);
                }
              }
            }
            if (logger.isDebugEnabled()) {
              logger.debug("GetAllOp#got result from {}: {}", server, valuesFromServer);
            }
            resultMap.put(server, valuesFromServer);
          } catch (InterruptedException e) {
            throw new InternalGemFireException(e.getMessage());
          } catch (ExecutionException ee) {
            if (ee.getCause() instanceof ServerOperationException) {
              if (logger.isDebugEnabled()) {
                logger.debug("GetAllOp#ExecutionException.ServerOperationException : Caused by :{}",
                    ee.getCause());
              }
              throw (ServerOperationException) ee.getCause();
            } else if (ee.getCause() instanceof ServerConnectivityException) {
              if (logger.isDebugEnabled()) {
                logger.debug(
                    "GetAllOp#ExecutionException.ServerConnectivityException : Caused by :{} The failed server is: {}",
                    ee.getCause(), server);
              }
              try {
                cms = region.getCache().getClientMetadataService();
              } catch (CacheClosedException e) {
                return null;
              }
              cms.removeBucketServerLocation(server);
              cms.scheduleGetPRMetaData((LocalRegion) region, false);
              resultMap.put(server, ee.getCause());
            } else {
              throw executionThrowable(ee.getCause());
            }
          }
        }
        return resultMap;
      }
    }
    return null;
  }

  static void submitTask(Runnable task) {
    execService.execute(task);
  }

  // Find out what exception to throw?
  private static RuntimeException executionThrowable(Throwable t) {
    if (t instanceof RuntimeException)
      return (RuntimeException) t;
    else if (t instanceof Error)
      throw (Error) t;
    else
      throw new IllegalStateException("Don't know", t);
  }
}
