/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tephra.persist;

import com.google.common.io.Closeables;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.tephra.TxConstants;
import org.apache.tephra.metrics.MetricsCollector;
import org.apache.tephra.metrics.TxMetricsCollector;
import org.apache.tephra.util.TransactionEditUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Tests complete and partial syncs of {@link TransactionEdit}s to an {@link HDFSTransactionLog}.
 */
public class HDFSTransactionLogTest {
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
private static final String LOG_FILE_PREFIX = "txlog.";
private static MiniDFSCluster dfsCluster;
private static Configuration conf;
private static MetricsCollector metricsCollector;
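
  /**
   * Starts a single-node {@link MiniDFSCluster} shared by all tests in this class and
   * keeps a copy of its file system configuration for creating log files.
   */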
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration hConf = new Configuration();
hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TMP_FOLDER.newFolder().getAbsolutePath());
dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
conf = new Configuration(dfsCluster.getFileSystem().getConf());
metricsCollector = new TxMetricsCollector();
}
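
  /**
   * Shuts down the shared mini DFS cluster once all tests have finished.
   */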
@AfterClass
public static void tearDownAfterClass() throws Exception {
dfsCluster.shutdown();
}
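
  /**
   * Returns a configuration that uses the current user for HDFS access and points the
   * snapshot directory at a fresh temporary folder, so each test writes its own log file.
   */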
private Configuration getConfiguration() throws IOException {
// tests should use the current user for HDFS
conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, TMP_FOLDER.newFolder().getAbsolutePath());
return conf;
}
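
  /**
   * Creates an {@link HDFSTransactionLog} over a log file named with the given timestamp
   * under the configured snapshot directory.
   */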
private HDFSTransactionLog getHDFSTransactionLog(Configuration conf,
FileSystem fs, long timeInMillis) throws Exception {
String snapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
return new HDFSTransactionLog(fs, conf, newLog, timeInMillis, metricsCollector);
}
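
  /**
   * Creates a raw {@link SequenceFile.Writer} for a log file of the given version. For
   * versions greater than 1, the version number is recorded in the file metadata. Versions
   * 1 and 2 store the legacy {@code co.cask.tephra} edit class; later versions store the
   * current {@link TransactionEdit}.
   */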
private SequenceFile.Writer getSequenceFileWriter(Configuration configuration, FileSystem fs,
long timeInMillis, byte versionNumber) throws IOException {
String snapshotDir = configuration.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
Path newLog = new Path(snapshotDir, LOG_FILE_PREFIX + timeInMillis);
SequenceFile.Metadata metadata = new SequenceFile.Metadata();
if (versionNumber > 1) {
metadata.set(new Text(TxConstants.TransactionLog.VERSION_KEY),
new Text(Byte.toString(versionNumber)));
}
switch (versionNumber) {
case 1:
case 2:
return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
co.cask.tephra.persist.TransactionEdit.class,
SequenceFile.CompressionType.NONE, null, null, metadata);
default:
return SequenceFile.createWriter(fs, configuration, newLog, LongWritable.class,
TransactionEdit.class, SequenceFile.CompressionType.NONE,
null, null, metadata);
}
}
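
  /**
   * Writes a commit marker recording how many entries the following batch will append.
   */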
private void writeNumWrites(SequenceFile.Writer writer, final int size) throws Exception {
    // precede each batch of appends with a commit marker recording the entry count
    CommitMarkerCodec.writeMarker(writer, size);
}
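
  /**
   * Writes {@code totalCount} legacy (co.cask) edits in batches of {@code batchSize},
   * syncing to HDFS after each batch. When {@code isComplete} is false, the last entry is
   * deliberately truncated, and the reader is expected to recover only the edits up to the
   * last fully synced batch.
   */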
private void testCaskTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete)
throws Exception {
List<co.cask.tephra.persist.TransactionEdit> edits = TransactionEditUtil.createRandomCaskEdits(totalCount);
long timestamp = System.currentTimeMillis();
Configuration configuration = getConfiguration();
FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
AtomicLong logSequence = new AtomicLong();
HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
AbstractTransactionLog.CaskEntry entry;
for (int i = 0; i < totalCount - batchSize; i += batchSize) {
if (versionNumber > 1) {
writeNumWrites(writer, batchSize);
}
for (int j = 0; j < batchSize; j++) {
        entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), edits.get(i + j));
writer.append(entry.getKey(), entry.getEdit());
}
writer.syncFs();
}
if (versionNumber > 1) {
writeNumWrites(writer, batchSize);
}
for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
writer.append(entry.getKey(), entry.getEdit());
}
entry = new AbstractTransactionLog.CaskEntry(new LongWritable(logSequence.getAndIncrement()),
edits.get(totalCount - 1));
if (isComplete) {
writer.append(entry.getKey(), entry.getEdit());
} else {
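      // simulate an interrupted write: append a raw key whose value claims more bytes
      // than are actually written, so a reader hits EOF in the middle of the final entry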
byte[] bytes = Longs.toByteArray(entry.getKey().get());
writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
@Override
public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
byte[] test = new byte[]{0x2};
outStream.write(test, 0, 1);
}
@Override
public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
// no-op
}
@Override
public int getSize() {
          // claim a size longer than the bytes actually written, so the reader hits EOF mid-entry
return 12;
}
});
}
writer.syncFs();
Closeables.closeQuietly(writer);
// now let's try to read this log
TransactionLogReader reader = transactionLog.getReader();
int syncedEdits = 0;
while (reader.next() != null) {
// testing reading the transaction edits
syncedEdits++;
}
if (isComplete) {
Assert.assertEquals(totalCount, syncedEdits);
} else {
Assert.assertEquals(totalCount - batchSize, syncedEdits);
}
}
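
  /**
   * Same as {@link #testCaskTransactionLogSync}, but writes current-format
   * {@link TransactionEdit}s, always precedes each batch with a commit marker, and sets the
   * slow-append threshold to zero so every append exercises the slow-append reporting path.
   */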
private void testTransactionLogSync(int totalCount, int batchSize, byte versionNumber, boolean isComplete)
throws Exception {
List<TransactionEdit> edits = TransactionEditUtil.createRandomEdits(totalCount);
long timestamp = System.currentTimeMillis();
Configuration configuration = getConfiguration();
configuration.set(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD, "0");
FileSystem fs = FileSystem.newInstance(FileSystem.getDefaultUri(configuration), configuration);
SequenceFile.Writer writer = getSequenceFileWriter(configuration, fs, timestamp, versionNumber);
AtomicLong logSequence = new AtomicLong();
HDFSTransactionLog transactionLog = getHDFSTransactionLog(configuration, fs, timestamp);
AbstractTransactionLog.Entry entry;
for (int i = 0; i < totalCount - batchSize; i += batchSize) {
writeNumWrites(writer, batchSize);
for (int j = 0; j < batchSize; j++) {
        entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i + j));
writer.append(entry.getKey(), entry.getEdit());
}
writer.syncFs();
}
writeNumWrites(writer, batchSize);
for (int i = totalCount - batchSize; i < totalCount - 1; i++) {
entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()), edits.get(i));
writer.append(entry.getKey(), entry.getEdit());
}
entry = new AbstractTransactionLog.Entry(new LongWritable(logSequence.getAndIncrement()),
edits.get(totalCount - 1));
if (isComplete) {
writer.append(entry.getKey(), entry.getEdit());
} else {
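      // simulate an interrupted write: append a raw key whose value claims more bytes
      // than are actually written, so a reader hits EOF in the middle of the final entry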
byte[] bytes = Longs.toByteArray(entry.getKey().get());
writer.appendRaw(bytes, 0, bytes.length, new SequenceFile.ValueBytes() {
@Override
public void writeUncompressedBytes(DataOutputStream outStream) throws IOException {
byte[] test = new byte[]{0x2};
outStream.write(test, 0, 1);
}
@Override
public void writeCompressedBytes(DataOutputStream outStream) throws IllegalArgumentException, IOException {
// no-op
}
@Override
public int getSize() {
          // claim a size longer than the bytes actually written, so the reader hits EOF mid-entry
return 12;
}
});
}
writer.syncFs();
Closeables.closeQuietly(writer);
// now let's try to read this log
TransactionLogReader reader = transactionLog.getReader();
int syncedEdits = 0;
while (reader.next() != null) {
// testing reading the transaction edits
syncedEdits++;
}
if (isComplete) {
Assert.assertEquals(totalCount, syncedEdits);
} else {
Assert.assertEquals(totalCount - batchSize, syncedEdits);
}
}
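
  /**
   * Verifies incomplete and complete syncs for version 3 logs (current edit format with
   * commit markers).
   */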
@Test
public void testTransactionLogVersion3() throws Exception {
    // incomplete sync
testTransactionLogSync(1000, 1, (byte) 3, false);
testTransactionLogSync(2000, 5, (byte) 3, false);
// complete sync
testTransactionLogSync(1000, 1, (byte) 3, true);
testTransactionLogSync(2000, 5, (byte) 3, true);
}
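
  /**
   * Verifies incomplete and complete syncs for version 2 logs (legacy cask edit format with
   * commit markers).
   */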
@Test
public void testTransactionLogVersion2() throws Exception {
    // incomplete sync
testCaskTransactionLogSync(1000, 1, (byte) 2, false);
testCaskTransactionLogSync(2000, 5, (byte) 2, false);
// complete sync
testCaskTransactionLogSync(1000, 1, (byte) 2, true);
testCaskTransactionLogSync(2000, 5, (byte) 2, true);
}
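
  /**
   * Verifies incomplete and complete syncs for version 1 logs (legacy cask edit format
   * without commit markers or version metadata).
   */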
@Test
public void testTransactionLogOldVersion() throws Exception {
    // incomplete sync
testCaskTransactionLogSync(1000, 1, (byte) 1, false);
// complete sync
testCaskTransactionLogSync(2000, 5, (byte) 1, true);
}
}