KUDU-2612 Java client transaction API

This patch is focused on the API rather than the actual functionality
under the hood.  The functionality to do the heavy-lifting (i.e. issuing
RPC calls to TxnManager, retrying in case of transient errors, tests,
etc.) will be posted as a separate patch as per our discussion with
Andrew and Hao.

The proposed API is mirroring the API for the C++ client with a few
twists in the functions' signatures: the Java client uses exceptions
instead of return statuses, etc.

The asynchronous API bindings (i.e. bindings with Deferred<Xxx>) aren't
provided in this patch.  I'm not sure it makes sense to invest in that
at this point, given that I'm not aware of any users of the asynchronous
Kudu client API other than the Java Kudu client itself.
If necessary, we can add AsyncKuduTransaction with appropriate semantics
later on.

Change-Id: Idbb18e1ac0454a8ef9e3486430dfaa336e381e07
Reviewed-on: http://gerrit.cloudera.org:8080/16894
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index d680b12..5d84168 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -204,7 +204,7 @@
    * @param txnId transaction identifier for all operations within the session
    */
   AsyncKuduSession(AsyncKuduClient client, long txnId) {
-    assert txnId >= 0;
+    assert txnId > AsyncKuduClient.INVALID_TXN_ID;
     this.client = client;
     this.txnId = txnId;
     flushMode = FlushMode.AUTO_FLUSH_SYNC;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 7b58be9..cf7806d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -300,6 +300,22 @@
   }
 
   /**
+   * Start a new multi-row distributed transaction.
+   * <p>
+   * Under the hood, this makes an RPC call to the Kudu cluster to register
+   * a newly created transaction in the system and returns a handle to manage
+   * the newly started transaction. This call is blocking.
+   *
+   * @return a handle to the newly started transaction in case of success
+   * @throws KuduException if starting the transaction fails
+   */
+  public KuduTransaction newTransaction() throws KuduException {
+    final KuduTransaction handle = new KuduTransaction(asyncClient);
+    handle.begin();
+    return handle;
+  }
+
+  /**
    * Check if statistics collection is enabled for this client.
    * @return true if it is enabled, else false
    */
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
new file mode 100644
index 0000000..a281b4d
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTransaction.java
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.transactions.Transactions.TxnTokenPB;
+
+/**
+ * A handle for a multi-row transaction in Kudu.
+ * <p>
+ * Once created using {@link KuduClient#newTransaction} or
+ * {@link KuduTransaction#deserialize} methods, an instance of this class
+ * can be used to commit or rollback the underlying multi-row transaction. To
+ * issue write operations as a part of the transaction, use the
+ * {@link KuduTransaction#newKuduSession} or
+ * {@link KuduTransaction#newAsyncKuduSession} methods to create a new
+ * transactional session and apply write operations using it.
+ * <p>
+ * The {@link KuduTransaction} implements {@link AutoCloseable} and should be
+ * used with the try-with-resources code construct. Once an object of this class
+ * is constructed, it starts sending automatic keep-alive heartbeat messages
+ * to keep the underlying transaction open. Once the object goes out of scope
+ * and {@link KuduTransaction#close} is automatically called by the Java
+ * runtime (or the method is called explicitly), the heartbeating stops and the
+ * transaction is automatically aborted by the system after not receiving
+ * heartbeat messages for a few keep-alive intervals.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class KuduTransaction implements AutoCloseable {
+
+  /**
+   * A utility class to help with the serialization of {@link KuduTransaction}.
+   * <p>
+   * As of now, the single purpose of this class is to control the keepalive
+   * behavior for the {@link KuduTransaction} handle once it's deserialized from
+   * a token. In future, the list of configurable parameters might be extended
+   * (e.g., add commit and abort permissions, i.e. whether a handle can be used
+   * to commit and/or abort the underlying transaction).
+   */
+  public static class SerializationOptions {
+
+    private boolean enableKeepalive;
+
+    /**
+     * Construct an object with default settings: keepalive is disabled.
+     */
+    public SerializationOptions() {
+      this.enableKeepalive = false;
+    }
+
+    /**
+     * @return whether the transaction handle produced from an instance of
+     *         {@link KuduTransaction} by the {@link KuduTransaction#serialize},
+     *         {@link KuduTransaction#deserialize} call sequence will send
+     *         keepalive messages to avoid automatic rollback of the underlying
+     *         transaction.
+     */
+    public boolean isKeepaliveEnabled() {
+      return enableKeepalive;
+    }
+
+    /**
+     * Toggle the automatic sending of keepalive messages for transaction handle.
+     * <p>
+     * This method toggles the automatic sending of keepalive messages for a
+     * transaction handle deserialized from the token which is produced by
+     * calling {@link KuduTransaction#serialize} method with these options.
+     * <p>
+     * No keepalive heartbeat messages are sent from a transaction handle whose
+     * source token was created with the default "keepalive disabled" setting.
+     * The idea here is that the most common use case for using transaction
+     * tokens is of the "star topology" (see below), so it is enough to have
+     * just one top-level handle sending keepalive messages. Overall, having more
+     * than one actor sending keepalive messages for a transaction is acceptable
+     * but it puts needless load on a cluster.
+     * <p>
+     * The most common use case for a transaction's handle
+     * serialization/deserialization is of the "star topology": a transaction is
+     * started by a top-level application which sends the transaction token
+     * produced by serializing the original transaction handle to other worker
+     * applications running concurrently, where the latter write their data
+     * in the context of the same transaction and report back to the top-level
+     * application, which in its turn initiates committing the transaction
+     * as needed. The important point is that the top-level application keeps the
+     * transaction handle around all the time from the start of the transaction
+     * to the very point when transaction is committed. Under the hood, the
+     * original transaction handle sends keepalive messages as required until
+     * commit phase is initiated, so the deserialized transaction handles which
+     * are used by the worker applications don't need to send keepalive messages.
+     * <p>
+     * The other (less common) use case is of the "ring topology": a chain of
+     * applications work sequentially as a part of the same transaction, where
+     * the very first application starts the transaction, writes its data, and
+     * hands over the responsibility of managing the lifecycle of the transaction
+     * to another application down the chain. After doing so it may exit, so now
+     * only the next application has the active transaction handle, and so on it
+     * goes until the transaction is committed by the application in the end
+     * of the chain. In this scenario, every deserialized handle has to send
+     * keepalive messages to avoid automatic rollback of the transaction,
+     * and every application in the chain should call
+     * {@link SerializationOptions#setEnableKeepalive} when serializing
+     * its transaction handle into a transaction token to pass to the application
+     * next in the chain.
+     *
+     * @param enableKeepalive whether the {@link KuduTransaction} object
+     *                        deserialized from the bytes produced by
+     *                        {@link KuduTransaction#serialize} sends keepalives
+     * @return this object, to allow for chaining of calls
+     */
+    public SerializationOptions setEnableKeepalive(boolean enableKeepalive) {
+      this.enableKeepalive = enableKeepalive;
+      return this;
+    }
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KuduTransaction.class);
+  private static final SerializationOptions defaultSerializationOptions =
+      new SerializationOptions(); // used by the no-argument serialize()
+
+  private final AsyncKuduClient client; // client this handle is bound to
+  private long txnId = AsyncKuduClient.INVALID_TXN_ID; // identifier of the transaction
+  private int keepaliveMillis = 0; // keepalive timeout interval, in milliseconds
+  private boolean keepaliveEnabled = true; // whether to send keepalive messages
+  private boolean isInFlight = false; // whether the handle tracks an in-flight transaction
+  private final Object isInFlightSync = new Object(); // monitor to synchronize on isInFlight
+
+  /**
+   * Create an instance of a transaction handle bound to the specified client.
+   * <p>
+   * This constructor is used exclusively for the control paths involving
+   * {@link KuduClient#newTransaction} method.
+   * @param client client instance to operate with the underlying transaction
+   * @throws IllegalArgumentException if {@code client} is {@code null}
+   */
+  KuduTransaction(AsyncKuduClient client) {
+    Preconditions.checkArgument(client != null);
+    this.client = client;
+  }
+
+  /**
+   * Create a handle for a transaction with the specified parameters.
+   * <p>
+   * The handle is marked as in-flight right upon construction. This
+   * constructor is used exclusively for the control paths involving
+   * {@link KuduTransaction#deserialize}.
+   *
+   * @param client client instance to operate with the underlying transaction
+   * @param txnId transaction identifier
+   * @param keepaliveMillis keepalive timeout interval: if the backend isn't
+   *                        receiving keepalive messages at least every
+   *                        keepaliveMillis time interval, it automatically
+   *                        aborts the underlying transaction
+   * @param keepaliveEnabled whether the handle should automatically send
+   *                         keepalive messages to the backend
+   * @throws IllegalArgumentException if any of the arguments is invalid
+   */
+  KuduTransaction(AsyncKuduClient client, long txnId,
+                  int keepaliveMillis, boolean keepaliveEnabled) {
+    Preconditions.checkArgument(client != null);
+    Preconditions.checkArgument(txnId > AsyncKuduClient.INVALID_TXN_ID);
+    Preconditions.checkArgument(keepaliveMillis >= 0);
+    this.isInFlight = true;
+    this.keepaliveEnabled = keepaliveEnabled;
+    this.keepaliveMillis = keepaliveMillis;
+    this.txnId = txnId;
+    this.client = client;
+  }
+
+  /**
+   * Start a transaction.
+   * <p>
+   * This method isn't a part of the public API, it's used only internally.
+   * @throws KuduException if something went wrong
+   * @throws IllegalStateException if this handle is already marked in-flight
+   */
+  void begin() throws KuduException {
+    synchronized (isInFlightSync) {
+      Preconditions.checkState(!isInFlight);
+    }
+
+    // TODO(aserbin): implement
+
+    // Once the heavy-lifting has successfully completed, mark this instance
+    // as a handle for an in-flight transaction.
+    synchronized (isInFlightSync) {
+      isInFlight = true;
+    }
+  }
+
+  /**
+   * Create a new {@link AsyncKuduSession} whose write operations are all
+   * performed in the context of this transaction.
+   * @return a new {@link AsyncKuduSession} instance
+   * @throws IllegalStateException if this handle isn't marked as in-flight
+   */
+  public AsyncKuduSession newAsyncKuduSession() {
+    final boolean inFlight;
+    synchronized (isInFlightSync) {
+      inFlight = isInFlight;
+    }
+    Preconditions.checkState(inFlight);
+    return new AsyncKuduSession(client, txnId);
+  }
+
+  /**
+   * Create a new {@link KuduSession} based on this transaction.
+   * <p>
+   * All write operations using the result session will be performed in the
+   * context of this transaction. The result session wraps a session created
+   * by the {@link KuduTransaction#newAsyncKuduSession} method.
+   *
+   * @return a new {@link KuduSession} instance
+   * @throws IllegalStateException if this handle isn't marked as in-flight
+   */
+  public KuduSession newKuduSession() {
+    // The in-flight state of the handle is verified by newAsyncKuduSession().
+    return new KuduSession(newAsyncKuduSession());
+  }
+
+  /**
+   * Commit the multi-row distributed transaction represented by this handle.
+   *
+   * @param wait if {@code true}, block until the commit phase is finalized;
+   *             otherwise start committing the transaction and return, in
+   *             which case the status of the transaction can be checked using
+   *             the {@link KuduTransaction#isCommitComplete} method
+   * @throws KuduException if something went wrong
+   * @throws IllegalStateException if no transaction is open for this handle
+   */
+  public void commit(boolean wait) throws KuduException {
+    // Read the flag under the same monitor which is used to update it.
+    synchronized (isInFlightSync) {
+      Preconditions.checkState(isInFlight, "transaction is not open for this handle");
+    }
+    // TODO(aserbin): implement
+
+    // Upon successful completion, the transaction is no longer in flight.
+    synchronized (isInFlightSync) {
+      isInFlight = false;
+    }
+  }
+
+  /**
+   * Check whether the commit phase for a transaction is complete.
+   * @return {@code true} if transaction has finalized, otherwise {@code false}
+   * @throws KuduException if querying the state of the transaction fails
+   * @throws IllegalStateException if the handle is still marked as in-flight
+   */
+  public boolean isCommitComplete() throws KuduException {
+    synchronized (isInFlightSync) {
+      Preconditions.checkState(!isInFlight);
+    }
+    return false; // TODO(aserbin): implement
+  }
+
+  /**
+   * Rollback the multi-row distributed transaction represented by this object.
+   * <p>
+   * This method initiates rolling back the transaction and returns right after
+   * that. The system takes care of the rest. Once the control returns and
+   * no exception is thrown, a client has a guarantee that all write
+   * operations issued in the context of this transaction cannot be seen outside.
+   *
+   * @throws KuduException if something went wrong
+   * @throws IllegalStateException if no transaction is open for this handle
+   */
+  public void rollback() throws KuduException {
+    synchronized (isInFlightSync) {
+      Preconditions.checkState(isInFlight, "transaction is not open for this handle");
+    }
+    // TODO(aserbin): implement
+
+    // Upon successful completion, the transaction is no longer in flight.
+    synchronized (isInFlightSync) {
+      isInFlight = false;
+    }
+  }
+
+  /**
+   * Export information on the underlying transaction in a serialized form.
+   * <p>
+   * This method transforms this handle into its serialized representation
+   * referred to as "transaction token". The token can be passed among different
+   * Kudu clients running at multiple nodes, so those separate Kudu clients can
+   * perform operations to be a part of the same distributed transaction. The
+   * token can be deserialized into a transaction handle (i.e. an object of this
+   * class) via the {@link KuduTransaction#deserialize} method.
+   * <p>
+   * This method doesn't perform any RPC under the hood.
+   * <p>
+   * The representation of the data in the serialized form (i.e. the format of
+   * a Kudu transaction token) is an implementation detail, not a part of the
+   * public API and can be changed without notice.
+   *
+   * @param options settings to control the behavior of the handle produced by
+   *                deserializing the result token
+   * @return the serialized form of this transaction handle
+   * @throws IOException if serialization fails
+   */
+  public byte[] serialize(SerializationOptions options) throws IOException {
+    LOG.debug("serializing handle for transaction ID {}", txnId);
+    Preconditions.checkState(
+        txnId != AsyncKuduClient.INVALID_TXN_ID,
+        "invalid transaction handle");
+    final TxnTokenPB token = TxnTokenPB.newBuilder()
+        .setTxnId(txnId)
+        .setEnableKeepalive(options.isKeepaliveEnabled())
+        .setKeepaliveMillis(keepaliveMillis)
+        .build();
+    final byte[] bytes = new byte[token.getSerializedSize()];
+    final CodedOutputStream out = CodedOutputStream.newInstance(bytes);
+    token.writeTo(out);
+    out.flush();
+    return bytes;
+  }
+
+  /**
+   * Same as {@link KuduTransaction#serialize(SerializationOptions)} invoked
+   * with default {@link SerializationOptions}, i.e. with keepalive disabled.
+   */
+  public byte[] serialize() throws IOException {
+    return serialize(defaultSerializationOptions);
+  }
+
+  /**
+   * Re-create KuduTransaction object given its serialized representation.
+   * <p>
+   * This method doesn't perform any RPC under the hood. The newly created
+   * object automatically does or does not send keep-alive messages depending
+   * on the {@link SerializationOptions#isKeepaliveEnabled} setting when
+   * the original {@link KuduTransaction} object was serialized using
+   * {@link KuduTransaction#serialize} method.
+   *
+   * @param buf serialized representation of a {@link KuduTransaction} object
+   * @param client client instance to bind the result object to
+   * @return a transaction handle re-created from the serialized representation
+   * @throws IOException if deserialization fails
+   */
+  public static KuduTransaction deserialize(
+      byte[] buf, AsyncKuduClient client) throws IOException {
+    final TxnTokenPB token = TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
+    return new KuduTransaction(
+        client,
+        token.getTxnId(),
+        token.getKeepaliveMillis(),
+        token.hasEnableKeepalive() && token.getEnableKeepalive());
+  }
+
+  /**
+   * Stop keepalive heartbeating, if any was in progress for this transaction
+   * handle.
+   * <p>
+   * This method is called automatically upon exiting a try-with-resources
+   * block as prescribed for {@link AutoCloseable}.
+   * <p>
+   * This method doesn't throw according to the recommendations for
+   * {@link AutoCloseable#close}. In case of an error, this method just logs
+   * the corresponding error message.
+   */
+  @Override
+  public void close() {
+    try {
+      if (keepaliveEnabled) {
+        LOG.debug("stopping keepalive heartbeating for transaction ID {}", txnId);
+        // TODO(aserbin): stop sending keepalive heartbeats to TxnManager
+      }
+    } catch (Exception e) {
+      LOG.error("exception while stopping keepalive heartbeating", e);
+    }
+  }
+}