/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tephra.distributed;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.name.Named;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.InvalidTruncateTimeException;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionNotInProgressException;
import org.apache.tephra.TransactionSizeException;
import org.apache.tephra.TransactionSystemClient;
import org.apache.tephra.TxConstants;
import org.apache.tephra.runtime.ConfigModule;
import org.apache.tephra.runtime.DiscoveryModules;
import org.apache.tephra.runtime.TransactionClientModule;
import org.apache.tephra.runtime.TransactionModules;
import org.apache.tephra.runtime.ZKModule;
import org.apache.tephra.util.ConfigurationFactory;
import org.apache.thrift.TException;
import org.apache.twill.zookeeper.ZKClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;

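/**
 * A {@link TransactionSystemClient} that forwards every call to the transaction service through a
 * Thrift client, retrying calls that fail with a Thrift exception according to the configured
 * retry strategy.
 */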
public class TransactionServiceClient implements TransactionSystemClient {

  private static final Logger LOG =
      LoggerFactory.getLogger(TransactionServiceClient.class);

  // supplies a Thrift client for every call; assigned exactly once, in the constructor
  private final ThriftClientProvider clientProvider;

  // creates a fresh retry strategy for every executed operation
  private final RetryStrategyProvider retryStrategyProvider;

  // client id that is passed to the transaction service when starting transactions
  private final String clientId;

  // change set bounds read from the configuration: exceeding a limit makes canCommitOrThrow fail,
  // exceeding a threshold only logs a warning
  private final int changeSetCountLimit;
  private final int changeSetCountThreshold;
  private final long changeSetSizeLimit;
  private final long changeSetSizeThreshold;

  /**
   * Utility to be used for basic verification of transaction system availability and functioning.
   * If the arguments are not valid, the usage is printed and nothing else is done.
   *
   * @param args arguments list, accepts a single option "-v" that makes it print out more details
   *             about the started tx
   * @throws Exception if the verification run by {@link #doMain(boolean, Configuration)} fails
   */
  public static void main(String[] args) throws Exception {
    boolean verbose = args.length == 1 && "-v".equals(args[0]);
    if (args.length > 0 && !verbose) {
      System.out.println("USAGE: TransactionServiceClient [-v]");
      // do not run the check with arguments we did not understand
      return;
    }
    doMain(verbose, new ConfigurationFactory().get());
  }

  /**
   * Runs a smoke test against the transaction service: starts a short transaction, asks whether it
   * can commit with an empty change set and, if so, commits it. The transaction is aborted when
   * either of these two steps returns {@code false}. The ZooKeeper client is started before the
   * test and stopped when the test ends, whether it succeeded or not.
   *
   * @param verbose if true, log the whole transaction; otherwise log only its id, read pointer and
   *                the number of invalid and in-progress transactions
   * @param conf the configuration used to build the injector
   * @throws Exception if any step of the test fails
   */
  @VisibleForTesting
  public static void doMain(boolean verbose, Configuration conf) throws Exception {
    LOG.info("Starting tx server client test.");
    final Injector injector = Guice.createInjector(
      new ConfigModule(conf),
      new ZKModule(),
      new DiscoveryModules().getDistributedModules(),
      new TransactionModules().getDistributedModules(),
      new TransactionClientModule()
    );

    final ZKClientService zkClient = injector.getInstance(ZKClientService.class);
    zkClient.startAndWait();

    try {
      final TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class);
      LOG.info("Starting tx...");
      final Transaction tx = client.startShort();

      final String startedMessage;
      if (!verbose) {
        startedMessage = "Started tx: " + tx.getTransactionId() +
          ", readPointer: " + tx.getReadPointer() +
          ", invalids: " + tx.getInvalids().length +
          ", inProgress: " + tx.getInProgress().length;
      } else {
        startedMessage = "Started tx details: " + tx.toString();
      }
      LOG.info(startedMessage);

      LOG.info("Checking if canCommit tx...");
      final boolean canCommit = client.canCommitOrThrow(tx, Collections.<byte[]>emptyList());
      LOG.info("canCommit: " + canCommit);

      // a transaction that cannot commit is treated like one whose commit was rejected
      boolean committed = false;
      if (canCommit) {
        LOG.info("Committing tx...");
        committed = client.commit(tx);
        LOG.info("Committed tx: " + committed);
      }
      if (!committed) {
        LOG.info("Aborting tx...");
        client.abort(tx);
        LOG.info("Aborted tx...");
      }
    } finally {
      zkClient.stopAndWait();
    }
  }

  /**
   * Creates a client that uses the JVM name as its client id. This constructor can be used if
   * Guice dependency injection is not used.
   *
   * @param config the configuration handed on to the three-argument constructor
   * @param clientProvider provider of the Thrift clients used to call the transaction service
   */
  public TransactionServiceClient(Configuration config, ThriftClientProvider clientProvider) {
    this(config, clientProvider, jvmName());
  }

  /** Returns the name of the running JVM as reported by the runtime MX bean. */
  private static String jvmName() {
    return ManagementFactory.getRuntimeMXBean().getName();
  }

  /**
   * Creates a client from a configuration, a Thrift client provider and a client id.
   * The retry strategy is chosen by the value configured under
   * {@code TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY}: "backoff", "n-times", or the
   * name of a {@link RetryStrategyProvider} class that can be instantiated without arguments.
   * The change set limits and warning thresholds are read from the configuration as well.
   *
   * @param config the configuration to read the retry strategy and change set bounds from
   * @param clientProvider provider of the Thrift clients used to call the transaction service
   * @param clientId id of the client that identifies it when it starts a transaction
   * @throws IllegalArgumentException if the configured retry strategy class cannot be loaded or
   *         instantiated, or is not a {@link RetryStrategyProvider}
   */
  @Inject
  public TransactionServiceClient(Configuration config, ThriftClientProvider clientProvider,
                                  @Named(TxConstants.CLIENT_ID) String clientId) {

    // initialize the retry logic
    this.retryStrategyProvider = createRetryStrategyProvider(config.get(
        TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY,
        TxConstants.Service.DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY));
    this.retryStrategyProvider.configure(config);
    LOG.debug("Retry strategy is {}", this.retryStrategyProvider);

    this.clientProvider = clientProvider;
    this.clientId = clientId;

    changeSetCountLimit = config.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_LIMIT,
                                        TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_LIMIT);
    changeSetCountThreshold = config.getInt(TxConstants.Manager.CFG_TX_CHANGESET_COUNT_WARN_THRESHOLD,
                                            TxConstants.Manager.DEFAULT_TX_CHANGESET_COUNT_WARN_THRESHOLD);
    changeSetSizeLimit = config.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_LIMIT,
                                        TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_LIMIT);
    changeSetSizeThreshold = config.getLong(TxConstants.Manager.CFG_TX_CHANGESET_SIZE_WARN_THRESHOLD,
                                            TxConstants.Manager.DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD);
  }

  /**
   * Maps the configured retry strategy name to a (not yet configured) provider instance.
   *
   * @param retryStrat "backoff", "n-times", or the name of a {@link RetryStrategyProvider} class
   * @return a new provider for the requested strategy
   * @throws IllegalArgumentException if a custom class cannot be loaded or instantiated,
   *         or is not a {@link RetryStrategyProvider}
   */
  private static RetryStrategyProvider createRetryStrategyProvider(String retryStrat) {
    if ("backoff".equals(retryStrat)) {
      return new RetryWithBackoff.Provider();
    }
    if ("n-times".equals(retryStrat)) {
      return new RetryNTimes.Provider();
    }
    try {
      return (RetryStrategyProvider) Class.forName(retryStrat).newInstance();
    } catch (IllegalAccessException | InstantiationException | ClassNotFoundException | ClassCastException e) {
      // a class of the wrong type is a configuration error just like a missing class
      throw new IllegalArgumentException(
        String.format("Unable to instantiate RetryStrategyProvider '%s'", retryStrat), e);
    }
  }

  /**
   * Encapsulates a single named call to the transaction service. Subclasses implement
   * {@link #execute(TransactionServiceThriftClient)} to perform the call with the client they
   * are given; that method may throw any exception.
   *
   * @param <T> The return type of the operation
   */
  abstract static class Operation<T> {

    /** the name of the operation; it never changes after construction. */
    final String name;

    /** constructor with name of operation. */
    Operation(String name) {
      this.name = name;
    }

    /** return the name of the operation. */
    String getName() {
      return name;
    }

    /** execute the operation, given a tx client. */
    abstract T execute(TransactionServiceThriftClient client)
        throws Exception;

    /** return the name, so that an operation put into a log message is recognizable. */
    @Override
    public String toString() {
      return name;
    }
  }

  /**
   * Executes the operation without a custom client provider.
   * See {@link #execute(Operation, ThriftClientProvider)}.
   */
  private <T> T execute(Operation<T> operation) throws Exception {
    // passing no provider makes the two-argument variant use this client's own provider
    final ThriftClientProvider noCustomProvider = null;
    return execute(operation, noCustomProvider);
  }

  /**
   * This is a generic method implementing the execution and retry logic for operations,
   * to avoid repetitive code.
   *
   * For every attempt a tx client is obtained from the client provider, the operation is executed
   * with it, and the client is closed again. If obtaining the client or executing the operation
   * fails with a {@link TException}, the retry strategy is asked whether to try again. Once it
   * declines, that {@link TException} is rethrown. Any other exception thrown by the operation
   * is propagated right away; the retry logic is only applied for thrift exceptions.
   *
   * @param operation The operation to be executed
   * @param provider A tx client provider. If null, then this client's own provider is used
   * @param <T> The return type of the operation
   * @return the result of the operation
   * @throws TException if the call still fails with a thrift exception when the retry strategy gives up
   * @throws Exception any other exception thrown by the operation
   */
  private <T> T execute(Operation<T> operation, ThriftClientProvider provider) throws Exception {
    RetryStrategy retryStrategy = retryStrategyProvider.newRetryStrategy();
    // did we get a custom client provider or do we use the default? This is the same for all attempts.
    ThriftClientProvider effectiveProvider = provider == null ? this.clientProvider : provider;
    while (true) {
      // this will throw a TException if it cannot get a client
      try (CloseableThriftClient closeable = effectiveProvider.getCloseableClient()) {
        // note that this can throw exceptions other than TException
        return operation.execute(closeable.getThriftClient());

      } catch (TException te) {
        // determine whether we should retry
        if (!retryStrategy.failOnce()) {
          // retry strategy is exceeded, give up and rethrow the thrift exception
          String message =
              "Thrift error for " + operation.getName() + ": " + te.getMessage();
          LOG.error(message);
          LOG.debug(message, te);
          throw te;
        }
        // call retry strategy before retrying
        retryStrategy.beforeRetry();
        String msg = "Retrying " + operation.getName() + " after Thrift error: " + te.getMessage();
        LOG.info(msg);
        LOG.debug(msg, te);
      }
    }
  }

  /**
   * Starts a long transaction for this client's id through the transaction service.
   * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   *
   * @return the transaction returned by the service
   */
  @Override
  public Transaction startLong() {
    final Operation<Transaction> startLongOp = new Operation<Transaction>("startLong") {
      @Override
      public Transaction execute(TransactionServiceThriftClient client) throws TException {
        return client.startLong(clientId);
      }
    };
    try {
      return execute(startLongOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Starts a short transaction for this client's id through the transaction service.
   * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   *
   * @return the transaction returned by the service
   */
  @Override
  public Transaction startShort() {
    final Operation<Transaction> startShortOp = new Operation<Transaction>("startShort") {
      @Override
      public Transaction execute(TransactionServiceThriftClient client) throws TException {
        return client.startShort(clientId);
      }
    };
    try {
      return execute(startShortOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Starts a short transaction for this client's id, passing the given timeout on to the
   * transaction service. Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   *
   * @param timeout the timeout value handed to the service unchanged
   * @return the transaction returned by the service
   */
  @Override
  public Transaction startShort(final int timeout) {
    final Operation<Transaction> startShortOp = new Operation<Transaction>("startShort") {
      @Override
      public Transaction execute(TransactionServiceThriftClient client) throws TException {
        return client.startShort(clientId, timeout);
      }
    };
    try {
      return execute(startShortOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Asks the transaction service whether the transaction can commit with the given change set.
   *
   * @param tx the transaction to check
   * @param changeIds the change set sent to the service along with the transaction
   * @return the answer of the service
   * @throws TransactionNotInProgressException if the call fails with that exception; any other
   *         failure is rethrown via {@link Throwables#propagate(Throwable)}
   */
  @Override
  public boolean canCommit(final Transaction tx, final Collection<byte[]> changeIds)
    throws TransactionNotInProgressException {

    final Operation<Boolean> canCommitOp = new Operation<Boolean>("canCommit") {
      @Override
      public Boolean execute(TransactionServiceThriftClient client) throws Exception {
        return client.canCommit(tx, changeIds);
      }
    };
    try {
      return execute(canCommitOp);
    } catch (TransactionNotInProgressException notInProgress) {
      throw notInProgress;
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Validates the size of the change set locally and then asks the transaction service whether
   * the transaction can commit.
   *
   * @param tx the transaction to check
   * @param changeIds the change set sent to the service along with the transaction
   * @return the answer of the service
   * @throws TransactionFailureException if the change set exceeds the configured count or size
   *         limit, or if the call fails with a transaction failure; any other failure is rethrown
   *         via {@link Throwables#propagate(Throwable)}
   */
  @Override
  public boolean canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds)
    throws TransactionFailureException {

    // we want to validate the size of the change set here before sending it over the wire.
    // if the change set is large, it can cause memory issues on the server side.
    validateChangeSetSize(tx, changeIds);

    try {
      return execute(
        new Operation<Boolean>("canCommit") {
          @Override
          public Boolean execute(TransactionServiceThriftClient client)
            throws Exception {
            return client.canCommitOrThrow(tx, changeIds);
          }
        });
    } catch (TransactionFailureException e) {
      // every failure type this method declares is rethrown as is, not wrapped in a RuntimeException
      throw e;
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }

  /**
   * Checks the number of changes and their total size in bytes against the configured bounds.
   * Exceeding a warning threshold only logs a warning.
   *
   * @throws TransactionSizeException if the count limit or the size limit is exceeded
   */
  private void validateChangeSetSize(Transaction tx, Collection<byte[]> changeIds)
    throws TransactionSizeException {

    if (changeIds.size() > changeSetCountLimit) {
      throw new TransactionSizeException(String.format(
        "Change set for transaction %d has %d entries and exceeds the limit of %d",
        tx.getTransactionId(), changeIds.size(), changeSetCountLimit));
    } else if (changeIds.size() > changeSetCountThreshold) {
      LOG.warn("Change set for transaction {} has {} entries. " +
                 "It is recommended to limit the number of changes to {}, or to use a long-running transaction. ",
               tx.getTransactionId(), changeIds.size(), changeSetCountThreshold);
    }
    long byteCount = 0L;
    for (byte[] change : changeIds) {
      byteCount += change.length;
    }
    if (byteCount > changeSetSizeLimit) {
      throw new TransactionSizeException(String.format(
        "Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes",
        tx.getTransactionId(), byteCount, changeSetSizeLimit));
    } else if (byteCount > changeSetSizeThreshold) {
      LOG.warn("Change set for transaction {} has total size of {} bytes. " +
                 "It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ",
               tx.getTransactionId(), byteCount, changeSetSizeThreshold);
    }
  }

  /**
   * Asks the transaction service to commit the transaction.
   *
   * @param tx the transaction to commit
   * @return the answer of the service
   * @throws TransactionNotInProgressException if the call fails with that exception; any other
   *         failure is rethrown via {@link Throwables#propagate(Throwable)}
   */
  @Override
  public boolean commit(final Transaction tx) throws TransactionNotInProgressException {
    final Operation<Boolean> commitOp = new Operation<Boolean>("commit") {
      @Override
      public Boolean execute(TransactionServiceThriftClient client) throws Exception {
        return client.commit(tx);
      }
    };
    try {
      return this.execute(commitOp);
    } catch (TransactionNotInProgressException notInProgress) {
      throw notInProgress;
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Asks the transaction service to abort the transaction.
   * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   *
   * @param tx the transaction to abort
   */
  @Override
  public void abort(final Transaction tx) {
    final Operation<Boolean> abortOp = new Operation<Boolean>("abort") {
      @Override
      public Boolean execute(TransactionServiceThriftClient client) throws TException {
        client.abort(tx);
        // the service call returns nothing; the value only satisfies the operation's return type
        return Boolean.TRUE;
      }
    };
    try {
      this.execute(abortOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Asks the transaction service to invalidate the transaction with the given id.
   * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   *
   * @param tx the id handed to the service
   * @return the answer of the service
   */
  @Override
  public boolean invalidate(final long tx) {
    final Operation<Boolean> invalidateOp = new Operation<Boolean>("invalidate") {
      @Override
      public Boolean execute(TransactionServiceThriftClient client) throws TException {
        return client.invalidate(tx);
      }
    };
    try {
      return this.execute(invalidateOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Asks the transaction service to checkpoint the transaction.
   *
   * @param tx the transaction to checkpoint
   * @return the transaction returned by the service
   * @throws TransactionNotInProgressException if the call fails with that exception; any other
   *         failure is rethrown via {@link Throwables#propagate(Throwable)}
   */
  @Override
  public Transaction checkpoint(final Transaction tx) throws TransactionNotInProgressException {
    final Operation<Transaction> checkpointOp = new Operation<Transaction>("checkpoint") {
      @Override
      Transaction execute(TransactionServiceThriftClient client) throws Exception {
        return client.checkpoint(tx);
      }
    };
    try {
      return this.execute(checkpointOp);
    } catch (TransactionNotInProgressException notInProgress) {
      throw notInProgress;
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Requests a snapshot stream from the transaction service.
   *
   * @return the stream returned by the service
   * @throws TransactionCouldNotTakeSnapshotException if the call fails with that exception; any
   *         other failure is rethrown via {@link Throwables#propagate(Throwable)}
   */
  @Override
  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
    final Operation<InputStream> snapshotOp = new Operation<InputStream>("takeSnapshot") {
      @Override
      public InputStream execute(TransactionServiceThriftClient client) throws Exception {
        return client.getSnapshotStream();
      }
    };
    try {
      return this.execute(snapshotOp);
    } catch (TransactionCouldNotTakeSnapshotException noSnapshot) {
      throw noSnapshot;
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Queries the status of the transaction service.
   * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   *
   * @return the status string returned by the service
   */
  @Override
  public String status() {
    final Operation<String> statusOp = new Operation<String>("status") {
      @Override
      public String execute(TransactionServiceThriftClient client) throws Exception {
        return client.status();
      }
    };
    try {
      return this.execute(statusOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Asks the transaction service to reset its state.
   * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   */
  @Override
  public void resetState() {
    final Operation<Boolean> resetOp = new Operation<Boolean>("resetState") {
      @Override
      public Boolean execute(TransactionServiceThriftClient client) throws TException {
        client.resetState();
        // the service call returns nothing; the value only satisfies the operation's return type
        return Boolean.TRUE;
      }
    };
    try {
      this.execute(resetOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Forwards the given transaction ids to the {@code truncateInvalidTx} call of the transaction
   * service. Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   *
   * @param invalidTxIds the ids handed to the service
   * @return the answer of the service
   */
  @Override
  public boolean truncateInvalidTx(final Set<Long> invalidTxIds) {
    final Operation<Boolean> truncateOp = new Operation<Boolean>("truncateInvalidTx") {
      @Override
      public Boolean execute(TransactionServiceThriftClient client) throws TException {
        return client.truncateInvalidTx(invalidTxIds);
      }
    };
    try {
      return this.execute(truncateOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Forwards the given time to the {@code truncateInvalidTxBefore} call of the transaction service.
   *
   * @param time the time value handed to the service unchanged
   * @return the answer of the service
   * @throws InvalidTruncateTimeException if the call fails with that exception; any other failure
   *         is rethrown via {@link Throwables#propagate(Throwable)}
   */
  @Override
  public boolean truncateInvalidTxBefore(final long time) throws InvalidTruncateTimeException {
    final Operation<Boolean> truncateOp = new Operation<Boolean>("truncateInvalidTxBefore") {
      @Override
      public Boolean execute(TransactionServiceThriftClient client) throws Exception {
        return client.truncateInvalidTxBefore(time);
      }
    };
    try {
      return this.execute(truncateOp);
    } catch (InvalidTruncateTimeException invalidTime) {
      throw invalidTime;
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Queries the invalid size from the transaction service.
   * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   *
   * @return the number returned by the service
   */
  @Override
  public int getInvalidSize() {
    final Operation<Integer> invalidSizeOp = new Operation<Integer>("getInvalidSize") {
      @Override
      public Integer execute(TransactionServiceThriftClient client) throws TException {
        return client.getInvalidSize();
      }
    };
    try {
      return this.execute(invalidSizeOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

  /**
   * Invokes the {@code pruneNow} call of the transaction service.
   * Any failure is rethrown via {@link Throwables#propagate(Throwable)}.
   */
  @Override
  public void pruneNow() {
    final Operation<Void> pruneOp = new Operation<Void>("pruneNow") {
      @Override
      public Void execute(TransactionServiceThriftClient client) throws TException {
        client.pruneNow();
        // nothing to return for a Void operation
        return null;
      }
    };
    try {
      this.execute(pruneOp);
    } catch (Exception failure) {
      throw Throwables.propagate(failure);
    }
  }

}
