blob: fc9fa77199ddafa7a17318aafc3db4d574e670f8 [file] [log] [blame]
package org.apache.helix.zookeeper.datamodel.serializer;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.introspect.CodehausJacksonIntrospector;
import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestZNRecordSerializeWriteSizeLimit {
/*
* Tests data serializing when auto compression is disabled. If the system property for
* auto compression is set to "false", auto compression is disabled.
*/
@Test
public void testAutoCompressionDisabled() {
// Backup properties for later resetting.
final String compressionEnabledProperty =
System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED);
final String compressionThresholdProperty =
System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
try {
// Prepare system properties to disable auto compression.
final int writeSizeLimit = 200 * 1024;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
// 2. Set the auto compression enabled property to false so auto compression is disabled.
System
.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED, "false");
// Verify auto compression is disabled.
Assert.assertFalse(
Boolean.getBoolean(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED));
// Data size 300 KB > size limit 200 KB: exception expected.
verifyAutoCompression(300, writeSizeLimit, true, false, true);
// Data size 100 KB < size limit 200 KB: pass
verifyAutoCompression(100, writeSizeLimit, false, false, false);
} finally {
// Reset: add the properties back to system properties if they were originally available.
if (compressionEnabledProperty != null) {
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED,
compressionEnabledProperty);
} else {
System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED);
}
if (compressionThresholdProperty != null) {
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
compressionThresholdProperty);
} else {
System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
}
}
}
/*
* Tests data serializing when write size limit is set.
* Two cases:
* 1. limit is not set
* --> default size is used.
* 2. limit is set
* --> serialized data is checked by the limit: pass or throw exception.
*/
@Test
public void testZNRecordSerializerWriteSizeLimit() {
// Backup properties for later resetting.
final String writeSizeLimitProperty =
System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
try {
// Unset write size limit property so default limit is used.
System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
Assert.assertNull(
System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES));
verifyAutoCompression(500, ZNRecord.SIZE_LIMIT, false, false, false);
// 2. Set size limit so serialized data is greater than the size limit but compressed data
// is smaller than the size limit.
// Set it to 2000 bytes
int writeSizeLimit = 2000;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
// Verify auto compression is done.
verifyAutoCompression(200, writeSizeLimit, true, true, false);
// 3. Set size limit to be be less than default value. The default value will be used for write
// size limit.
writeSizeLimit = 2000;
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
String.valueOf(writeSizeLimit));
// Verify exception is thrown because compressed data is larger than size limit.
verifyAutoCompression(1000, writeSizeLimit, true, true, true);
} finally {
// Reset: add the properties back to system properties if they were originally available.
if (writeSizeLimitProperty != null) {
System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES,
writeSizeLimitProperty);
} else {
System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES);
}
}
}
private void verifyAutoCompression(int recordSize, int limit, boolean greaterThanThreshold,
boolean compressionExpected, boolean exceptionExpected) {
ZNRecord record = createZNRecord(recordSize);
// Makes sure the length of serialized bytes is greater than limit to
// satisfy the condition: serialized bytes' length exceeds the limit.
byte[] preCompressedBytes = serialize(record);
Assert.assertEquals(preCompressedBytes.length > limit, greaterThanThreshold);
ZkSerializer zkSerializer = new ZNRecordSerializer();
byte[] bytes;
try {
bytes = zkSerializer.serialize(record);
Assert.assertEquals(bytes.length >= limit, exceptionExpected);
Assert.assertFalse(exceptionExpected);
} catch (ZkMarshallingError e) {
Assert.assertTrue(exceptionExpected, "Should not throw exception.");
Assert.assertTrue(e.getMessage().contains(" is greater than " + limit + " bytes"));
// No need to verify following asserts as bytes data is not returned.
return;
}
// Verify whether serialized data is compressed or not.
Assert.assertEquals(GZipCompressionUtil.isCompressed(bytes), compressionExpected);
Assert.assertEquals(preCompressedBytes.length != bytes.length, compressionExpected);
// Verify serialized bytes could correctly deserialize.
Assert.assertEquals(zkSerializer.deserialize(bytes), record);
}
private ZNRecord createZNRecord(final int recordSizeKb) {
byte[] buf = new byte[1024];
for (int i = 0; i < 1024; i++) {
buf[i] = 'a';
}
String bufStr = new String(buf);
ZNRecord record = new ZNRecord("record");
for (int i = 0; i < recordSizeKb; i++) {
record.setSimpleField(Integer.toString(i), bufStr);
}
return record;
}
// Simulates serializing so we can check the size of serialized bytes.
// Returns raw serialized bytes before being compressed.
private byte[] serialize(Object data) {
ObjectMapper mapper = new ObjectMapper();
mapper.setAnnotationIntrospector(new CodehausJacksonIntrospector());
mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.enable(MapperFeature.AUTO_DETECT_FIELDS);
mapper.enable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] serializedBytes = new byte[0];
try {
mapper.writeValue(baos, data);
serializedBytes = baos.toByteArray();
} catch (IOException e) {
Assert.fail("Can not serialize data.", e);
}
return serializedBytes;
}
}