ONLY FOR TESTING: tx log decoding tests
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
index 4d7793e..9bab3c9 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
@@ -157,6 +157,7 @@
if (closed) {
return null;
}
+ LOG.error("1111111 Position in file = {}", reader.getPosition());
boolean successful = reader.next(key, reuse);
if (successful) {
return reuse;
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
index c86d3fe..02cad69 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
@@ -111,6 +111,12 @@
}
}
+ /**
+  * Copies the file at {@code path} into {@code snapshotDir} under the name
+  * {@code LOG_FILE_PREFIX + timestamp} and returns a transaction log over that copy.
+  * Debugging aid for replaying a log file that was captured elsewhere.
+  *
+  * @param path source file handed to {@code fs.copyFromLocalFile}
+  * @param timestamp used both for the destination file name and for the returned log
+  * @return an {@link HDFSTransactionLog} backed by the copied file
+  * @throws IOException if the underlying file system operations fail
+  */
+ public TransactionLog getTransactionLog(Path path, long timestamp) throws IOException {
+   Path destination = new Path(snapshotDir, LOG_FILE_PREFIX + timestamp);
+   fs.copyFromLocalFile(path, destination);
+   // Pass the caller's timestamp instead of a hard-coded literal so the log agrees with its file name.
+   return new HDFSTransactionLog(fs, hConf, destination, timestamp, metricsCollector);
+ }
+
@Override
protected void shutDown() throws Exception {
fs.close();
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
index b7ee86f..af80b93 100644
--- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
@@ -21,10 +21,12 @@
import co.cask.tephra.snapshot.SnapshotCodecProvider;
import co.cask.tephra.snapshot.SnapshotCodecV2;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.IOException;
@@ -69,4 +71,23 @@
protected AbstractTransactionStateStorage getStorage(Configuration conf) {
return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
}
+
+
+ /**
+  * Manual debugging aid: copies a transaction log from a fixed local path into the storage
+  * and reads every edit from it, printing how many edits were decoded.
+  * The test is skipped when the local fixture file does not exist.
+  */
+ @Test
+ public void testTxEdits() throws Exception {
+   String localLogPath = "/Users/shankar/tx.snapshot/latest/tx.snapshot/txlog.1443792213636";
+   // The fixture lives on one developer machine only; skip rather than fail everywhere else.
+   org.junit.Assume.assumeTrue(new java.io.File(localLogPath).exists());
+
+   HDFSTransactionStateStorage storage =
+     (HDFSTransactionStateStorage) getStorage(getConfiguration("testingEdits"));
+   storage.startAndWait();
+   try {
+     TransactionLog transactionLog = storage.getTransactionLog(new Path(localLogPath), 1443792213636L);
+     TransactionLogReader reader = transactionLog.getReader();
+     try {
+       int count = 0;
+       while (reader.next() != null) {
+         count++;
+       }
+       // One summary line instead of one line per edit.
+       System.out.println("Decoded " + count + " transaction edits");
+     } finally {
+       reader.close();
+     }
+   } finally {
+     // Always stop the service so the test does not leave the storage running.
+     storage.stopAndWait();
+   }
+ }
}
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
index a70358b..e6d8aa7 100644
--- a/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
+++ b/tephra-core/src/test/java/co/cask/tephra/persist/TransactionEditTest.java
@@ -18,13 +18,22 @@
import co.cask.tephra.ChangeId;
import co.cask.tephra.TransactionType;
+import co.cask.tephra.snapshot.DefaultSnapshotCodec;
+import co.cask.tephra.snapshot.SnapshotCodecProvider;
import com.google.common.collect.Sets;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
+
import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
/**
@@ -71,6 +80,19 @@
}
+ /**
+  * Manual debugging aid: decodes a snapshot file from a fixed local path with a
+  * {@link SnapshotCodecProvider} built from a default {@link Configuration}.
+  * The test is skipped when the local fixture file does not exist.
+  */
@Test
+ public void decodeProblematicTxLog() throws Exception {
+   File snapshotFile = new File("/Users/shankar/tx.snapshot/latest/tx.snapshot/snapshot.1443792213636");
+   // The fixture lives on one developer machine only; skip rather than fail everywhere else.
+   org.junit.Assume.assumeTrue(snapshotFile.exists());
+
+   SnapshotCodecProvider provider = new SnapshotCodecProvider(new Configuration());
+   FileInputStream snapshotStream = new FileInputStream(snapshotFile);
+   try {
+     provider.decode(snapshotStream);
+   } finally {
+     // Close the stream even when decoding throws.
+     snapshotStream.close();
+   }
+ }
+
+ @Test
public void testSerialization() throws Exception {
assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[0]));
assertSerializedEdit(TransactionEdit.createAborted(1L, TransactionType.SHORT, new long[]{ 2L, 3L }));