blob: dba8c6b44f1733f28ca499320e93c6406b576afc [file] [log] [blame]
package org.apache.helix.spectator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Map;
import org.apache.helix.HelixConstants;
import org.apache.helix.PropertyType;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.mock.MockZkHelixDataAccessor;
import org.apache.helix.model.CurrentState;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestRoutingDataCache extends ZkStandAloneCMTestBase {
@Test
public void testUpdateOnNotification() {
Assert.assertTrue(_clusterVerifier.verifyByPolling());
MockZkHelixDataAccessor accessor =
new MockZkHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
RoutingDataCache cache =
new RoutingDataCache("CLUSTER_" + TestHelper.getTestClassName(), PropertyType.EXTERNALVIEW);
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1);
accessor.clearReadCounters();
// refresh again should read nothing
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
accessor.clearReadCounters();
// refresh again should read nothing as ideal state is same
cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
}
@Test(dependsOnMethods = { "testUpdateOnNotification" })
public void testSelectiveUpdates()
throws Exception {
// Added verifier to make sure the test starts at a stable state. Note, if
// testCurrentStatesSelectiveUpdate() run first. This test may fail without
// this line. The reason is that when testCurrentStatesSelectiveUpdate()
// stop one participant, this would trigger liveInstance update in controller
// which would lead to new external view for TestDB get updated. The update is
// async to the construction of RoutingDataCache in this test and subsequent
// refresh().
Assert.assertTrue(_clusterVerifier.verifyByPolling());
MockZkHelixDataAccessor accessor =
new MockZkHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
RoutingDataCache cache =
new RoutingDataCache("CLUSTER_" + TestHelper.getTestClassName(), PropertyType.EXTERNALVIEW);
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1);
accessor.clearReadCounters();
// refresh again should read nothing
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
// refresh again should read nothing
cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 0);
// add new resources
_gSetupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_1", 1, STATE_MODEL);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_1", _replica);
Thread.sleep(100);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
accessor.clearReadCounters();
// refresh again should read only new current states and new idealstate
cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1);
// Add more resources
accessor.clearReadCounters();
_gSetupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_2", 1, STATE_MODEL);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_2", _replica);
_gSetupTool.addResourceToCluster(CLUSTER_NAME, "TestDB_3", 1, STATE_MODEL);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, "TestDB_3", _replica);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
// Totally four resources. Two of them are newly added.
cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 2);
// update one resource
accessor.clearReadCounters();
_gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, "TestDB_2", false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
cache.notifyDataChange(HelixConstants.ChangeType.EXTERNAL_VIEW);
cache.refresh(accessor);
Assert.assertEquals(accessor.getReadCount(PropertyType.EXTERNALVIEW), 1);
}
@Test
public void testCurrentStatesSelectiveUpdate() {
// Add a live instance to the cluster so the original cluster is not affected
// by stopping a participant.
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + NODE_NR);
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
MockParticipantManager participant =
new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
participant.syncStart();
Assert.assertTrue(_clusterVerifier.verifyByPolling());
try {
MockZkHelixDataAccessor accessor =
new MockZkHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<>(_gZkClient));
RoutingDataCache cache = new RoutingDataCache(CLUSTER_NAME, PropertyType.CURRENTSTATES);
// Empty current states map before refreshing.
Assert.assertTrue(cache.getCurrentStatesMap().isEmpty());
// 1. Initial cache refresh.
cache.refresh(accessor);
Map<String, Map<String, Map<String, CurrentState>>> currentStatesV1 =
cache.getCurrentStatesMap();
// Current states map is not empty and size equals to number of live instances.
Assert.assertFalse(currentStatesV1.isEmpty());
Assert.assertEquals(currentStatesV1.size(), _participants.length + 1);
// 2. Without any change, refresh routing data cache.
cache.refresh(accessor);
// Because of no current states change, current states cache doesn't refresh.
Assert.assertEquals(cache.getCurrentStatesMap(), currentStatesV1);
// 3. Stop one participant to make live instance change and refresh routing data cache.
participant.syncStop();
cache.notifyDataChange(HelixConstants.ChangeType.LIVE_INSTANCE);
cache.refresh(accessor);
Map<String, Map<String, Map<String, CurrentState>>> currentStatesV2 =
cache.getCurrentStatesMap();
// Current states cache should refresh and change.
Assert.assertFalse(currentStatesV2.isEmpty());
Assert.assertEquals(currentStatesV2.size(), _participants.length);
Assert.assertFalse(currentStatesV1.equals(currentStatesV2));
cache.refresh(accessor);
// No change.
Assert.assertEquals(cache.getCurrentStatesMap(), currentStatesV2);
} finally {
_gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
_gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, instanceName);
_gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, _replica);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
}
}
}