Copying old classes to provide backward compatibility with TransactionEdit
Create a v3 TxLogReader and deprecate v2 which contains logs written with old cask classes
Fix BalanceBooks example run command
Adding Deprecated annotations - addressing PR comments
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
new file mode 100644
index 0000000..95b07d7
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEdit.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package co.cask.tephra.persist;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.io.Writable;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.persist.TransactionLog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Represents a transaction state change in the {@link TransactionLog}.
+ * This class was included for backward compatibility reasons. It will be removed in future releases.
+ */
+@Deprecated
+public class TransactionEdit implements Writable {
+
+ /**
+ * The possible state changes for a transaction.
+ */
+ public enum State {
+ INPROGRESS, COMMITTING, COMMITTED, INVALID, ABORTED, MOVE_WATERMARK, TRUNCATE_INVALID_TX, CHECKPOINT
+ }
+
+ private long writePointer;
+
+ /**
+ * stores the value of visibility upper bound
+ * (see {@link TransactionManager.InProgressTx#getVisibilityUpperBound()})
+ * for edit of {@link State#INPROGRESS} only
+ */
+ private long visibilityUpperBound;
+ private long commitPointer;
+ private long expirationDate;
+ private State state;
+ private Set<ChangeId> changes;
+ /** Whether or not the COMMITTED change should be fully committed. */
+ private boolean canCommit;
+ private TransactionType type;
+ private Set<Long> truncateInvalidTx;
+ private long truncateInvalidTxTime;
+ private long parentWritePointer;
+ private long[] checkpointPointers;
+
+ // for Writable
+ public TransactionEdit() {
+ this.changes = Sets.newHashSet();
+ this.truncateInvalidTx = Sets.newHashSet();
+ }
+
+ // package private for testing
+ TransactionEdit(long writePointer, long visibilityUpperBound, State state, long expirationDate,
+ Set<ChangeId> changes, long commitPointer, boolean canCommit, TransactionType type,
+ Set<Long> truncateInvalidTx, long truncateInvalidTxTime, long parentWritePointer,
+ long[] checkpointPointers) {
+ this.writePointer = writePointer;
+ this.visibilityUpperBound = visibilityUpperBound;
+ this.state = state;
+ this.expirationDate = expirationDate;
+ this.changes = changes != null ? changes : Collections.<ChangeId>emptySet();
+ this.commitPointer = commitPointer;
+ this.canCommit = canCommit;
+ this.type = type;
+ this.truncateInvalidTx = truncateInvalidTx != null ? truncateInvalidTx : Collections.<Long>emptySet();
+ this.truncateInvalidTxTime = truncateInvalidTxTime;
+ this.parentWritePointer = parentWritePointer;
+ this.checkpointPointers = checkpointPointers;
+ }
+
+ /**
+ * Returns the transaction write pointer assigned for the state change.
+ */
+ public long getWritePointer() {
+ return writePointer;
+ }
+
+ void setWritePointer(long writePointer) {
+ this.writePointer = writePointer;
+ }
+
+ public long getVisibilityUpperBound() {
+ return visibilityUpperBound;
+ }
+
+ void setVisibilityUpperBound(long visibilityUpperBound) {
+ this.visibilityUpperBound = visibilityUpperBound;
+ }
+
+ /**
+ * Returns the type of state change represented.
+ */
+ public State getState() {
+ return state;
+ }
+
+ void setState(State state) {
+ this.state = state;
+ }
+
+ /**
+ * Returns any expiration timestamp (in milliseconds) associated with the state change. This should only
+ * be populated for changes of type {@link State#INPROGRESS}.
+ */
+ public long getExpiration() {
+ return expirationDate;
+ }
+
+ void setExpiration(long expirationDate) {
+ this.expirationDate = expirationDate;
+ }
+
+ /**
+ * @return the set of changed row keys associated with the state change. This is only populated for edits
+ * of type {@link State#COMMITTING} or {@link State#COMMITTED}.
+ */
+ public Set<ChangeId> getChanges() {
+ return changes;
+ }
+
+ void setChanges(Set<ChangeId> changes) {
+ this.changes = changes;
+ }
+
+ /**
+ * Returns the write pointer used to commit the row key change set. This is only populated for edits of type
+ * {@link State#COMMITTED}.
+ */
+ public long getCommitPointer() {
+ return commitPointer;
+ }
+
+ void setCommitPointer(long commitPointer) {
+ this.commitPointer = commitPointer;
+ }
+
+ /**
+ * Returns whether or not the transaction should be moved to the committed set. This is only populated for edits
+ * of type {@link State#COMMITTED}.
+ */
+ public boolean getCanCommit() {
+ return canCommit;
+ }
+
+ void setCanCommit(boolean canCommit) {
+ this.canCommit = canCommit;
+ }
+
+ /**
+ * Returns the transaction type. This is only populated for edits of type {@link State#INPROGRESS} or
+ * {@link State#ABORTED}.
+ */
+ public TransactionType getType() {
+ return type;
+ }
+
+ void setType(TransactionType type) {
+ this.type = type;
+ }
+
+ /**
+ * Returns the transaction ids to be removed from invalid transaction list. This is only populated for
+ * edits of type {@link State#TRUNCATE_INVALID_TX}
+ */
+ public Set<Long> getTruncateInvalidTx() {
+ return truncateInvalidTx;
+ }
+
+ void setTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+ this.truncateInvalidTx = truncateInvalidTx;
+ }
+
+ /**
+ * Returns the time until which the invalid transactions need to be truncated from invalid transaction list.
+ * This is only populated for edits of type {@link State#TRUNCATE_INVALID_TX}
+ */
+ public long getTruncateInvalidTxTime() {
+ return truncateInvalidTxTime;
+ }
+
+ void setTruncateInvalidTxTime(long truncateInvalidTxTime) {
+ this.truncateInvalidTxTime = truncateInvalidTxTime;
+ }
+
+ /**
+ * Returns the parent write pointer for a checkpoint operation. This is only populated for edits of type
+ * {@link State#CHECKPOINT}
+ */
+ public long getParentWritePointer() {
+ return parentWritePointer;
+ }
+
+ void setParentWritePointer(long parentWritePointer) {
+ this.parentWritePointer = parentWritePointer;
+ }
+
+ /**
+ * Returns the checkpoint write pointers for the edit. This is only populated for edits of type
+ * {@link State#ABORTED}.
+ */
+ public long[] getCheckpointPointers() {
+ return checkpointPointers;
+ }
+
+ void setCheckpointPointers(long[] checkpointPointers) {
+ this.checkpointPointers = checkpointPointers;
+ }
+
+ /**
+ * Creates a new instance in the {@link State#INPROGRESS} state.
+ */
+ public static TransactionEdit createStarted(long writePointer, long visibilityUpperBound,
+ long expirationDate, TransactionType type) {
+ return new TransactionEdit(writePointer, visibilityUpperBound, State.INPROGRESS,
+ expirationDate, null, 0L, false, type, null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#COMMITTING} state.
+ */
+ public static TransactionEdit createCommitting(long writePointer, Set<ChangeId> changes) {
+ return new TransactionEdit(writePointer, 0L, State.COMMITTING, 0L, changes, 0L, false, null, null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#COMMITTED} state.
+ */
+ public static TransactionEdit createCommitted(long writePointer, Set<ChangeId> changes, long nextWritePointer,
+ boolean canCommit) {
+ return new TransactionEdit(writePointer, 0L, State.COMMITTED, 0L, changes, nextWritePointer, canCommit, null,
+ null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#ABORTED} state.
+ */
+ public static TransactionEdit createAborted(long writePointer, TransactionType type, long[] checkpointPointers) {
+ return new TransactionEdit(writePointer, 0L, State.ABORTED, 0L, null, 0L, false, type, null, 0L, 0L,
+ checkpointPointers);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#INVALID} state.
+ */
+ public static TransactionEdit createInvalid(long writePointer) {
+ return new TransactionEdit(writePointer, 0L, State.INVALID, 0L, null, 0L, false, null, null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#MOVE_WATERMARK} state.
+ */
+ public static TransactionEdit createMoveWatermark(long writePointer) {
+ return new TransactionEdit(writePointer, 0L, State.MOVE_WATERMARK, 0L, null, 0L, false, null, null, 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
+ */
+ public static TransactionEdit createTruncateInvalidTx(Set<Long> truncateInvalidTx) {
+ return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, truncateInvalidTx,
+ 0L, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#TRUNCATE_INVALID_TX} state.
+ */
+ public static TransactionEdit createTruncateInvalidTxBefore(long truncateInvalidTxTime) {
+ return new TransactionEdit(0L, 0L, State.TRUNCATE_INVALID_TX, 0L, null, 0L, false, null, null,
+ truncateInvalidTxTime, 0L, null);
+ }
+
+ /**
+ * Creates a new instance in the {@link State#CHECKPOINT} state.
+ */
+ public static TransactionEdit createCheckpoint(long writePointer, long parentWritePointer) {
+ return new TransactionEdit(writePointer, 0L, State.CHECKPOINT, 0L, null, 0L, false, null, null, 0L,
+ parentWritePointer, null);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ TransactionEditCodecs.encode(this, out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ TransactionEditCodecs.decode(this, in);
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof TransactionEdit)) {
+ return false;
+ }
+
+ TransactionEdit that = (TransactionEdit) o;
+
+ return Objects.equal(this.writePointer, that.writePointer) &&
+ Objects.equal(this.visibilityUpperBound, that.visibilityUpperBound) &&
+ Objects.equal(this.commitPointer, that.commitPointer) &&
+ Objects.equal(this.expirationDate, that.expirationDate) &&
+ Objects.equal(this.state, that.state) &&
+ Objects.equal(this.changes, that.changes) &&
+ Objects.equal(this.canCommit, that.canCommit) &&
+ Objects.equal(this.type, that.type) &&
+ Objects.equal(this.truncateInvalidTx, that.truncateInvalidTx) &&
+ Objects.equal(this.truncateInvalidTxTime, that.truncateInvalidTxTime) &&
+ Objects.equal(this.parentWritePointer, that.parentWritePointer) &&
+ Arrays.equals(this.checkpointPointers, that.checkpointPointers);
+ }
+
+  /**
+   * Computes a hash code over the same fields that {@link #equals(Object)} compares.
+   * The checkpoint pointer array is hashed by content, matching the {@link Arrays#equals(long[], long[])}
+   * comparison done in {@code equals}, so that equal edits always produce equal hash codes.
+   */
+  @Override
+  public final int hashCode() {
+    // Handing the long[] itself to Objects.hashCode would hash the array reference rather than its elements,
+    // which breaks the equals/hashCode contract for edits holding distinct arrays with identical contents.
+    return Objects.hashCode(writePointer, visibilityUpperBound, commitPointer, expirationDate, state, changes,
+                            canCommit, type, truncateInvalidTx, truncateInvalidTxTime, parentWritePointer,
+                            Arrays.hashCode(checkpointPointers));
+  }
+
+  /**
+   * Returns a human-readable summary of this edit. Only the size of the change set is included, not its
+   * contents; the checkpoint pointers are rendered element by element.
+   */
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("writePointer", writePointer)
+      .add("visibilityUpperBound", visibilityUpperBound)
+      .add("commitPointer", commitPointer)
+      .add("expiration", expirationDate)
+      .add("state", state)
+      .add("changesSize", changes != null ? changes.size() : 0)
+      .add("canCommit", canCommit)
+      .add("type", type)
+      .add("truncateInvalidTx", truncateInvalidTx)
+      .add("truncateInvalidTxTime", truncateInvalidTxTime)
+      .add("parentWritePointer", parentWritePointer)
+      // Arrays.toString prints the elements (or "null"); the raw array would print as an identity string
+      .add("checkpointPointers", Arrays.toString(checkpointPointers))
+      .toString();
+  }
+
+}
+
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
new file mode 100644
index 0000000..8bea70f
--- /dev/null
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/TransactionEditCodecs.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package co.cask.tephra.persist;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionType;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Utilities to handle encoding and decoding of {@link TransactionEdit} entries, while maintaining compatibility
+ * with older versions of the serialized data. This class was included for backward compatibility reasons.
+ * It will be removed in future releases.
+ */
+@Deprecated
+public class TransactionEditCodecs {
+
+ private static final TransactionEditCodec[] ALL_CODECS = {
+ new TransactionEditCodecV1(),
+ new TransactionEditCodecV2(),
+ new TransactionEditCodecV3(),
+ new TransactionEditCodecV4()
+ };
+
+ private static final SortedMap<Byte, TransactionEditCodec> CODECS = new TreeMap<>();
+ static {
+ for (TransactionEditCodec codec : ALL_CODECS) {
+ CODECS.put(codec.getVersion(), codec);
+ }
+ }
+
+ /**
+ * Deserializes the encoded data from the given input stream, setting the values as fields
+ * on the given {@code TransactionEdit} instance. This method expects the first value in the
+ * {@code DataInput} to be a byte representing the codec version used to serialize the instance.
+ *
+ * @param dest the transaction edit to populate with the deserialized values
+ * @param in the input stream containing the encoded data
+ * @throws IOException if an error occurs while deserializing from the input stream
+ */
+ public static void decode(TransactionEdit dest, DataInput in) throws IOException {
+ byte version = in.readByte();
+ TransactionEditCodec codec = CODECS.get(version);
+ if (codec == null) {
+ throw new IOException("TransactionEdit was serialized with an unknown codec version " + version +
+ ". Was it written with a newer version of Tephra?");
+ }
+ codec.decode(dest, in);
+ }
+
+ /**
+ * Serializes the given {@code TransactionEdit} instance with the latest available codec.
+ * This will first write out the version of the codec used to serialize the instance so that
+ * the correct codec can be used when calling {@link #decode(TransactionEdit, DataInput)}.
+ *
+ * @param src the transaction edit to serialize
+ * @param out the output stream to contain the data
+ * @throws IOException if an error occurs while serializing to the output stream
+ */
+ public static void encode(TransactionEdit src, DataOutput out) throws IOException {
+ TransactionEditCodec latestCodec = CODECS.get(CODECS.firstKey());
+ out.writeByte(latestCodec.getVersion());
+ latestCodec.encode(src, out);
+ }
+
+ /**
+ * Encodes the given transaction edit using a specific codec. Note that this is only exposed
+ * for use by tests.
+ */
+ @VisibleForTesting
+ static void encode(TransactionEdit src, DataOutput out, TransactionEditCodec codec) throws IOException {
+ out.writeByte(codec.getVersion());
+ codec.encode(src, out);
+ }
+
+ /**
+ * Defines the interface used for encoding and decoding {@link TransactionEdit} instances to and from
+ * binary representations.
+ */
+ interface TransactionEditCodec {
+ /**
+ * Reads the encoded values from the data input stream and sets the fields in the given {@code TransactionEdit}
+ * instance.
+ *
+ * @param dest the instance on which to set all the deserialized values
+ * @param in the input stream containing the serialized data
+ * @throws IOException if an error occurs while deserializing the data
+ */
+ void decode(TransactionEdit dest, DataInput in) throws IOException;
+
+ /**
+ * Writes all the field values from the {@code TransactionEdit} instance in serialized form to the data
+ * output stream.
+ *
+ * @param src the instance to serialize to the stream
+ * @param out the output stream to contain the data
+ * @throws IOException if an error occurs while serializing the data
+ */
+ void encode(TransactionEdit src, DataOutput out) throws IOException;
+
+ /**
+ * Returns the version number for this codec. Each codec should use a unique version number, with the newest
+ * codec having the lowest number.
+ */
+ byte getVersion();
+ }
+
+
+ // package-private for unit-test access
+ static class TransactionEditCodecV1 implements TransactionEditCodec {
+    /**
+     * Reads an edit in the V1 layout: write pointer, state ordinal, expiration, commit pointer, the canCommit
+     * flag, and a counted list of length-prefixed change keys.
+     *
+     * @param dest the instance on which to set all the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if the state ordinal or a change key length is out of range, or if reading fails
+     */
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      dest.setWritePointer(in.readLong());
+      int stateIdx = in.readInt();
+      TransactionEdit.State[] states = TransactionEdit.State.values();
+      // validate the ordinal up front instead of relying on ArrayIndexOutOfBoundsException for control flow
+      if (stateIdx < 0 || stateIdx >= states.length) {
+        throw new IOException("State enum ordinal value is out of range: " + stateIdx);
+      }
+      dest.setState(states[stateIdx]);
+      dest.setExpiration(in.readLong());
+      dest.setCommitPointer(in.readLong());
+      dest.setCanCommit(in.readBoolean());
+      int changeSize = in.readInt();
+      Set<ChangeId> changes = Sets.newHashSet();
+      for (int i = 0; i < changeSize; i++) {
+        int currentLength = in.readInt();
+        if (currentLength < 0) {
+          // corrupt input: surface it as an I/O error rather than letting NegativeArraySizeException escape
+          throw new IOException("Change key length is negative: " + currentLength);
+        }
+        byte[] currentBytes = new byte[currentLength];
+        in.readFully(currentBytes);
+        changes.add(new ChangeId(currentBytes));
+      }
+      dest.setChanges(changes);
+      // 1st version did not store the visibility upper bound. It is safe to set it to 0, it may decrease
+      // performance until this tx is finished, but correctness will be preserved.
+      dest.setVisibilityUpperBound(0);
+    }
+
+ /** @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead, it is still here for
+ * unit-tests only */
+ @Override
+ @Deprecated
+ public void encode(TransactionEdit src, DataOutput out) throws IOException {
+ out.writeLong(src.getWritePointer());
+ // use ordinal for predictable size, though this does not support evolution
+ out.writeInt(src.getState().ordinal());
+ out.writeLong(src.getExpiration());
+ out.writeLong(src.getCommitPointer());
+ out.writeBoolean(src.getCanCommit());
+ Set<ChangeId> changes = src.getChanges();
+ if (changes == null) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(changes.size());
+ for (ChangeId c : changes) {
+ byte[] cKey = c.getKey();
+ out.writeInt(cKey.length);
+ out.write(cKey);
+ }
+ }
+ // NOTE: we didn't have visibilityUpperBound in V1, it was added in V2
+ // we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
+ // it was added in V3
+ }
+
+ @Override
+ public byte getVersion() {
+ return -1;
+ }
+ }
+
+ // package-private for unit-test access
+ static class TransactionEditCodecV2 extends TransactionEditCodecV1 implements TransactionEditCodec {
+ @Override
+ public void decode(TransactionEdit dest, DataInput in) throws IOException {
+ super.decode(dest, in);
+ dest.setVisibilityUpperBound(in.readLong());
+ }
+
+    /**
+     * Writes the V1 fields followed by the visibility upper bound.
+     *
+     * @param src the instance to serialize to the stream
+     * @param out the output stream to contain the data
+     * @throws IOException if an error occurs while serializing the data
+     * @deprecated use {@link TransactionEditCodecs.TransactionEditCodecV4} instead; calling this codec directly
+     *   is for unit-tests only (the V3 codec still chains to it through {@code super.encode})
+     */
+    @Override
+    @Deprecated
+    public void encode(TransactionEdit src, DataOutput out) throws IOException {
+      super.encode(src, out);
+      out.writeLong(src.getVisibilityUpperBound());
+      // NOTE: we didn't have transaction type, truncateInvalidTx and truncateInvalidTxTime in V1 and V2,
+      // it was added in V3
+    }
+
+ @Override
+ public byte getVersion() {
+ return -2;
+ }
+ }
+
+ // TODO: refactor to avoid duplicate code among different version of codecs
+ // package-private for unit-test access
+ static class TransactionEditCodecV3 extends TransactionEditCodecV2 implements TransactionEditCodec {
+    /**
+     * Reads the V2 fields, then the transaction type ordinal, the set of invalid transaction ids to truncate
+     * and the truncation time.
+     *
+     * @param dest the instance on which to set all the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if the type ordinal is beyond the known transaction types, or if reading fails
+     */
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      int typeIdx = in.readInt();
+      TransactionType[] types = TransactionType.values();
+      if (typeIdx < 0) {
+        // null transaction type is represented as -1
+        dest.setType(null);
+      } else if (typeIdx < types.length) {
+        dest.setType(types[typeIdx]);
+      } else {
+        // explicit range check replaces catching ArrayIndexOutOfBoundsException
+        throw new IOException("Type enum ordinal value is out of range: " + typeIdx);
+      }
+
+      int truncateInvalidTxSize = in.readInt();
+      // reuse (after clearing) the set already held by dest when it has one
+      Set<Long> truncateInvalidTx = emptySet(dest.getTruncateInvalidTx());
+      for (int i = 0; i < truncateInvalidTxSize; i++) {
+        truncateInvalidTx.add(in.readLong());
+      }
+      dest.setTruncateInvalidTx(truncateInvalidTx);
+      dest.setTruncateInvalidTxTime(in.readLong());
+    }
+
+ private <T> Set<T> emptySet(Set<T> set) {
+ if (set == null) {
+ return Sets.newHashSet();
+ }
+ set.clear();
+ return set;
+ }
+
+ @Override
+ public void encode(TransactionEdit src, DataOutput out) throws IOException {
+ super.encode(src, out);
+ // null transaction type is represented as -1
+ if (src.getType() == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(src.getType().ordinal());
+ }
+
+ Set<Long> truncateInvalidTx = src.getTruncateInvalidTx();
+ if (truncateInvalidTx == null) {
+ out.writeInt(0);
+ } else {
+ out.writeInt(truncateInvalidTx.size());
+ for (long id : truncateInvalidTx) {
+ out.writeLong(id);
+ }
+ }
+ out.writeLong(src.getTruncateInvalidTxTime());
+ }
+
+ @Override
+ public byte getVersion() {
+ return -3;
+ }
+ }
+
+ static class TransactionEditCodecV4 extends TransactionEditCodecV3 {
+    /**
+     * Reads the V3 fields, then the parent write pointer and the checkpoint pointers. A negative array length
+     * in the stream stands for a {@code null} checkpoint pointer array, which is how {@code encode} writes it.
+     *
+     * @param dest the instance on which to set all the deserialized values
+     * @param in the input stream containing the serialized data
+     * @throws IOException if an error occurs while deserializing the data
+     */
+    @Override
+    public void decode(TransactionEdit dest, DataInput in) throws IOException {
+      super.decode(dest, in);
+      dest.setParentWritePointer(in.readLong());
+      int checkpointPointersLen = in.readInt();
+      if (checkpointPointersLen < 0) {
+        // Reset explicitly: dest may be a reused instance that still holds pointers from an earlier record.
+        dest.setCheckpointPointers(null);
+        return;
+      }
+      long[] checkpointPointers = new long[checkpointPointersLen];
+      for (int i = 0; i < checkpointPointersLen; i++) {
+        checkpointPointers[i] = in.readLong();
+      }
+      dest.setCheckpointPointers(checkpointPointers);
+    }
+
+ @Override
+ public void encode(TransactionEdit src, DataOutput out) throws IOException {
+ super.encode(src, out);
+ out.writeLong(src.getParentWritePointer());
+ long[] checkpointPointers = src.getCheckpointPointers();
+ if (checkpointPointers == null) {
+ out.writeInt(-1);
+ } else {
+ out.writeInt(checkpointPointers.length);
+ for (int i = 0; i < checkpointPointers.length; i++) {
+ out.writeLong(checkpointPointers[i]);
+ }
+ }
+ }
+
+ @Override
+ public byte getVersion() {
+ return -4;
+ }
+ }
+}
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 61ee3cc..1c4fafc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -345,7 +345,7 @@
*/
public static final String NUM_ENTRIES_APPENDED = "count";
public static final String VERSION_KEY = "version";
- public static final byte CURRENT_VERSION = 2;
+ public static final byte CURRENT_VERSION = 3;
}
}
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
index b1e0978..cf97c92 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
@@ -18,6 +18,7 @@
package org.apache.tephra.persist;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -248,4 +249,44 @@
this.edit.readFields(in);
}
}
+
+ // package private for testing
+ /**
+ * {@link Writable} that pairs a {@link LongWritable} key with a
+ * {@link co.cask.tephra.persist.TransactionEdit}. The key is serialized first, followed by the edit, and both
+ * are read back in that same order.
+ */
+ @Deprecated
+ @VisibleForTesting
+ static class CaskEntry implements Writable {
+ private LongWritable key;
+ private co.cask.tephra.persist.TransactionEdit edit;
+
+
+ // for Writable
+ /**
+ * Creates an entry holding a fresh key and a fresh edit, to be filled in by {@link #readFields(DataInput)}.
+ */
+ public CaskEntry() {
+ this.key = new LongWritable();
+ this.edit = new co.cask.tephra.persist.TransactionEdit();
+ }
+
+ /**
+ * Creates an entry wrapping the given key and edit; both references are stored as passed in.
+ *
+ * @param key the key of the entry
+ * @param edit the transaction edit of the entry
+ */
+ public CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) {
+ this.key = key;
+ this.edit = edit;
+ }
+
+ /**
+ * Returns the key held by this entry.
+ */
+ public LongWritable getKey() {
+ return this.key;
+ }
+
+ /**
+ * Returns the transaction edit held by this entry.
+ */
+ public co.cask.tephra.persist.TransactionEdit getEdit() {
+ return this.edit;
+ }
+
+ /**
+ * Writes the key and then the edit to the given output.
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ this.key.write(out);
+ this.edit.write(out);
+ }
+
+ /**
+ * Reads the key and then the edit from the given input, into the objects this entry already holds.
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.key.readFields(in);
+ this.edit.readFields(in);
+ }
+ }
}
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
index a517903..1bddc31 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderSupplier.java
@@ -45,6 +45,9 @@
}
switch (version) {
+ case 3:
+ logReader = new HDFSTransactionLogReaderV3(reader);
+ return logReader;
case 2:
logReader = new HDFSTransactionLogReaderV2(reader);
return logReader;
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
index faefaec..38b74d8 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV1.java
@@ -53,8 +53,9 @@
}
try {
- boolean successful = reader.next(key, reuse);
- return successful ? reuse : null;
+ co.cask.tephra.persist.TransactionEdit oldTxEdit = new co.cask.tephra.persist.TransactionEdit();
+ boolean successful = reader.next(key, oldTxEdit);
+ return successful ? TransactionEdit.convertCaskTxEdit(oldTxEdit) : null;
} catch (EOFException e) {
LOG.warn("Hit an unexpected EOF while trying to read the Transaction Edit. Skipping the entry.", e);
return null;
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
index ce50da8..e371b98 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV2.java
@@ -38,7 +38,7 @@
private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV2.class);
private final SequenceFile.Reader reader;
- private final Queue<TransactionEdit> transactionEdits;
+ private final Queue<co.cask.tephra.persist.TransactionEdit> transactionEdits;
private final CommitMarkerCodec commitMarkerCodec;
private final LongWritable key;
@@ -76,12 +76,12 @@
}
if (!transactionEdits.isEmpty()) {
- return transactionEdits.remove();
+ return TransactionEdit.convertCaskTxEdit(transactionEdits.remove());
}
// Fetch the 'marker' and read 'marker' number of edits
populateTransactionEdits();
- return transactionEdits.poll();
+ return TransactionEdit.convertCaskTxEdit(transactionEdits.poll());
}
private void populateTransactionEdits() throws IOException {
@@ -96,7 +96,7 @@
}
for (int i = 0; i < numEntries; i++) {
- TransactionEdit edit = new TransactionEdit();
+ co.cask.tephra.persist.TransactionEdit edit = new co.cask.tephra.persist.TransactionEdit();
try {
if (reader.next(key, edit)) {
transactionEdits.add(edit);
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java
new file mode 100644
index 0000000..3670e3f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/HDFSTransactionLogReaderV3.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.persist;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * {@link TransactionLogReader} that can read the v3 version of transaction logs. The logs are expected to
+ * have commit markers that indicate the size of the batch of {@link TransactionEdit}s (following the marker)
+ * that were synced together. If the expected number of {@link TransactionEdit}s is not present, then that set of
+ * {@link TransactionEdit}s is discarded.
+ */
+public class HDFSTransactionLogReaderV3 implements TransactionLogReader {
+  // bound to this class so that log output is attributed to the v3 reader
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSTransactionLogReaderV3.class);
+
+  private final SequenceFile.Reader reader;
+  // edits of the batch read most recently, handed out one at a time by next()
+  private final Queue<TransactionEdit> transactionEdits;
+  private final CommitMarkerCodec commitMarkerCodec;
+  private final LongWritable key;
+
+  private boolean closed;
+
+  /**
+   * Creates a log reader on top of the given sequence file reader.
+   *
+   * @param reader the sequence file reader to read commit markers and edits from; it is closed by
+   *               {@link #close()}
+   */
+  public HDFSTransactionLogReaderV3(SequenceFile.Reader reader) {
+    this.reader = reader;
+    this.transactionEdits = new ArrayDeque<>();
+    this.key = new LongWritable();
+    this.commitMarkerCodec = new CommitMarkerCodec();
+  }
+
+  /**
+   * Closes the commit marker codec and the underlying sequence file reader. Calling this method again after a
+   * successful close has no effect.
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    try {
+      commitMarkerCodec.close();
+    } finally {
+      // the reader is closed even if closing the codec failed
+      reader.close();
+      closed = true;
+    }
+  }
+
+  /**
+   * Returns the next edit from the log, or {@code null} if the reader is closed or no further complete batch
+   * of edits could be read.
+   */
+  @Override
+  public TransactionEdit next() throws IOException {
+    return next(null);
+  }
+
+  /**
+   * Returns the next edit from the log, or {@code null} if the reader is closed or no further complete batch
+   * of edits could be read.
+   *
+   * @param reuse not used by this reader; every returned edit is a newly created instance
+   */
+  @Override
+  public TransactionEdit next(TransactionEdit reuse) throws IOException {
+    if (closed) {
+      return null;
+    }
+
+    if (!transactionEdits.isEmpty()) {
+      return transactionEdits.remove();
+    }
+
+    // Fetch the 'marker' and read 'marker' number of edits
+    populateTransactionEdits();
+    return transactionEdits.poll();
+  }
+
+  /**
+   * Reads the next commit marker and then as many edits as the marker announces into the queue. If the end of
+   * the file is reached before the whole batch was read, the partially read batch is dropped and the queue is
+   * left empty.
+   */
+  private void populateTransactionEdits() throws IOException {
+    // read the marker to determine numEntries to read.
+    int numEntries = 0;
+    try {
+      // can throw EOFException if reading of incomplete commit marker, no other action required since we can safely
+      // ignore this
+      numEntries = commitMarkerCodec.readMarker(reader);
+    } catch (EOFException e) {
+      LOG.warn("Reached EOF in log while trying to read commit marker", e);
+    }
+
+    for (int i = 0; i < numEntries; i++) {
+      TransactionEdit edit = new TransactionEdit();
+      try {
+        if (reader.next(key, edit)) {
+          transactionEdits.add(edit);
+        } else {
+          throw new EOFException("Attempt to read TransactionEdit failed.");
+        }
+      } catch (EOFException e) {
+        // we have reached EOF before reading back numEntries, we clear the partial list and return.
+        LOG.warn("Reached EOF in log before reading {} entries. Ignoring all {} edits since the last marker",
+                 numEntries, transactionEdits.size(), e);
+        transactionEdits.clear();
+        // stop here: continuing the loop would only repeat the failed read (and the warning) for every
+        // remaining entry, and must never leave a partial batch in the queue
+        return;
+      }
+    }
+  }
+}
diff --git a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
index 1d07e72..8702b5b 100644
--- a/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
+++ b/tephra-core/src/main/java/org/apache/tephra/persist/TransactionEdit.java
@@ -298,6 +298,21 @@
parentWritePointer, null);
}
+ /**
+  * Creates a new instance from {@link co.cask.tephra.persist.TransactionEdit}; a null input yields null.
+  */
+ @Deprecated
+ public static TransactionEdit convertCaskTxEdit(co.cask.tephra.persist.TransactionEdit txEdit) {
+   if (txEdit == null) {
+     return null;
+   }
+   return new TransactionEdit(txEdit.getWritePointer(), txEdit.getVisibilityUpperBound(),
+       TransactionEdit.State.valueOf(txEdit.getState().name()), // name() is the exact identifier valueOf() needs
+       txEdit.getExpiration(), txEdit.getChanges(), txEdit.getCommitPointer(), txEdit.getCanCommit(),
+       txEdit.getType(), txEdit.getTruncateInvalidTx(), txEdit.getTruncateInvalidTxTime(),
+       txEdit.getParentWritePointer(), txEdit.getCheckpointPointers());
+ }
+
@Override
public void write(DataOutput out) throws IOException {
TransactionEditCodecs.encode(this, out);
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
index 7b9f06b..7a34e55 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/HDFSTransactionLogTest.java
@@ -86,16 +86,26 @@
}
private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
- long timeInMillis, boolean withMarker) throws IOException {
+ long timeInMillis, byte versionNumber) throws IOException {
String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
SequenceFile.Metadata metadata = new SequenceFile.Metadata();
- if (withMarker) {
+ if (versionNumber > 1) { // only versions above 1 record a version in the file metadata
metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
- new Text(Byte.toString(TxConstants.TransactionLog.CURRENT_VERSION)));
+ new Text(Byte.toString(versionNumber)));
}
- return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
- TransactionEdit.class, SequenceFile.CompressionType.NONE, null, null, metadata);
+
+ switch (versionNumber) {
+ case 1:
+ case 2: // versions 1 and 2 are written with the old cask TransactionEdit as the value class
+ return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
+ co.cask.tephra.persist.TransactionEdit.class,
+ SequenceFile.CompressionType.NONE, null, null, metadata);
+ default: // any other version is written with the non-cask TransactionEdit as the value class
+ return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
+ TransactionEdit.class, SequenceFile.CompressionType.NONE,
+ null, null, metadata);
+ }
}
private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception {
@@ -103,21 +113,92 @@
CommitMarkerCodec.writeMarker(writer, size);
}
- private void testTransactionLogSync(int totalCount, int batchSize, boolean withMarker, boolean isComplete)
+ private void testCaskTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete)
+   throws Exception {
+   // writes totalCount cask edits in batches of batchSize, then counts how many can be read back from the log
+   List<co.cask.tephra.persist.TransactionEdit> edits = TransactionEditUtil.createRandomCaskEdits(totalCount);
+   long timestamp = System.currentTimeMillis();
+   Configuration configuration = getConfiguration();
+   FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
+   SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
+   AtomicLong logSequence = new AtomicLong();
+   HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
+   AbstractTransactionLog.CaskEntry entry;
+
+   for (int i = 0; i < totalCount - batchSize; i += batchSize) {
+     if (versionNumber > 1) {
+       writeNumWrites(writer, batchSize);
+     }
+     for (int j = 0; j < batchSize; j++) {
+       // i + j: every batch writes its own slice of the random edits rather than the first batch again
+       entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()),
+                                                    edits.get(i + j));
+       writer.append(entry.getKey(), entry.getEdit());
+     }
+     writer.syncFs();
+   }
+   if (versionNumber > 1) {
+     writeNumWrites(writer, batchSize);
+   }
+   for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
+     entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
+     writer.append(entry.getKey(), entry.getEdit());
+   }
+   entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()),
+                                                edits.get(totalCount - 1));
+   if (isComplete) {
+     writer.append(entry.getKey(), entry.getEdit());
+   } else {
+     byte[] bytes = Longs.toByteArray(entry.getKey().get());
+     writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
+       @Override
+       public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
+         byte[] test = new byte[]{0x2};
+         outStream.write(test, 0, 1);
+       }
+       @Override
+       public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
+         // no-op
+       }
+       @Override
+       public int getSize() {
+         // mimic size longer than the actual byte array size written, so we would reach EOF
+         return 12;
+       }
+     });
+   }
+   writer.syncFs();
+   Closeables.closeQuietly(writer);
+   // now let's try to read this log, releasing the reader even if a read throws
+   TransactionLogReader reader = transactionLog.getReader();
+   int syncedEdits = 0;
+   try {
+     while (reader.next() != null) {
+       syncedEdits++;
+     }
+   } finally {
+     reader.close();
+   }
+   if (isComplete) {
+     Assert.assertEquals(totalCount, syncedEdits);
+   } else {
+     Assert.assertEquals(totalCount - batchSize, syncedEdits);
+   }
+ }
+
+ private void testTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete)
throws Exception {
List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
long timestamp = System.currentTimeMillis();
Configuration configuration = getConfiguration();
FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
- SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, withMarker);
+ SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
AtomicLong logSequence = new AtomicLong();
HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
AbstractTransactionLog.Entry entry;
for (int i = 0; i < totalCount - batchSize; i += batchSize) {
- if (withMarker) {
- writeNumWrites(writer, batchSize);
- }
+ writeNumWrites(writer, batchSize);
for (int j = 0; j < batchSize; j++) {
entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(j));
writer.append(entry.getKey(), entry.getEdit());
@@ -125,10 +206,7 @@
writer.syncFs();
}
- if (withMarker) {
- writeNumWrites(writer, batchSize);
- }
-
+ writeNumWrites(writer, batchSize);
for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
writer.append(entry.getKey(), entry.getEdit());
@@ -177,22 +255,33 @@
}
@Test
- public void testTransactionLogNewVersion() throws Exception {
+ public void testTransactionLogVersion3() throws Exception {
// in-complete sync
- testTransactionLogSync(1000, 1, true, false);
- testTransactionLogSync(2000, 5, true, false);
+ testTransactionLogSync(1000, 1, (byte) 3, false); // version 3: values are non-cask TransactionEdit instances
+ testTransactionLogSync(2000, 5, (byte) 3, false);
// complete sync
- testTransactionLogSync(1000, 1, true, true);
- testTransactionLogSync(2000, 5, true, true);
+ testTransactionLogSync(1000, 1, (byte) 3, true);
+ testTransactionLogSync(2000, 5, (byte) 3, true);
+ }
+
+ @Test
+ public void testTransactionLogVersion2() throws Exception {
+ // incomplete sync: the last edit of the final batch is written truncated
+ testCaskTransactionLogSync(1000, 1, (byte) 2, false);
+ testCaskTransactionLogSync(2000, 5, (byte) 2, false);
+
+ // complete sync: every edit is written in full
+ testCaskTransactionLogSync(1000, 1, (byte) 2, true);
+ testCaskTransactionLogSync(2000, 5, (byte) 2, true);
}
@Test
public void testTransactionLogOldVersion() throws Exception {
// in-complete sync
- testTransactionLogSync(1000, 1, false, false);
+ testCaskTransactionLogSync(1000, 1, (byte) 1, false); // version 1: cask edits, no commit markers written
// complete sync
- testTransactionLogSync(2000, 5, false, true);
+ testCaskTransactionLogSync(2000, 5, (byte) 1, true);
}
}
diff --git a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
index 854ccdd..52e82d9 100644
--- a/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
+++ b/tephra-core/src/test/java/org/apache/tephra/util/TransactionEditUtil.java
@@ -24,6 +24,7 @@
import org.apache.tephra.TransactionType;
import org.apache.tephra.persist.TransactionEdit;
+import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -36,38 +37,53 @@
/**
* Generates a number of semi-random {@link TransactionEdit} instances.
+ */
+ public static List<TransactionEdit> createRandomEdits(int numEntries) {
+   // generate the cask edits and convert each one, so both flavors come from the same generator
+   List<TransactionEdit> txEdits = new ArrayList<>(numEntries);
+   for (co.cask.tephra.persist.TransactionEdit caskTxEdit : createRandomCaskEdits(numEntries)) {
+     txEdits.add(TransactionEdit.convertCaskTxEdit(caskTxEdit));
+   }
+   return txEdits;
+ }
+
+ /**
+ * Generates a number of semi-random {@link co.cask.tephra.persist.TransactionEdit} instances.
* These are just randomly selected from the possible states, so would not necessarily reflect a real-world
* distribution.
*
* @param numEntries how many entries to generate in the returned list.
* @return a list of randomly generated transaction log edits.
*/
- public static List<TransactionEdit> createRandomEdits(int numEntries) {
- List<TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
+ @Deprecated
+ public static List<co.cask.tephra.persist.TransactionEdit> createRandomCaskEdits(int numEntries) {
+ List<co.cask.tephra.persist.TransactionEdit> edits = Lists.newArrayListWithCapacity(numEntries);
for (int i = 0; i < numEntries; i++) {
- TransactionEdit.State nextType = TransactionEdit.State.values()[random.nextInt(6)];
+ co.cask.tephra.persist.TransactionEdit.State nextType =
+ co.cask.tephra.persist.TransactionEdit.State.values()[random.nextInt(6)];
long writePointer = Math.abs(random.nextLong());
switch (nextType) {
case INPROGRESS:
edits.add(
- TransactionEdit.createStarted(writePointer, writePointer - 1,
- System.currentTimeMillis() + 300000L, TransactionType.SHORT));
+ co.cask.tephra.persist.TransactionEdit.createStarted(writePointer, writePointer - 1,
+ System.currentTimeMillis() + 300000L,
+ TransactionType.SHORT));
break;
case COMMITTING:
- edits.add(TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
+ edits.add(co.cask.tephra.persist.TransactionEdit.createCommitting(writePointer, generateChangeSet(10)));
break;
case COMMITTED:
- edits.add(TransactionEdit.createCommitted(writePointer, generateChangeSet(10), writePointer + 1,
- random.nextBoolean()));
+ edits.add(co.cask.tephra.persist.TransactionEdit.createCommitted(writePointer, generateChangeSet(10),
+ writePointer + 1, random.nextBoolean()));
break;
case INVALID:
- edits.add(TransactionEdit.createInvalid(writePointer));
+ edits.add(co.cask.tephra.persist.TransactionEdit.createInvalid(writePointer));
break;
case ABORTED:
- edits.add(TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
+ edits.add(co.cask.tephra.persist.TransactionEdit.createAborted(writePointer, TransactionType.SHORT, null));
break;
case MOVE_WATERMARK:
- edits.add(TransactionEdit.createMoveWatermark(writePointer));
+ edits.add(co.cask.tephra.persist.TransactionEdit.createMoveWatermark(writePointer));
break;
}
}
diff --git a/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
index 17c1005..e191f5c 100644
--- a/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
+++ b/tephra-examples/src/main/java/org/apache/tephra/examples/BalanceBooks.java
@@ -68,7 +68,7 @@
* <p>
* You can run the BalanceBooks application with the following command:
* <pre>
- * ./bin/tephra run BalanceBooks [num clients] [num iterations]
+ * ./bin/tephra run org.apache.tephra.examples.BalanceBooks [num clients] [num iterations]
* </pre>
* where <code>[num clients]</code> is the number of concurrent client threads to use, and
* <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.