blob: 3814b82e4a9be965ac74dbd89ef15edc5ff9eb75 [file] [log] [blame]
package org.apache.helix;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Date;
import java.util.List;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.ResourceConfigChangeListener;
import org.apache.helix.api.listeners.ScopedConfigChangeListener;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Tests that config change listeners registered on a {@link HelixManager} receive an initial
 * callback on registration and a further callback whenever the corresponding config (instance,
 * cluster or resource) is updated, added or removed.
 *
 * <p>The cluster is created once in {@link #beforeClass()} and dropped in {@link #afterClass()};
 * both tests share the same spectator {@link HelixManager}.
 */
public class TestListenerCallback extends ZkUnitTestBase {

  /**
   * Scoped config listener that records whether a callback fired and how many configs it carried.
   *
   * <p>Fields are {@code volatile} because the tests poll them via {@code TestHelper.verify};
   * callbacks are presumably delivered on a different thread than the test thread.
   */
  class TestScopedConfigListener implements ScopedConfigChangeListener {
    volatile boolean _configChanged = false;
    volatile int _configSize = 0;

    @Override
    public void onConfigChange(List<HelixProperty> configs, NotificationContext context) {
      // Store the payload before raising the flag: the tests read _configSize as soon as they
      // observe _configChanged == true.
      _configSize = configs.size();
      _configChanged = true;
    }

    /** Clears the recorded state so the next callback can be detected. */
    public void reset() {
      _configChanged = false;
      _configSize = 0;
    }
  }

  /**
   * Listener for instance, cluster and resource config changes that records, per config type,
   * whether a callback fired and the payload of the most recent callback.
   *
   * <p>Fields are {@code volatile} because the tests poll them via {@code TestHelper.verify};
   * callbacks are presumably delivered on a different thread than the test thread.
   */
  class TestConfigListener implements InstanceConfigChangeListener, ClusterConfigChangeListener,
      ResourceConfigChangeListener {
    volatile boolean _instanceConfigChanged = false;
    volatile boolean _resourceConfigChanged = false;
    volatile boolean _clusterConfigChanged = false;
    volatile List<ResourceConfig> _resourceConfigs;
    volatile List<InstanceConfig> _instanceConfigs;
    volatile ClusterConfig _clusterConfig;

    // In every callback the payload is stored before the flag is raised, so that a test which
    // observes the flag never dereferences a payload that has not been assigned yet.

    @Override
    public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs,
        NotificationContext context) {
      _instanceConfigs = instanceConfigs;
      _instanceConfigChanged = true;
    }

    @Override
    public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
      _clusterConfig = clusterConfig;
      _clusterConfigChanged = true;
    }

    @Override
    public void onResourceConfigChange(List<ResourceConfig> resourceConfigs,
        NotificationContext context) {
      _resourceConfigs = resourceConfigs;
      _resourceConfigChanged = true;
    }

    /** Clears all flags and payloads so the next callback can be detected. */
    public void reset() {
      _instanceConfigChanged = false;
      _resourceConfigChanged = false;
      _clusterConfigChanged = false;
      _resourceConfigs = null;
      _instanceConfigs = null;
      _clusterConfig = null;
    }
  }

  int _numNodes = 3;
  int _numResources = 4;
  HelixManager _manager;
  String _clusterName;

  /**
   * Creates the test cluster and connects a spectator manager to it.
   *
   * @throws Exception if the manager fails to connect
   */
  @BeforeClass
  public void beforeClass() throws Exception {
    _clusterName = TestHelper.getTestClassName();
    System.out.println("START " + _clusterName + " at " + new Date());

    TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant port
        "localhost", // participant name prefix
        "TestDB", // resource name prefix
        _numResources, // resources
        32, // partitions per resource
        _numNodes, // number of nodes
        2, // replicas
        "MasterSlave", true); // do rebalance

    _manager = HelixManagerFactory.getZKHelixManager(_clusterName, "localhost",
        InstanceType.SPECTATOR, ZK_ADDR);
    _manager.connect();
  }

  /**
   * Disconnects the manager (if connected) and drops the test cluster.
   *
   * @throws Exception if disconnecting or dropping the cluster fails
   */
  @AfterClass
  public void afterClass() throws Exception {
    try {
      if (_manager != null && _manager.isConnected()) {
        _manager.disconnect();
      }
    } finally {
      // Drop the cluster even when disconnecting fails, so it does not leak into other tests.
      TestHelper.dropCluster(_clusterName, _gZkClient);
      System.out.println("END " + _clusterName + " at " + new Date());
    }
  }

  /**
   * Registers one listener for instance, cluster and resource configs and checks that each
   * registration triggers an initial callback, then that modifying an instance config, modifying
   * the cluster config, and adding/removing a resource config each trigger a callback with the
   * expected payload.
   *
   * @throws Exception if a listener cannot be registered or verification fails unexpectedly
   */
  @Test
  public void testConfigChangeListeners() throws Exception {
    TestConfigListener listener = new TestConfigListener();

    // --- initial callbacks on registration ---
    listener.reset();
    _manager.addInstanceConfigChangeListener(listener);
    boolean result =
        TestHelper.verify(() -> listener._instanceConfigChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result, "Should get initial instanceConfig callback invoked");
    Assert.assertEquals(listener._instanceConfigs.size(), _numNodes,
        "Instance Config size does not match");

    listener.reset();
    // NOTE(review): "Clusterfig" (sic) is kept exactly as called in the original code; it is
    // presumably the actual HelixManager method name - confirm before renaming.
    _manager.addClusterfigChangeListener(listener);
    result = TestHelper.verify(() -> listener._clusterConfigChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result, "Should get initial clusterConfig callback invoked");
    Assert.assertNotNull(listener._clusterConfig, "Cluster Config should not be null");

    listener.reset();
    _manager.addResourceConfigChangeListener(listener);
    result = TestHelper.verify(() -> listener._resourceConfigChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result, "Should get initial resourceConfig callback invoked");
    Assert.assertEquals(listener._resourceConfigs.size(), 0, "resource config size does not match");

    // --- test change content ---
    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
    Builder keyBuilder = accessor.keyBuilder();

    // Modify an existing instance config; the timestamp is used as a fresh field name.
    String instanceName = "localhost_12918";
    HelixProperty value = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
    value._record.setSimpleField(String.valueOf(System.currentTimeMillis()), "newValue");
    listener.reset();
    accessor.setProperty(keyBuilder.instanceConfig(instanceName), value);
    result = TestHelper.verify(() -> listener._instanceConfigChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result,
        "Should get instanceConfig callback invoked since we change instanceConfig");
    Assert.assertEquals(listener._instanceConfigs.size(), _numNodes,
        "Instance Config size does not match");

    // Modify the cluster config.
    value = accessor.getProperty(keyBuilder.clusterConfig());
    value._record.setSimpleField(String.valueOf(System.currentTimeMillis()), "newValue");
    listener.reset();
    accessor.setProperty(keyBuilder.clusterConfig(), value);
    result = TestHelper.verify(() -> listener._clusterConfigChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result,
        "Should get clusterConfig callback invoked since we change clusterConfig");
    Assert.assertNotNull(listener._clusterConfig, "Cluster Config should not be null");

    // Add a resource config.
    String resourceName = "TestDB_0";
    value = new HelixProperty(resourceName);
    value._record.setSimpleField(String.valueOf(System.currentTimeMillis()), "newValue");
    listener.reset();
    accessor.setProperty(keyBuilder.resourceConfig(resourceName), value);
    result = TestHelper.verify(() -> listener._resourceConfigChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result,
        "Should get resourceConfig callback invoked since we add resourceConfig");
    Assert.assertEquals(listener._resourceConfigs.size(), 1, "Resource config size does not match");

    // Remove the resource config again.
    listener.reset();
    accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
    result = TestHelper.verify(() -> listener._resourceConfigChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result,
        "Should get resourceConfig callback invoked since we remove resourceConfig");
    Assert.assertEquals(listener._resourceConfigs.size(), 0, "Resource config size does not match");
  }

  /**
   * Registers one scoped listener for the CLUSTER, RESOURCE and PARTICIPANT scopes and checks
   * that each registration triggers an initial callback, then that modifying an instance config,
   * modifying the cluster config, and adding/removing a resource config each trigger a callback
   * carrying the expected number of configs.
   *
   * @throws Exception if a listener cannot be registered or verification fails unexpectedly
   */
  @Test
  public void testScopedConfigChangeListener() throws Exception {
    TestScopedConfigListener listener = new TestScopedConfigListener();

    // --- initial callbacks on registration ---
    listener.reset();
    _manager.addConfigChangeListener(listener, ConfigScopeProperty.CLUSTER);
    boolean result = TestHelper.verify(() -> listener._configChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result, "Should get initial clusterConfig callback invoked");
    Assert.assertEquals(listener._configSize, 1, "Cluster Config size should be 1");

    listener.reset();
    _manager.addConfigChangeListener(listener, ConfigScopeProperty.RESOURCE);
    result = TestHelper.verify(() -> listener._configChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result, "Should get initial resourceConfig callback invoked");
    Assert.assertEquals(listener._configSize, 0, "Resource Config size does not match");

    listener.reset();
    _manager.addConfigChangeListener(listener, ConfigScopeProperty.PARTICIPANT);
    result = TestHelper.verify(() -> listener._configChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result, "Should get initial instanceConfig callback invoked");
    Assert.assertEquals(listener._configSize, _numNodes, "Instance Config size does not match");

    // --- test change content ---
    HelixDataAccessor accessor = _manager.getHelixDataAccessor();
    Builder keyBuilder = accessor.keyBuilder();

    // Modify an existing instance config; the timestamp is used as a fresh field name.
    String instanceName = "localhost_12918";
    HelixProperty value = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
    value._record.setSimpleField(String.valueOf(System.currentTimeMillis()), "newValue");
    listener.reset();
    accessor.setProperty(keyBuilder.instanceConfig(instanceName), value);
    result = TestHelper.verify(() -> listener._configChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result,
        "Should get instanceConfig callback invoked since we change instanceConfig");
    Assert.assertEquals(listener._configSize, _numNodes, "Instance Config size does not match");

    // Modify the cluster config.
    value = accessor.getProperty(keyBuilder.clusterConfig());
    value._record.setSimpleField(String.valueOf(System.currentTimeMillis()), "newClusterValue");
    listener.reset();
    accessor.setProperty(keyBuilder.clusterConfig(), value);
    result = TestHelper.verify(() -> listener._configChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result,
        "Should get clusterConfig callback invoked since we change clusterConfig");
    Assert.assertEquals(listener._configSize, 1, "Cluster Config size does not match");

    // Add a resource config.
    String resourceName = "TestDB_0";
    value = new HelixProperty(resourceName);
    value._record.setSimpleField(String.valueOf(System.currentTimeMillis()), "newValue");
    listener.reset();
    accessor.setProperty(keyBuilder.resourceConfig(resourceName), value);
    result = TestHelper.verify(() -> listener._configChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result,
        "Should get resourceConfig callback invoked since we add resourceConfig");
    Assert.assertEquals(listener._configSize, 1, "Resource Config size does not match");

    // Remove the resource config again.
    listener.reset();
    accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
    result = TestHelper.verify(() -> listener._configChanged, TestHelper.WAIT_DURATION);
    Assert.assertTrue(result,
        "Should get resourceConfig callback invoked since we remove resourceConfig");
    Assert.assertEquals(listener._configSize, 0, "Resource Config size does not match");
  }
}