package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestZkBucketDataAccessor extends ZkTestBase {
private static final String PATH = "/" + TestHelper.getTestClassName();
private static final String NAME_KEY = TestHelper.getTestClassName();
private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
private static final String LAST_WRITE_KEY = "LAST_WRITE";
// Populate list and map fields for content comparison
private static final List<String> LIST_FIELD = ImmutableList.of("1", "2");
private static final Map<String, String> MAP_FIELD = ImmutableMap.of("1", "2");
private BucketDataAccessor _bucketDataAccessor;
private BaseDataAccessor<byte[]> _zkBaseDataAccessor;
private ZNRecord record = new ZNRecord(NAME_KEY);
@BeforeClass
public void beforeClass() {
// Initialize ZK accessors for testing
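    // Bucket size is 50KB; the trailing 0L is taken here to be the version TTL, meaning stale
    // versions become eligible for garbage collection immediately.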
_bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR, 50 * 1024, 0L);
HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
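    // Install a pass-through serializer so this test can read back, as raw bytes, the metadata
    // znodes (e.g. LAST_WRITE and LAST_SUCCESSFUL_WRITE) that ZkBucketDataAccessor writes.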
zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
if (data instanceof byte[]) {
return (byte[]) data;
}
        throw new HelixException("ZkBucketDataAccessor only supports a byte array as an argument!");
}
@Override
public Object deserialize(byte[] data) throws ZkMarshallingError {
return data;
}
});
_zkBaseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
// Fill in some data for the record
record.setSimpleField(NAME_KEY, NAME_KEY);
record.setListField(NAME_KEY, LIST_FIELD);
record.setMapField(NAME_KEY, MAP_FIELD);
}
@AfterClass
public void afterClass() {
_bucketDataAccessor.disconnect();
}
/**
* Attempt writing a simple HelixProperty using compressedBucketWrite.
* @throws IOException
*/
@Test
public void testCompressedBucketWrite() throws IOException {
Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
}
@Test(dependsOnMethods = "testCompressedBucketWrite")
public void testMultipleWrites() throws Exception {
int count = 50;
// Write "count" times
for (int i = 0; i < count; i++) {
_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
}
// Last known good version number should be "count"
byte[] binarySuccessfulWriteVer = _zkBaseDataAccessor
.get(PATH + "/" + LAST_SUCCESSFUL_WRITE_KEY, null, AccessOption.PERSISTENT);
    long lastSuccessfulWriteVer =
        Long.parseLong(new String(binarySuccessfulWriteVer, StandardCharsets.UTF_8));
Assert.assertEquals(lastSuccessfulWriteVer, count);
// Last write version should be "count"
byte[] binaryWriteVer =
_zkBaseDataAccessor.get(PATH + "/" + LAST_WRITE_KEY, null, AccessOption.PERSISTENT);
    long writeVer = Long.parseLong(new String(binaryWriteVer, StandardCharsets.UTF_8));
Assert.assertEquals(writeVer, count);
// Test that all previous versions have been deleted
    // Use a verifier because garbage collection of stale versions is asynchronous and may be
    // delayed by ZK
Assert.assertTrue(TestHelper.verify(() -> {
List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
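      // Only the latest version bucket plus the LAST_SUCCESSFUL_WRITE and LAST_WRITE metadata
      // nodes are expected to remain once garbage collection has finished.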
return children.size() == 3;
}, 60 * 1000L));
}
/**
   * Verify that the record read back is the same record that was written in
   * {@link #testCompressedBucketWrite()}.
*/
@Test(dependsOnMethods = "testMultipleWrites")
public void testCompressedBucketRead() {
HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD);
Assert.assertEquals(readRecord.getRecord().getMapField(NAME_KEY), MAP_FIELD);
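    // Clean up the bucketized data written by the earlier tests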
_bucketDataAccessor.compressedBucketDelete(PATH);
}
/**
   * Write a HelixProperty with a large number of entries using BucketDataAccessor and read it back.
*/
@Test(dependsOnMethods = "testCompressedBucketRead")
public void testLargeWriteAndRead() throws IOException {
String name = "largeResourceAssignment";
HelixProperty property = createLargeHelixProperty(name, 100000);
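    // 100,000 map entries far exceed the 50KB bucket size configured in beforeClass(), so this
    // write should be split across multiple buckets.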
// Perform large write
long before = System.currentTimeMillis();
_bucketDataAccessor.compressedBucketWrite("/" + name, property);
long after = System.currentTimeMillis();
System.out.println("Write took " + (after - before) + " ms");
// Read it back
before = System.currentTimeMillis();
HelixProperty readRecord =
_bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class);
after = System.currentTimeMillis();
System.out.println("Read took " + (after - before) + " ms");
// Check against the original HelixProperty
Assert.assertEquals(readRecord, property);
}
private HelixProperty createLargeHelixProperty(String name, int numEntries) {
HelixProperty property = new HelixProperty(name);
    Random random = new Random();
    for (int i = 0; i < numEntries; i++) {
      // Create random keys and values for each entry
      byte[] arrayKey = new byte[20];
      byte[] arrayVal = new byte[20];
      random.nextBytes(arrayKey);
      random.nextBytes(arrayVal);
String randomStrKey = new String(arrayKey, StandardCharsets.UTF_8);
String randomStrVal = new String(arrayVal, StandardCharsets.UTF_8);
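      // Decoding random bytes as UTF-8 may produce replacement characters; that is fine here since
      // the strings only need to bulk up the record, not be valid text.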
// Dummy mapField
Map<String, String> mapField = new HashMap<>();
mapField.put(randomStrKey, randomStrVal);
property.getRecord().setMapField(randomStrKey, mapField);
}
return property;
}
}