| package org.apache.helix.zookeeper.datamodel.serializer; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| |
| import com.fasterxml.jackson.databind.MapperFeature; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.fasterxml.jackson.databind.SerializationFeature; |
| import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.zookeeper.introspect.CodehausJacksonIntrospector; |
| import org.apache.helix.zookeeper.util.GZipCompressionUtil; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError; |
| import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; |
| import org.testng.Assert; |
| import org.testng.annotations.Test; |
| |
| |
| public class TestZNRecordSerializeWriteSizeLimit { |
| |
| /* |
| * Tests data serializing when auto compression is disabled. If the system property for |
| * auto compression is set to "false", auto compression is disabled. |
| */ |
| @Test |
| public void testAutoCompressionDisabled() { |
| // Backup properties for later resetting. |
| final String compressionEnabledProperty = |
| System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED); |
| final String compressionThresholdProperty = |
| System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); |
| |
| try { |
| // Prepare system properties to disable auto compression. |
| final int writeSizeLimit = 200 * 1024; |
| System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, |
| String.valueOf(writeSizeLimit)); |
| |
| // 2. Set the auto compression enabled property to false so auto compression is disabled. |
| System |
| .setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED, "false"); |
| |
| // Verify auto compression is disabled. |
| Assert.assertFalse( |
| Boolean.getBoolean(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED)); |
| // Data size 300 KB > size limit 200 KB: exception expected. |
| verifyAutoCompression(300, writeSizeLimit, true, false, true); |
| |
| // Data size 100 KB < size limit 200 KB: pass |
| verifyAutoCompression(100, writeSizeLimit, false, false, false); |
| } finally { |
| // Reset: add the properties back to system properties if they were originally available. |
| if (compressionEnabledProperty != null) { |
| System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED, |
| compressionEnabledProperty); |
| } else { |
| System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_AUTO_COMPRESS_ENABLED); |
| } |
| if (compressionThresholdProperty != null) { |
| System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, |
| compressionThresholdProperty); |
| } else { |
| System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); |
| } |
| } |
| } |
| |
| /* |
| * Tests data serializing when write size limit is set. |
| * Two cases: |
| * 1. limit is not set |
| * --> default size is used. |
| * 2. limit is set |
| * --> serialized data is checked by the limit: pass or throw exception. |
| */ |
| @Test |
| public void testZNRecordSerializerWriteSizeLimit() { |
| // Backup properties for later resetting. |
| final String writeSizeLimitProperty = |
| System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); |
| |
| try { |
| // Unset write size limit property so default limit is used. |
| System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); |
| |
| Assert.assertNull( |
| System.getProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES)); |
| |
| verifyAutoCompression(500, ZNRecord.SIZE_LIMIT, false, false, false); |
| |
| // 2. Set size limit so serialized data is greater than the size limit but compressed data |
| // is smaller than the size limit. |
| // Set it to 2000 bytes |
| int writeSizeLimit = 2000; |
| System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, |
| String.valueOf(writeSizeLimit)); |
| |
| // Verify auto compression is done. |
| verifyAutoCompression(200, writeSizeLimit, true, true, false); |
| |
| // 3. Set size limit to be be less than default value. The default value will be used for write |
| // size limit. |
| writeSizeLimit = 2000; |
| System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, |
| String.valueOf(writeSizeLimit)); |
| |
| // Verify exception is thrown because compressed data is larger than size limit. |
| verifyAutoCompression(1000, writeSizeLimit, true, true, true); |
| } finally { |
| // Reset: add the properties back to system properties if they were originally available. |
| if (writeSizeLimitProperty != null) { |
| System.setProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES, |
| writeSizeLimitProperty); |
| } else { |
| System.clearProperty(ZkSystemPropertyKeys.ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES); |
| } |
| } |
| } |
| |
| private void verifyAutoCompression(int recordSize, int limit, boolean greaterThanThreshold, |
| boolean compressionExpected, boolean exceptionExpected) { |
| ZNRecord record = createZNRecord(recordSize); |
| |
| // Makes sure the length of serialized bytes is greater than limit to |
| // satisfy the condition: serialized bytes' length exceeds the limit. |
| byte[] preCompressedBytes = serialize(record); |
| |
| Assert.assertEquals(preCompressedBytes.length > limit, greaterThanThreshold); |
| |
| ZkSerializer zkSerializer = new ZNRecordSerializer(); |
| |
| byte[] bytes; |
| try { |
| bytes = zkSerializer.serialize(record); |
| |
| Assert.assertEquals(bytes.length >= limit, exceptionExpected); |
| Assert.assertFalse(exceptionExpected); |
| } catch (ZkMarshallingError e) { |
| Assert.assertTrue(exceptionExpected, "Should not throw exception."); |
| Assert.assertTrue(e.getMessage().contains(" is greater than " + limit + " bytes")); |
| // No need to verify following asserts as bytes data is not returned. |
| return; |
| } |
| |
| // Verify whether serialized data is compressed or not. |
| Assert.assertEquals(GZipCompressionUtil.isCompressed(bytes), compressionExpected); |
| Assert.assertEquals(preCompressedBytes.length != bytes.length, compressionExpected); |
| |
| // Verify serialized bytes could correctly deserialize. |
| Assert.assertEquals(zkSerializer.deserialize(bytes), record); |
| } |
| |
| private ZNRecord createZNRecord(final int recordSizeKb) { |
| byte[] buf = new byte[1024]; |
| for (int i = 0; i < 1024; i++) { |
| buf[i] = 'a'; |
| } |
| String bufStr = new String(buf); |
| |
| ZNRecord record = new ZNRecord("record"); |
| for (int i = 0; i < recordSizeKb; i++) { |
| record.setSimpleField(Integer.toString(i), bufStr); |
| } |
| |
| return record; |
| } |
| |
| // Simulates serializing so we can check the size of serialized bytes. |
| // Returns raw serialized bytes before being compressed. |
| private byte[] serialize(Object data) { |
| ObjectMapper mapper = new ObjectMapper(); |
| mapper.setAnnotationIntrospector(new CodehausJacksonIntrospector()); |
| mapper.enable(SerializationFeature.INDENT_OUTPUT); |
| mapper.enable(MapperFeature.AUTO_DETECT_FIELDS); |
| mapper.enable(MapperFeature.CAN_OVERRIDE_ACCESS_MODIFIERS); |
| |
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); |
| byte[] serializedBytes = new byte[0]; |
| |
| try { |
| mapper.writeValue(baos, data); |
| serializedBytes = baos.toByteArray(); |
| } catch (IOException e) { |
| Assert.fail("Can not serialize data.", e); |
| } |
| |
| return serializedBytes; |
| } |
| } |