blob: a281b4d9ae55adb96b8146f0f7bdcac36ac3a1a0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.kudu.client;
import java.io.IOException;
import com.google.common.base.Preconditions;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kudu.transactions.Transactions.TxnTokenPB;
/**
* A handle for a multi-row transaction in Kudu.
* <p>
* Once created using {@link KuduClient#newTransaction} or
* {@link KuduTransaction#deserialize} methods, an instance of this class
* can be used to commit or rollback the underlying multi-row transaction. To
* issue write operations as a part of the transaction, use the
* {@link KuduTransaction#newKuduSession} or
* {@link KuduTransaction#newAsyncKuduSession} methods to create a new
* transactional session and apply write operations using it.
* <p>
* The {@link KuduTransaction} implements {@link AutoCloseable} and should be
* used with try-with-resource code construct. Once an object of this class
* is constructed, it starts sending automatic keep-alive heartbeat messages
* to keep the underlying transaction open. Once the object goes out of scope
* and {@link KuduTransaction#close} is automatically called by the Java
* runtime (or the method is called explicitly), the heartbeating stops and the
* transaction is automatically aborted by the system after not receiving
* heartbeat messages for a few keep-alive intervals.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public class KuduTransaction implements AutoCloseable {
/**
* A utility class to help with the serialization of {@link KuduTransaction}.
* <p>
* As of now, the single purpose of this class is to control the keepalive
* behavior for the {@link KuduTransaction} handle once it's deserialized from
* a token. In future, the list of configurable parameters might be extended
* (e.g., add commit and abort permissions, i.e. whether a handle can be used
* to commit and/or abort the underlying transaction).
*/
public static class SerializationOptions {
private boolean enableKeepalive;
/**
* Construct an object with default settings.
*/
SerializationOptions() {
this.enableKeepalive = false;
}
/**
* @return whether the transaction handle produced from an instance of
* {@link KuduTransaction} by the {@link KuduTransaction#serialize},
* {@link KuduTransaction#deserialize} call sequence will send
* keepalive messages to avoid automatic rollback of the underlying
* transaction.
*/
public boolean isKeepaliveEnabled() {
return enableKeepalive;
}
/**
* Toggle the automatic sending of keepalive messages for transaction handle.
* <p>
* This method toggles the automatic sending of keepalive messages for a
* deserialized transaction handle that is created from the result serialized
* token upon calling {@link KuduTransaction#serialize} method.
* <p>
* No keepalive heartbeat messages are sent from a transaction handle whose
* source token was created with the default "keepalive disabled" setting.
* The idea here is that the most common use case for using transaction
* tokens is of the "star topology" (see below), so it is enough to have
* just one top-level handle sending keepalive messages. Overall, having more
* than one actor sending keepalive messages for a transaction is acceptable
* but it puts needless load on a cluster.
* <p>
* The most common use case for a transaction's handle
* serialization/deserialization is of the "star topology": a transaction is
* started by a top-level application which sends the transaction token
* produced by serializing the original transaction handle to other worker
* applications running concurrently, where the latter write their data
* in the context of the same transaction and report back to the top-level
* application, which in its turn initiates committing the transaction
* as needed. The important point is that the top-level application keeps the
* transaction handle around all the time from the start of the transaction
* to the very point when transaction is committed. Under the hood, the
* original transaction handle sends keepalive messages as required until
* commit phase is initiated, so the deserialized transaction handles which
* are used by the worker applications don't need to send keepalive messages.
* <p>
* The other (less common) use case is of the "ring topology": a chain of
* applications work sequentially as a part of the same transaction, where
* the very first application starts the transaction, writes its data, and
* hands over the responsibility of managing the lifecycle of the transaction
* to other application down the chain. After doing so it may exit, so now
* only the next application has the active transaction handle, and so on it
* goes until the transaction is committed by the application in the end
* of the chain. In this scenario, every deserialized handle has to send
* keepalive messages to avoid automatic rollback of the transaction,
* and every application in the chain should call
* {@link SerializationOptions#setEnableKeepalive} when serializing
* its transaction handle into a transaction token to pass to the application
* next in the chain.
*
* @param enableKeepalive whether to enable sending keepalive messages for
* the {@link KuduTransaction} object once it is
* deserialized from the bytes to be produced by the
* {@link KuduTransaction#serialize} method.
*/
public SerializationOptions setEnableKeepalive(boolean enableKeepalive) {
this.enableKeepalive = enableKeepalive;
return this;
}
}
private static final Logger LOG =
LoggerFactory.getLogger(KuduTransaction.class);
private static final SerializationOptions defaultSerializationOptions =
new SerializationOptions();
private final AsyncKuduClient client;
private long txnId = AsyncKuduClient.INVALID_TXN_ID;
private int keepaliveMillis = 0;
private boolean keepaliveEnabled = true;
private boolean isInFlight = false;
private final Object isInFlightSync = new Object();
/**
* Create an instance of a transaction handle bound to the specified client.
* <p>
* This constructor is used exclusively for the control paths involving
* {@link KuduClient#newTransaction} method.
*
* @param client client instance to operate with the underlying transaction
*/
KuduTransaction(AsyncKuduClient client) {
Preconditions.checkArgument(client != null);
this.client = client;
}
/**
* Create an instance of a transaction handle for the specified parameters.
* <p>
* This constructor is used exclusively for the control paths involving
* {@link KuduTransaction#deserialize}.
*
* @param client client instance to operate with the underlying transaction
* @param txnId transaction identifier
* @param keepaliveMillis keepalive timeout interval: if the backend isn't
* receiving keepalive messages at least every
* keepaliveMillis time interval, it automatically
* aborts the underlying transaction
* @param keepaliveEnabled whether the handle should automatically send
* keepalive messages to the backend
*/
KuduTransaction(AsyncKuduClient client,
long txnId,
int keepaliveMillis,
boolean keepaliveEnabled) {
Preconditions.checkArgument(client != null);
Preconditions.checkArgument(txnId > AsyncKuduClient.INVALID_TXN_ID);
Preconditions.checkArgument(keepaliveMillis >= 0);
this.client = client;
this.txnId = txnId;
this.keepaliveMillis = keepaliveMillis;
this.keepaliveEnabled = keepaliveEnabled;
this.isInFlight = true;
}
/**
* Start a transaction.
* <p>
* This method isn't a part of the public API, it's used only internally.
*
* @throws KuduException if something went wrong
*/
void begin() throws KuduException {
synchronized (isInFlightSync) {
Preconditions.checkState(!isInFlight);
}
// TODO(aserbin): implement
// Once the heavy-lifting has successfully completed, mark this instance
// as a handle for an in-flight transaction.
synchronized (isInFlightSync) {
isInFlight = true;
}
}
/**
* Create a new {@link AsyncKuduSession} based on this transaction.
* <p>
* All write operations using the result session will be performed in the
* context of this transaction.
*
* @return a new {@link AsyncKuduSession} instance
*/
public AsyncKuduSession newAsyncKuduSession() {
synchronized (isInFlightSync) {
Preconditions.checkState(isInFlight);
}
return new AsyncKuduSession(client, txnId);
}
/**
* Create a new {@link KuduSession} based on this transaction.
* <p>
* All write operations using the result session will be performed in the
* context of this transaction.
*
* @return a new {@link KuduSession} instance
*/
public KuduSession newKuduSession() {
synchronized (isInFlightSync) {
Preconditions.checkState(isInFlight);
}
return new KuduSession(new AsyncKuduSession(client, txnId));
}
/**
* Commit the multi-row distributed transaction represented by this handle.
*
* @param wait whether to wait for the transaction's commit phase to finalize.
* If {@code true}, this method blocks until the commit is
* finalized, otherwise it starts committing the transaction and
* returns. In the latter case, it's possible to check for the
* transaction status using the
* {@link KuduTransaction#isCommitComplete} method.
* @throws KuduException if something went wrong
*/
public void commit(boolean wait) throws KuduException {
Preconditions.checkState(isInFlight);
// TODO(aserbin): implement
// Once everything else is completed successfully, mark the transaction as
// no longer in flight.
synchronized (isInFlightSync) {
isInFlight = false;
}
}
/**
* Check whether the commit phase for a transaction is complete.
*
* @return {@code true} if transaction has finalized, otherwise {@code false}
* @throws KuduException if an error happens while querying the system about
* the state of the transaction
*/
public boolean isCommitComplete() throws KuduException {
Preconditions.checkState(!isInFlight);
// TODO(aserbin): implement
return false;
}
/**
* Rollback the multi-row distributed transaction represented by this object.
* <p>
* This method initiates rolling back the transaction and returns right after
* that. The system takes care of the rest. Once the control returns and
* no exception is thrown, a client have a guarantee that all write
* operations issued in the context of this transaction cannot be seen seen
* outside.
*
* @throws KuduException if something went wrong
*/
public void rollback() throws KuduException {
Preconditions.checkState(isInFlight, "transaction is not open for this handle");
// TODO(aserbin): implement
// Once everything else is completed successfully, mark the transaction as
// no longer in flight.
synchronized (isInFlightSync) {
isInFlight = false;
}
}
/**
* Export information on the underlying transaction in a serialized form.
* <p>
* This method transforms this handle into its serialized representation.
* <p>
* The serialized information on a Kudu transaction can be passed among
* different Kudu clients running at multiple nodes, so those separate
* Kudu clients can perform operations to be a part of the same distributed
* transaction. The resulting string is referred as "transaction token" and
* it can be deserialized into a transaction handle (i.e. an object of this
* class) via the {@link KuduTransaction#deserialize} method.
* <p>
* This method doesn't perform any RPC under the hood.
* <p>
* The representation of the data in the serialized form (i.e. the format of
* a Kudu transaction token) is an implementation detail, not a part of the
* public API and can be changed without notice.
*
* @return the serialized form of this transaction handle
* @throws IOException if serialization fails
*/
public byte[] serialize(SerializationOptions options) throws IOException {
LOG.debug("serializing handle for transaction ID {}", txnId);
Preconditions.checkState(
txnId != AsyncKuduClient.INVALID_TXN_ID,
"invalid transaction handle");
TxnTokenPB.Builder b = TxnTokenPB.newBuilder();
b.setTxnId(txnId);
b.setEnableKeepalive(options.isKeepaliveEnabled());
b.setKeepaliveMillis(keepaliveMillis);
TxnTokenPB message = b.build();
byte[] buf = new byte[message.getSerializedSize()];
CodedOutputStream cos = CodedOutputStream.newInstance(buf);
message.writeTo(cos);
cos.flush();
return buf;
}
/**
* A shortcut for the {@link KuduTransaction#serialize(SerializationOptions)}
* method invoked with default-constructed {@link SerializationOptions}.
*/
public byte[] serialize() throws IOException {
return serialize(defaultSerializationOptions);
}
/**
* Re-create KuduTransaction object given its serialized representation.
* <p>
* This method doesn't perform any RPC under the hood. The newly created
* object automatically does or does not send keep-alive messages depending
* on the {@link SerializationOptions#isKeepaliveEnabled} setting when
* the original {@link KuduTransaction} object was serialized using
* {@link KuduTransaction#serialize} method.
* <p>
* @param client Client instance to bound the result object to
* @param buf serialized representation of a {@link KuduTransaction} object
* @return Operation result status.
* @throws IOException if deserialization fails
*/
public static KuduTransaction deserialize(
byte[] buf, AsyncKuduClient client) throws IOException {
TxnTokenPB pb = TxnTokenPB.parseFrom(CodedInputStream.newInstance(buf));
final long txnId = pb.getTxnId();
final int keepaliveMillis = pb.getKeepaliveMillis();
final boolean keepaliveEnabled =
pb.hasEnableKeepalive() && pb.getEnableKeepalive();
return new KuduTransaction(client, txnId, keepaliveMillis, keepaliveEnabled);
}
/**
* Stop keepalive heartbeating, if any was in progress for this transaction
* handle.
* <p>
* This method is called automatically when the object goes out of scope
* as prescribed for {@link AutoCloseable}.
* <p>
* This method doesn't throw according to the recommendations for
* {@link AutoCloseable#close}. In case of an error, this method just logs
* the corresponding error message.
*/
@Override
public void close() {
try {
if (keepaliveEnabled) {
LOG.debug("stopping keepalive heartbeating for transaction ID {}", txnId);
// TODO(aserbin): stop sending keepalive heartbeats to TxnManager
}
} catch (Exception e) {
LOG.error("exception while automatically rolling back a transaction", e);
}
}
}