blob: 9c565bac5c9248f3a5ec6ed3bec1c8eea62e2fab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tephra.snapshot;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Guice;
import com.google.inject.Injector;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.ChangeId;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
import org.apache.tephra.persist.TransactionSnapshot;
import org.apache.tephra.persist.TransactionStateStorage;
import org.apache.tephra.persist.TransactionVisibilityState;
import org.apache.tephra.runtime.ConfigModule;
import org.apache.tephra.runtime.DiscoveryModules;
import org.apache.tephra.runtime.TransactionModules;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Tests related to {@link SnapshotCodec} implementations.
*/
public class SnapshotCodecTest {
@ClassRule
public static TemporaryFolder tmpDir = new TemporaryFolder();
@Test
public void testMinimalDeserialization() throws Exception {
long now = System.currentTimeMillis();
long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
/*
* Snapshot consisting of transactions at:
*/
long tInvalid = nowWritePointer - 5; // t1 - invalid
long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
long tLong = nowWritePointer - 3; // t3 - in-progress LONG
long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
tLong, new TransactionManager.InProgressTx(readPtr,
TransactionManager.getTxExpirationFromWritePointer(
tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
TransactionManager.InProgressType.LONG),
tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionManager.InProgressType.SHORT)));
TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
Lists.newArrayList(tInvalid), // invalid
inProgress, ImmutableMap.<Long, Set<ChangeId>>of(
tShort, Sets.<ChangeId>newHashSet()),
ImmutableMap.<Long, Set<ChangeId>>of(
tCommitted, Sets.<ChangeId>newHashSet()));
Configuration conf1 = new Configuration();
conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
byte[] byteArray;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
provider1.encode(out, snapshot);
byteArray = out.toByteArray();
}
// TransactionSnapshot and TransactionVisibilityState decode should pass now
TransactionSnapshot txSnapshot = provider1.decode(new ByteArrayInputStream(byteArray));
TransactionVisibilityState txVisibilityState =
provider1.decodeTransactionVisibilityState(new ByteArrayInputStream(byteArray));
assertTransactionVisibilityStateEquals(txSnapshot, txVisibilityState);
// Corrupt the serialization byte array so that full deserialization will fail
byteArray[byteArray.length - 1] = 'a';
// TransactionVisibilityState decoding should pass since it doesn't decode the committing and committed changesets.
TransactionVisibilityState txVisibilityState2 = provider1.decodeTransactionVisibilityState(
new ByteArrayInputStream(byteArray));
Assert.assertNotNull(txVisibilityState2);
Assert.assertEquals(txVisibilityState, txVisibilityState2);
Assert.assertEquals(readPtr, txVisibilityState2.getReadPointer());
try {
provider1.decode(new ByteArrayInputStream(byteArray));
Assert.fail();
} catch (RuntimeException e) {
// expected since we modified the serialization bytes
}
}
/**
* In-progress LONG transactions written with DefaultSnapshotCodec will not have the type serialized as part of
* the data. Since these transactions also contain a non-negative expiration, we need to ensure we reset the type
* correctly when the snapshot is loaded.
*/
@Test
public void testDefaultToV3Compatibility() throws Exception {
long now = System.currentTimeMillis();
long nowWritePointer = now * TxConstants.MAX_TX_PER_MS;
/*
* Snapshot consisting of transactions at:
*/
long tInvalid = nowWritePointer - 5; // t1 - invalid
long readPtr = nowWritePointer - 4; // t2 - here and earlier committed
long tLong = nowWritePointer - 3; // t3 - in-progress LONG
long tCommitted = nowWritePointer - 2; // t4 - committed, changeset (r1, r2)
long tShort = nowWritePointer - 1; // t5 - in-progress SHORT, canCommit called, changeset (r3, r4)
TreeMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(ImmutableSortedMap.of(
tLong, new TransactionManager.InProgressTx(
readPtr,
TransactionManager.getTxExpirationFromWritePointer(tLong, TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT),
TransactionManager.InProgressType.LONG),
tShort, new TransactionManager.InProgressTx(readPtr, now + 1000, TransactionManager.InProgressType.SHORT)));
TransactionSnapshot snapshot = new TransactionSnapshot(now, readPtr, nowWritePointer,
Lists.newArrayList(tInvalid), // invalid
inProgress,
ImmutableMap.<Long, Set<ChangeId>>of(
tShort, Sets.newHashSet(new ChangeId(new byte[]{'r', '3'}), new ChangeId(new byte[]{'r', '4'}))),
ImmutableMap.<Long, Set<ChangeId>>of(
tCommitted, Sets.newHashSet(new ChangeId(new byte[]{'r', '1'}), new ChangeId(new byte[]{'r', '2'}))));
Configuration conf1 = new Configuration();
conf1.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
SnapshotCodecProvider provider1 = new SnapshotCodecProvider(conf1);
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
provider1.encode(out, snapshot);
} finally {
out.close();
}
TransactionSnapshot snapshot2 = provider1.decode(new ByteArrayInputStream(out.toByteArray()));
TransactionVisibilityState minTxSnapshot = provider1.decodeTransactionVisibilityState(
new ByteArrayInputStream(out.toByteArray()));
assertTransactionVisibilityStateEquals(snapshot2, minTxSnapshot);
assertEquals(snapshot.getReadPointer(), snapshot2.getReadPointer());
assertEquals(snapshot.getWritePointer(), snapshot2.getWritePointer());
assertEquals(snapshot.getInvalid(), snapshot2.getInvalid());
// in-progress transactions will have missing types
assertNotEquals(snapshot.getInProgress(), snapshot2.getInProgress());
assertEquals(snapshot.getCommittingChangeSets(), snapshot2.getCommittingChangeSets());
assertEquals(snapshot.getCommittedChangeSets(), snapshot2.getCommittedChangeSets());
// after fixing in-progress, full snapshot should match
Map<Long, TransactionManager.InProgressTx> fixedInProgress = TransactionManager.txnBackwardsCompatCheck(
TxConstants.Manager.DEFAULT_TX_LONG_TIMEOUT, 10000L, snapshot2.getInProgress());
assertEquals(snapshot.getInProgress(), fixedInProgress);
assertEquals(snapshot, snapshot2);
}
/**
* Test full stack serialization for a TransactionManager migrating from DefaultSnapshotCodec to SnapshotCodecV3.
*/
@Test
public void testDefaultToV3Migration() throws Exception {
File testDir = tmpDir.newFolder("testDefaultToV3Migration");
Configuration conf = new Configuration();
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
Injector injector = Guice.createInjector(new ConfigModule(conf),
new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
TransactionManager txManager = injector.getInstance(TransactionManager.class);
txManager.startAndWait();
txManager.startLong();
// shutdown to force a snapshot
txManager.stopAndWait();
TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
txStorage.startAndWait();
// confirm that the in-progress entry is missing a type
TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
assertNotNull(snapshot);
assertEquals(1, snapshot.getInProgress().size());
Map.Entry<Long, TransactionManager.InProgressTx> entry =
snapshot.getInProgress().entrySet().iterator().next();
assertNull(entry.getValue().getType());
txStorage.stopAndWait();
// start a new Tx manager to test fixup
Configuration conf2 = new Configuration();
conf2.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
conf2.setStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES,
DefaultSnapshotCodec.class.getName(), SnapshotCodecV3.class.getName());
Injector injector2 = Guice.createInjector(new ConfigModule(conf2),
new DiscoveryModules().getSingleNodeModules(), new TransactionModules().getSingleNodeModules());
TransactionManager txManager2 = injector2.getInstance(TransactionManager.class);
txManager2.startAndWait();
// state should be recovered
TransactionSnapshot snapshot2 = txManager2.getCurrentState();
assertEquals(1, snapshot2.getInProgress().size());
Map.Entry<Long, TransactionManager.InProgressTx> inProgressTx =
snapshot2.getInProgress().entrySet().iterator().next();
assertEquals(TransactionManager.InProgressType.LONG, inProgressTx.getValue().getType());
// save a new snapshot
txManager2.stopAndWait();
TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
txStorage2.startAndWait();
TransactionSnapshot snapshot3 = txStorage2.getLatestSnapshot();
// full snapshot should have deserialized correctly without any fixups
assertEquals(snapshot2.getInProgress(), snapshot3.getInProgress());
assertEquals(snapshot2, snapshot3);
txStorage2.stopAndWait();
}
@Test
public void testSnapshotCodecProviderConfiguration() throws Exception {
Configuration conf = new Configuration(false);
StringBuilder buf = new StringBuilder();
for (Class c : TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES) {
if (buf.length() > 0) {
buf.append(",\n ");
}
buf.append(c.getName());
}
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, buf.toString());
SnapshotCodecProvider codecProvider = new SnapshotCodecProvider(conf);
SnapshotCodec v1codec = codecProvider.getCodecForVersion(new DefaultSnapshotCodec().getVersion());
assertNotNull(v1codec);
assertTrue(v1codec instanceof DefaultSnapshotCodec);
SnapshotCodec v2codec = codecProvider.getCodecForVersion(new SnapshotCodecV2().getVersion());
assertNotNull(v2codec);
assertTrue(v2codec instanceof SnapshotCodecV2);
SnapshotCodec v3codec = codecProvider.getCodecForVersion(new SnapshotCodecV3().getVersion());
assertNotNull(v3codec);
assertTrue(v3codec instanceof SnapshotCodecV3);
SnapshotCodec v4codec = codecProvider.getCodecForVersion(new SnapshotCodecV4().getVersion());
assertNotNull(v4codec);
assertTrue(v4codec instanceof SnapshotCodecV4);
}
@Test
public void testSnapshotCodecV4() throws IOException, TransactionFailureException {
File testDir = tmpDir.newFolder("testSnapshotCodecV4");
Configuration conf = new Configuration();
conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, SnapshotCodecV4.class.getName());
conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR, testDir.getAbsolutePath());
Injector injector = Guice.createInjector(new ConfigModule(conf),
new DiscoveryModules().getSingleNodeModules(),
new TransactionModules().getSingleNodeModules());
TransactionManager txManager = injector.getInstance(TransactionManager.class);
txManager.startAndWait();
// Create a transaction and a checkpoint transaction
Transaction transaction = txManager.startLong();
Transaction checkpointTx = txManager.checkpoint(transaction);
// create invalid transactions (invalidated out of order)
Transaction shortTx1 = txManager.startShort();
Transaction shortTx2 = txManager.startShort();
Transaction shortTx3 = txManager.startShort();
txManager.invalidate(shortTx3.getTransactionId());
txManager.invalidate(shortTx1.getTransactionId());
txManager.invalidate(shortTx2.getTransactionId());
// shutdown to force a snapshot
txManager.stopAndWait();
// Validate the snapshot on disk
TransactionStateStorage txStorage = injector.getInstance(TransactionStateStorage.class);
txStorage.startAndWait();
TransactionSnapshot snapshot = txStorage.getLatestSnapshot();
Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
TransactionVisibilityState txVisibilityState = txStorage.getLatestTransactionVisibilityState();
assertTransactionVisibilityStateEquals(snapshot, txVisibilityState);
Map<Long, TransactionManager.InProgressTx> inProgress = snapshot.getInProgress();
Assert.assertEquals(2, inProgress.size());
TransactionManager.InProgressTx inProgressTx = inProgress.get(transaction.getTransactionId());
Assert.assertNotNull(inProgressTx);
Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
inProgressTx.getCheckpointWritePointers().toLongArray());
inProgressTx = inProgress.get(checkpointTx.getWritePointer());
Assert.assertNotNull(inProgressTx);
Assert.assertEquals(TransactionManager.InProgressType.CHECKPOINT, inProgressTx.getType());
Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty());
txStorage.stopAndWait();
// start a new Tx manager to see if the transaction is restored correctly.
Injector injector2 = Guice.createInjector(new ConfigModule(conf),
new DiscoveryModules().getSingleNodeModules(),
new TransactionModules().getSingleNodeModules());
txManager = injector2.getInstance(TransactionManager.class);
txManager.startAndWait();
// state should be recovered
snapshot = txManager.getCurrentState();
Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
inProgress = snapshot.getInProgress();
Assert.assertEquals(2, inProgress.size());
inProgressTx = inProgress.get(transaction.getTransactionId());
Assert.assertNotNull(inProgressTx);
Assert.assertArrayEquals(checkpointTx.getCheckpointWritePointers(),
inProgressTx.getCheckpointWritePointers().toLongArray());
inProgressTx = inProgress.get(checkpointTx.getWritePointer());
Assert.assertNotNull(inProgressTx);
Assert.assertEquals(TransactionManager.InProgressType.CHECKPOINT, inProgressTx.getType());
Assert.assertTrue(inProgressTx.getCheckpointWritePointers().isEmpty());
// Should be able to commit the transaction
Assert.assertTrue(txManager.canCommit(checkpointTx, Collections.<byte[]>emptyList()));
Assert.assertTrue(txManager.commit(checkpointTx));
// save a new snapshot
txManager.stopAndWait();
TransactionStateStorage txStorage2 = injector2.getInstance(TransactionStateStorage.class);
txStorage2.startAndWait();
snapshot = txStorage2.getLatestSnapshot();
Assert.assertTrue(Ordering.natural().isOrdered((snapshot.getInvalid())));
Assert.assertTrue(snapshot.getInProgress().isEmpty());
txStorage2.stopAndWait();
}
private void assertTransactionVisibilityStateEquals(TransactionVisibilityState expected,
TransactionVisibilityState input) {
Assert.assertEquals(expected.getTimestamp(), input.getTimestamp());
Assert.assertEquals(expected.getReadPointer(), input.getReadPointer());
Assert.assertEquals(expected.getWritePointer(), input.getWritePointer());
Assert.assertEquals(expected.getInProgress(), input.getInProgress());
Assert.assertEquals(expected.getInvalid(), input.getInvalid());
}
}