blob: 77acfb16af0492549e768cdc1cf0fa9f8d39c070 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso;
import static org.apache.omid.tso.client.TSOClient.DEFAULT_ZK_CLUSTER;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.omid.TestUtils;
import org.apache.omid.tso.TSOStateManager.TSOState;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
public class TestLeaseManager {
private static final long DUMMY_EPOCH_1 = 1L;
private static final long DUMMY_EPOCH_2 = 2L;
private static final long DUMMY_EPOCH_3 = 3L;
private static final long DUMMY_LOW_WATERMARK_1 = DUMMY_EPOCH_1;
private static final long DUMMY_LOW_WATERMARK_2 = DUMMY_EPOCH_2;
private static final long DUMMY_LOW_WATERMARK_3 = DUMMY_EPOCH_3;
private static final String LEASE_MGR_ID_1 = "LM1";
private static final String LEASE_MGR_ID_2 = "LM2";
private static final String INSTANCE_ID_1 = "LM1" + "#";
private static final String INSTANCE_ID_2 = "LM2" + "#";
private static final Logger LOG = LoggerFactory.getLogger(TestLeaseManager.class);
private static final long TEST_LEASE_PERIOD_IN_MS = 5000; // 5 seconds
private CuratorFramework zkClient;
private TestingServer zkServer;
@Mock
private Panicker panicker;
private PausableLeaseManager leaseManager1;
private PausableLeaseManager leaseManager2;
@BeforeClass
public void beforeClass() throws Exception {
LOG.info("Starting ZK Server");
zkServer = TestUtils.provideTestingZKServer();
LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
zkClient = TestUtils.provideConnectedZKClient(DEFAULT_ZK_CLUSTER);
}
@AfterClass
public void afterClass() throws Exception {
zkClient.close();
CloseableUtils.closeQuietly(zkServer);
zkServer = null;
LOG.info("ZK Server Stopped");
}
@Test(timeOut = 80_000)
public void testErrorInitializingTSOStateExitsTheTSO() throws Exception {
final String TEST_TSO_LEASE_PATH = "/test0_tsolease";
final String TEST_CURRENT_TSO_PATH = "/test0_currenttso";
Panicker panicker = spy(new MockPanicker());
TSOChannelHandler tsoChannelHandler = mock(TSOChannelHandler.class);
TSOStateManager stateManager = mock(TSOStateManager.class);
when(stateManager.initialize()).thenThrow(new IOException());
leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
tsoChannelHandler,
stateManager,
TEST_LEASE_PERIOD_IN_MS,
TEST_TSO_LEASE_PATH,
TEST_CURRENT_TSO_PATH,
zkClient,
panicker);
leaseManager1.startService();
// ... let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
verify(panicker, timeout(2000).atLeastOnce()).panic(anyString(), any(IOException.class));
leaseManager1.stopService();
}
@Test(timeOut = 80_000)
public void testLeaseHolderDoesNotChangeWhenPausedForALongTimeAndTheresNoOtherInstance() throws Exception {
final String TEST_TSO_LEASE_PATH = "/test1_tsolease";
final String TEST_CURRENT_TSO_PATH = "/test1_currenttso";
// Launch the instance under test...
TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
TSOStateManager stateManager1 = mock(TSOStateManager.class);
when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
tsoChannelHandler1,
stateManager1,
TEST_LEASE_PERIOD_IN_MS,
TEST_TSO_LEASE_PATH,
TEST_CURRENT_TSO_PATH,
zkClient,
panicker);
leaseManager1.startService();
// ... let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ... check is the lease holder
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
assertTrue(leaseManager1.stillInLeasePeriod());
// Then, pause instance when trying to renew lease...
leaseManager1.pausedInTryToRenewLeasePeriod();
// ...let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ...check that nothing changed...
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
// Finally, resume the instance...
leaseManager1.resume();
// ... let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ... and check again that nothing changed
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
assertFalse(leaseManager1.stillInLeasePeriod()); // Must not be master as it should have triggered and exception
}
@Test(timeOut = 80_000)
public void testLeaseHolderDoesNotChangeWhenANewLeaseManagerIsUp() throws Exception {
final String TEST_TSO_LEASE_PATH = "/test2_tsolease";
final String TEST_CURRENT_TSO_PATH = "/test2_currenttso";
// Launch the master instance...
TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
TSOStateManager stateManager1 = mock(TSOStateManager.class);
when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
tsoChannelHandler1,
stateManager1,
TEST_LEASE_PERIOD_IN_MS,
TEST_TSO_LEASE_PATH,
TEST_CURRENT_TSO_PATH,
zkClient,
panicker);
leaseManager1.startService();
// ...let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ...so it should be the current holder of the lease
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
assertTrue(leaseManager1.stillInLeasePeriod());
// Then launch another instance...
TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
TSOStateManager stateManager2 = mock(TSOStateManager.class);
when(stateManager2.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
tsoChannelHandler2,
stateManager2,
TEST_LEASE_PERIOD_IN_MS,
TEST_TSO_LEASE_PATH,
TEST_CURRENT_TSO_PATH,
zkClient,
panicker);
leaseManager2.startService();
// ... let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ... and after the period, the first instance should be still the holder
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
assertTrue(leaseManager1.stillInLeasePeriod());
assertFalse(leaseManager2.stillInLeasePeriod());
}
@Test(timeOut = 80_000)
public void testLeaseHolderChangesWhenActiveLeaseManagerIsPaused() throws Exception {
final String TEST_TSO_LEASE_PATH = "/test3_tsolease";
final String TEST_CURRENT_TSO_PATH = "/test3_currenttso";
// Launch the master instance...
TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
TSOStateManager stateManager1 = mock(TSOStateManager.class);
when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
tsoChannelHandler1,
stateManager1,
TEST_LEASE_PERIOD_IN_MS,
TEST_TSO_LEASE_PATH,
TEST_CURRENT_TSO_PATH,
zkClient,
panicker);
leaseManager1.startService();
// ... let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ... so it should be the current holder of the lease
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
assertTrue(leaseManager1.stillInLeasePeriod());
// Then launch another instance...
TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
TSOStateManager stateManager2 = mock(TSOStateManager.class);
when(stateManager2.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
tsoChannelHandler2,
stateManager2,
TEST_LEASE_PERIOD_IN_MS,
TEST_TSO_LEASE_PATH,
TEST_CURRENT_TSO_PATH,
zkClient,
panicker);
leaseManager2.startService();
// ... and pause active lease manager...
leaseManager1.pausedInStillInLeasePeriod();
// ... and let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ... and check that lease owner should have changed to the second instance
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
assertTrue(leaseManager2.stillInLeasePeriod());
// Now, lets resume the first instance...
when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_3, DUMMY_EPOCH_3));
leaseManager1.resume();
// ... let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// and check the lease owner is still the second instance (preserves the lease)
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
assertFalse(leaseManager1.stillInLeasePeriod());
assertTrue(leaseManager2.stillInLeasePeriod());
// Finally, pause active lease manager when trying to renew lease...
leaseManager2.pausedInTryToRenewLeasePeriod();
// ... let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ... and check lease owner is has changed again to the first instance
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
assertFalse(leaseManager2.stillInLeasePeriod());
assertTrue(leaseManager1.stillInLeasePeriod());
// Resume the second instance...
leaseManager2.resume();
// ... let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
// ... but the lease owner should still be the first instance
checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
assertFalse(leaseManager2.stillInLeasePeriod());
assertTrue(leaseManager1.stillInLeasePeriod());
}
@Test(timeOut = 80_000)
public void testLeaseManagerPanicsWhenUnexpectedInfoIsFoundInCurrentTSOZnode() throws Exception {
final String TEST_TSO_LEASE_PATH = "/test_wronginfo_tsolease";
final String TEST_CURRENT_TSO_PATH = "/test_wronginfo_currenttso";
Panicker panicker = spy(new MockPanicker());
// Launch the master instance...
TSOStateManager stateManager1 = mock(TSOStateManager.class);
when(stateManager1.initialize()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
PausableLeaseManager leaseManager = new PausableLeaseManager(LEASE_MGR_ID_1,
mock(TSOChannelHandler.class),
stateManager1,
TEST_LEASE_PERIOD_IN_MS,
TEST_TSO_LEASE_PATH,
TEST_CURRENT_TSO_PATH,
zkClient,
panicker);
leaseManager.startService();
// ...and let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
leaseManager.pausedInTryToRenewLeasePeriod();
// 1st Panic test) Inject corrupted data in the ZNode, force reelection and test the panicker is exercised
zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "CorruptedData!!!".getBytes());
// ...and let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
leaseManager.resume();
// ...and let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
ArgumentCaptor<IllegalArgumentException> trowableIAE = ArgumentCaptor.forClass(IllegalArgumentException.class);
verify(panicker, times(2)).panic(anyString(), trowableIAE.capture());
assertTrue(trowableIAE.getValue() != null);
assertTrue(trowableIAE.getValue().getMessage().contains("Incorrect TSO Info found"));
// 2nd Panic test) Simulate that a new master appeared in the meantime, force reelection
// and test the panicker is exercised
reset(panicker);
zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "newTSO:12345#10000".getBytes());
leaseManager.pausedInTryToRenewLeasePeriod();
// ...and let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
leaseManager.resume();
// ...and let the test run for some time...
Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
ArgumentCaptor<LeaseManagement.LeaseManagementException> trowableLME =
ArgumentCaptor.forClass(LeaseManagement.LeaseManagementException.class);
verify(panicker, times(2)).panic(anyString(), trowableLME.capture());
assertTrue(trowableLME.getValue() != null);
assertTrue(trowableLME.getValue().getMessage().contains("Another TSO replica was found"));
}
@Test(timeOut = 1000)
public void testNonHALeaseManager() throws Exception {
// Launch the instance...
VoidLeaseManager leaseManager = new VoidLeaseManager(mock(TSOChannelHandler.class),
mock(TSOStateManager.class));
leaseManager.startService();
assertTrue(leaseManager.stillInLeasePeriod());
leaseManager.stopService();
}
// ----------------------------------------------------------------------------------------------------------------
// Checkers
// ----------------------------------------------------------------------------------------------------------------
private void checkLeaseHolder(String tsoLeasePath, String expectedLeaseHolder) throws Exception {
byte[] leaseHolderInBytes = zkClient.getData().forPath(tsoLeasePath);
String leaseHolder = new String(leaseHolderInBytes, Charsets.UTF_8);
assertEquals(leaseHolder, expectedLeaseHolder);
}
private void checkInstanceId(String currentTSOPath, String expectedInstanceId) throws Exception {
byte[] expectedInstanceIdInBytes = zkClient.getData().forPath(currentTSOPath);
String instanceId = new String(expectedInstanceIdInBytes, Charsets.UTF_8);
assertEquals(instanceId, expectedInstanceId);
}
}