blob: 0193738f5dba92cd413f9951ec67bd0da6960aca [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Arrays;
import java.util.Date;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.TestHelper;
import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestZNRecordSizeLimit extends ZkUnitTestBase {
private static Logger LOG = LoggerFactory.getLogger(TestZNRecordSizeLimit.class);
private static final String ASSERTION_MESSAGE =
"Should succeed because compressed data is smaller than 1M. Caused by: ";
@Test
public void testZNRecordSizeLimitUseZNRecordSerializer() {
String className = getShortClassName();
System.out.println("START testZNRecordSizeLimitUseZNRecordSerializer at " + new Date(
System.currentTimeMillis()));
ZNRecordSerializer serializer = new ZNRecordSerializer();
String root = className;
byte[] buf = new byte[1024];
for (int i = 0; i < 1024; i++) {
buf[i] = 'a';
}
String bufStr = new String(buf);
// test zkClient
// legal-sized data gets written to zk
// write a znode of size less than 1m
final ZNRecord smallRecord = new ZNRecord("normalsize");
smallRecord.getSimpleFields().clear();
for (int i = 0; i < 900; i++) {
smallRecord.setSimpleField(i + "", bufStr);
}
String path1 = "/" + root + "/test1";
_gZkClient.createPersistent(path1, true);
_gZkClient.writeData(path1, smallRecord);
ZNRecord record = _gZkClient.readData(path1);
Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
// oversized data doesn't create any data on zk
// prepare a znode of size larger than 1m
final ZNRecord largeRecord = new ZNRecord("oversize");
largeRecord.getSimpleFields().clear();
for (int i = 0; i < 1024; i++) {
largeRecord.setSimpleField(i + "", bufStr);
}
String path2 = "/" + root + "/test2";
_gZkClient.createPersistent(path2, true);
try {
_gZkClient.writeData(path2, largeRecord);
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
record = _gZkClient.readData(path2);
Assert.assertNotNull(record);
// oversized write doesn't overwrite existing data on zk
record = _gZkClient.readData(path1);
try {
_gZkClient.writeData(path1, largeRecord);
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
ZNRecord recordNew = _gZkClient.readData(path1);
try {
byte[] arr = serializer.serialize(record);
byte[] arrNew = serializer.serialize(recordNew);
Assert.assertFalse(Arrays.equals(arr, arrNew));
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
// test ZkDataAccessor
ZKHelixAdmin admin = new ZKHelixAdmin(_gZkClient);
admin.addCluster(className, true);
InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
admin.addInstance(className, instanceConfig);
// oversized data should not create any new data on zk
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(_gZkClient));
Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = new IdealState("currentState");
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 0; i < 1024; i++) {
idealState.getRecord().setSimpleField(i + "", bufStr);
}
boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
Assert.assertTrue(succeed);
HelixProperty property = accessor.getProperty(
keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
Assert.assertNull(property);
// legal sized data gets written to zk
idealState.getRecord().getSimpleFields().clear();
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 0; i < 900; i++) {
idealState.getRecord().setSimpleField(i + "", bufStr);
}
succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
Assert.assertTrue(succeed);
record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
try {
Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
// oversized data should not update existing data on zk
idealState.getRecord().getSimpleFields().clear();
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 900; i < 1024; i++) {
idealState.getRecord().setSimpleField(i + "", bufStr);
}
// System.out.println("record: " + idealState.getRecord());
succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
Assert.assertTrue(succeed);
recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
try {
byte[] arr = serializer.serialize(record);
byte[] arrNew = serializer.serialize(recordNew);
Assert.assertFalse(Arrays.equals(arr, arrNew));
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
System.out.println("END testZNRecordSizeLimitUseZNRecordSerializer at " + new Date(
System.currentTimeMillis()));
}
@Test(dependsOnMethods = "testZNRecordSizeLimitUseZNRecordSerializer")
public void testZNRecordSizeLimitUseZNRecordStreamingSerializer() {
String className = getShortClassName();
System.out.println("START testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date(
System.currentTimeMillis()));
ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
try {
zkClient.setZkSerializer(serializer);
String root = className;
byte[] buf = new byte[1024];
for (int i = 0; i < 1024; i++) {
buf[i] = 'a';
}
String bufStr = new String(buf);
// test zkClient
// legal-sized data gets written to zk
// write a znode of size less than 1m
final ZNRecord smallRecord = new ZNRecord("normalsize");
smallRecord.getSimpleFields().clear();
for (int i = 0; i < 900; i++) {
smallRecord.setSimpleField(i + "", bufStr);
}
String path1 = "/" + root + "/test1";
zkClient.createPersistent(path1, true);
zkClient.writeData(path1, smallRecord);
ZNRecord record = zkClient.readData(path1);
try {
Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
// oversized data doesn't create any data on zk
// prepare a znode of size larger than 1m
final ZNRecord largeRecord = new ZNRecord("oversize");
largeRecord.getSimpleFields().clear();
for (int i = 0; i < 1024; i++) {
largeRecord.setSimpleField(i + "", bufStr);
}
String path2 = "/" + root + "/test2";
zkClient.createPersistent(path2, true);
try {
zkClient.writeData(path2, largeRecord);
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
record = zkClient.readData(path2);
Assert.assertNotNull(record);
// oversized write doesn't overwrite existing data on zk
record = zkClient.readData(path1);
try {
zkClient.writeData(path1, largeRecord);
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
ZNRecord recordNew = zkClient.readData(path1);
try {
byte[] arr = serializer.serialize(record);
byte[] arrNew = serializer.serialize(recordNew);
Assert.assertFalse(Arrays.equals(arr, arrNew));
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
// test ZkDataAccessor
ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
admin.addCluster(className, true);
InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
admin.addInstance(className, instanceConfig);
// oversized data should not create any new data on zk
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(className, new ZkBaseDataAccessor(zkClient));
Builder keyBuilder = accessor.keyBuilder();
// ZNRecord statusUpdates = new ZNRecord("statusUpdates");
IdealState idealState = new IdealState("currentState");
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 0; i < 1024; i++) {
idealState.getRecord().setSimpleField(i + "", bufStr);
}
boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_1"), idealState);
Assert.assertTrue(succeed);
HelixProperty property = accessor.getProperty(keyBuilder.idealStates("TestDB_1"));
Assert.assertNotNull(property);
// legal sized data gets written to zk
idealState.getRecord().getSimpleFields().clear();
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 0; i < 900; i++) {
idealState.getRecord().setSimpleField(i + "", bufStr);
}
succeed = accessor.setProperty(keyBuilder.idealStates("TestDB_2"), idealState);
Assert.assertTrue(succeed);
record = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
Assert.assertTrue(serializer.serialize(record).length > 900 * 1024);
// oversized data should not update existing data on zk
idealState.getRecord().getSimpleFields().clear();
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 900; i < 1024; i++) {
idealState.getRecord().setSimpleField(i + "", bufStr);
}
// System.out.println("record: " + idealState.getRecord());
succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB_2"), idealState);
Assert.assertTrue(succeed);
recordNew = accessor.getProperty(keyBuilder.idealStates("TestDB_2")).getRecord();
try {
byte[] arr = serializer.serialize(record);
byte[] arrNew = serializer.serialize(recordNew);
Assert.assertFalse(Arrays.equals(arr, arrNew));
} catch (ZkMarshallingError e) {
Assert.fail(ASSERTION_MESSAGE + e);
}
} finally {
zkClient.close();
}
System.out.println("END testZNRecordSizeLimitUseZNRecordStreamingSerializer at " + new Date(
System.currentTimeMillis()));
}
/*
* Tests ZNRecordSerializer threshold.
* Two cases using ZkClient and ZkDataAccessor:
* 1. serialized data size is less than threshold and could be written to ZK.
* 2. serialized data size is greater than threshold, so ZkClientException is thrown.
*/
@Test(dependsOnMethods = "testZNRecordSizeLimitUseZNRecordStreamingSerializer")
public void testZNRecordSerializerWriteSizeLimit() throws Exception {
// Backup properties for later resetting.
final String thresholdProperty =
System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
try {
ZNRecordSerializer serializer = new ZNRecordSerializer();
String root = getShortClassName();
byte[] buf = new byte[1024];
for (int i = 0; i < 1024; i++) {
buf[i] = 'a';
}
String bufStr = new String(buf);
// 1. legal-sized data gets written to zk
// write a znode of size less than writeSizeLimit
int rawZnRecordSize = 700;
int writeSizeLimitKb = 800;
int writeSizeLimit = writeSizeLimitKb * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
for (int i = 0; i < rawZnRecordSize; i++) {
normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
}
String path = "/" + root + "/normal";
_gZkClient.createPersistent(path, true);
_gZkClient.writeData(path, normalSizeRecord);
ZNRecord record = _gZkClient.readData(path);
// Successfully reads the same data.
Assert.assertEquals(normalSizeRecord, record);
int length = serializer.serialize(record).length;
// Less than writeSizeLimit so it is written to ZK.
Assert.assertTrue(length < writeSizeLimit);
// 2. Large size data is not allowed to write to ZK
// Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit.
rawZnRecordSize = 2000;
// Set the writeSizeLimit to very small so serialized data size exceeds the writeSizeLimit.
writeSizeLimitKb = 1;
writeSizeLimit = writeSizeLimitKb * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
final ZNRecord largeRecord = new ZNRecord("large-size");
for (int i = 0; i < rawZnRecordSize; i++) {
largeRecord.setSimpleField(Integer.toString(i), bufStr);
}
path = "/" + root + "/large";
_gZkClient.createPersistent(path, true);
try {
_gZkClient.writeData(path, largeRecord);
Assert.fail("Data should not be written to ZK because data size exceeds writeSizeLimit!");
} catch (ZkMarshallingError expected) {
Assert.assertTrue(
expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
}
// test ZkDataAccessor
ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
admin.addCluster(root, true);
InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
admin.addInstance(root, instanceConfig);
// Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit.
writeSizeLimitKb = 10;
writeSizeLimit = writeSizeLimitKb * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
// oversized data should not create any new data on zk
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(ZK_ADDR));
Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = new IdealState("currentState");
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 0; i < 1024; i++) {
idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
}
boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
Assert.assertTrue(succeed);
HelixProperty property = accessor.getProperty(
keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
Assert.assertNull(property);
// legal sized data gets written to zk
idealState.getRecord().getSimpleFields().clear();
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 0; i < 900; i++) {
idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
}
succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
Assert.assertTrue(succeed);
record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
// Set small write size limit so writing does not succeed.
writeSizeLimitKb = 1;
writeSizeLimit = writeSizeLimitKb * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
// oversized data should not update existing data on zk
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 900; i < 1024; i++) {
idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
}
succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
Assert.assertFalse(succeed,
"Update property should not succeed because data exceeds znode write limit!");
// Delete the nodes.
deletePath(_gZkClient, "/" + root);
} finally {
// Reset: add the properties back to system properties if they were originally available.
if (thresholdProperty != null) {
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
thresholdProperty);
} else {
System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
}
}
}
/*
* Tests ZNRecordStreamingSerializer threshold.
* Two cases using ZkClient and ZkDataAccessor:
* 1. serialized data size is less than threshold and could be written to ZK.
* 2. serialized data size is greater than threshold, so ZkClientException is thrown.
*/
@Test(dependsOnMethods = "testZNRecordSerializerWriteSizeLimit")
public void testZNRecordStreamingSerializerWriteSizeLimit() throws Exception {
// Backup properties for later resetting.
final String thresholdProperty =
System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
ZNRecordStreamingSerializer serializer = new ZNRecordStreamingSerializer();
HelixZkClient zkClient = SharedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
try {
zkClient.setZkSerializer(serializer);
String root = getShortClassName();
byte[] buf = new byte[1024];
for (int i = 0; i < 1024; i++) {
buf[i] = 'a';
}
String bufStr = new String(buf);
// 1. legal-sized data gets written to zk
// write a znode of size less than writeSizeLimit
int rawZnRecordSize = 700;
int writeSizeLimitKb = 800;
int writeSizeLimit = writeSizeLimitKb * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
final ZNRecord normalSizeRecord = new ZNRecord("normal-size");
for (int i = 0; i < rawZnRecordSize; i++) {
normalSizeRecord.setSimpleField(Integer.toString(i), bufStr);
}
String path = "/" + root + "/normal";
zkClient.createPersistent(path, true);
zkClient.writeData(path, normalSizeRecord);
ZNRecord record = zkClient.readData(path);
// Successfully reads the same data.
Assert.assertEquals(normalSizeRecord, record);
int length = serializer.serialize(record).length;
// Less than writeSizeLimit so it is written to ZK.
Assert.assertTrue(length < writeSizeLimit);
// 2. Large size data is not allowed to write to ZK
// Set raw record size to be large enough so its serialized data exceeds the writeSizeLimit.
rawZnRecordSize = 2000;
// Set the writeSizeLimit to very small so serialized data size exceeds the writeSizeLimit.
writeSizeLimitKb = 1;
writeSizeLimit = writeSizeLimitKb * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
final ZNRecord largeRecord = new ZNRecord("large-size");
for (int i = 0; i < rawZnRecordSize; i++) {
largeRecord.setSimpleField(Integer.toString(i), bufStr);
}
path = "/" + root + "/large";
zkClient.createPersistent(path, true);
try {
zkClient.writeData(path, largeRecord);
Assert.fail("Data should not written to ZK because data size exceeds writeSizeLimit!");
} catch (ZkMarshallingError expected) {
Assert.assertTrue(
expected.getMessage().contains(" is greater than " + writeSizeLimit + " bytes"));
}
// test ZkDataAccessor
ZKHelixAdmin admin = new ZKHelixAdmin(ZK_ADDR);
admin.addCluster(root, true);
InstanceConfig instanceConfig = new InstanceConfig("localhost_12918");
admin.addInstance(root, instanceConfig);
// Set the writeSizeLimit to 10KB so serialized data size does not exceed writeSizeLimit.
writeSizeLimitKb = 10;
writeSizeLimit = writeSizeLimitKb * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
// oversize data should not create any new data on zk
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(root, new ZkBaseDataAccessor<>(ZK_ADDR));
Builder keyBuilder = accessor.keyBuilder();
IdealState idealState = new IdealState("currentState");
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 0; i < 1024; i++) {
idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
}
boolean succeed = accessor.setProperty(keyBuilder.idealStates("TestDB0"), idealState);
Assert.assertTrue(succeed);
HelixProperty property = accessor.getProperty(
keyBuilder.stateTransitionStatus("localhost_12918", "session_1", "partition_1"));
Assert.assertNull(property);
// legal sized data gets written to zk
idealState.getRecord().getSimpleFields().clear();
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 0; i < 900; i++) {
idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
}
succeed = accessor.setProperty(keyBuilder.idealStates("TestDB1"), idealState);
Assert.assertTrue(succeed);
record = accessor.getProperty(keyBuilder.idealStates("TestDB1")).getRecord();
Assert.assertTrue(serializer.serialize(record).length < writeSizeLimit);
// Set small write size limit so writing does not succeed.
writeSizeLimitKb = 1;
writeSizeLimit = writeSizeLimitKb * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
// oversize data should not update existing data on zk
idealState.setStateModelDefRef("MasterSlave");
idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
idealState.setNumPartitions(10);
for (int i = 900; i < 1024; i++) {
idealState.getRecord().setSimpleField(Integer.toString(i), bufStr);
}
succeed = accessor.updateProperty(keyBuilder.idealStates("TestDB1"), idealState);
Assert.assertFalse(succeed,
"Update property should not succeed because data exceeds znode write limit!");
// Delete the nodes.
deletePath(zkClient, "/" + root);
} finally {
zkClient.close();
// Reset: add the properties back to system properties if they were originally available.
if (thresholdProperty != null) {
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
thresholdProperty);
} else {
System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
}
}
}
private void deletePath(final HelixZkClient zkClient, final String path) throws Exception {
Assert.assertTrue(TestHelper.verify(() -> {
do {
try {
zkClient.deleteRecursively(path);
} catch (ZkClientException ex) {
// ignore
}
} while (zkClient.exists(path));
return true;
}, TestHelper.WAIT_DURATION));
}
}