package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.AccessOption;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.util.GZipCompressionUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
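/**
 * A ZooKeeper-based {@link BucketDataAccessor} that GZIP-compresses a {@link HelixProperty},
 * splits it into fixed-size buckets stored as child ZNodes of a versioned path, and thereby
 * persists data that would be too large for a single ZNode. Every write creates a new version;
 * stale versions are garbage-collected asynchronously once the configured TTL has elapsed.
 *
 * <p>A minimal usage sketch (error handling omitted; the ZooKeeper address and paths below are
 * illustrative values, not defaults of this class):
 * <pre>{@code
 * BucketDataAccessor accessor = new ZkBucketDataAccessor("localhost:2181");
 * HelixProperty property = new HelixProperty("myLargeProperty");
 * accessor.compressedBucketWrite("/MY_CLUSTER/BUCKETED_DATA/myLargeProperty", property);
 * HelixProperty readBack =
 *     accessor.compressedBucketRead("/MY_CLUSTER/BUCKETED_DATA/myLargeProperty", HelixProperty.class);
 * accessor.compressedBucketDelete("/MY_CLUSTER/BUCKETED_DATA/myLargeProperty");
 * accessor.disconnect();
 * }</pre>
 */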
public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
private static final int DEFAULT_BUCKET_SIZE = 50 * 1024; // 50KB
private static final long DEFAULT_VERSION_TTL = TimeUnit.MINUTES.toMillis(1L); // 1 min
private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
private static final String DATA_SIZE_KEY = "DATA_SIZE";
private static final String METADATA_KEY = "METADATA";
private static final String LAST_SUCCESSFUL_WRITE_KEY = "LAST_SUCCESSFUL_WRITE";
private static final String LAST_WRITE_KEY = "LAST_WRITE";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
// Thread pool for deleting stale versions
private static final ScheduledExecutorService GC_THREAD = Executors.newScheduledThreadPool(1);
private final int _bucketSize;
private final long _versionTTL;
private ZkSerializer _zkSerializer;
private HelixZkClient _zkClient;
private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
/**
* Constructor that allows a custom bucket size.
   * @param zkAddr ZooKeeper connect string
   * @param bucketSize size of each bucket in bytes
   * @param versionTTL time-to-live of stale versions in milliseconds
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) {
_zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
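    // This client-level serializer just passes raw byte arrays through; the accessor does its
    // own ZNRecord serialization (ZNRecordJacksonSerializer) and GZIP compression on top of it.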
_zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
if (data instanceof byte[]) {
return (byte[]) data;
}
throw new HelixException("ZkBucketDataAccesor only supports a byte array as an argument!");
}
@Override
public Object deserialize(byte[] data) throws ZkMarshallingError {
return data;
}
});
_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
_zkSerializer = new ZNRecordJacksonSerializer();
_bucketSize = bucketSize;
_versionTTL = versionTTL;
}
/**
* Constructor that uses a default bucket size.
   * @param zkAddr ZooKeeper connect string
*/
public ZkBucketDataAccessor(String zkAddr) {
this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL);
}
@Override
public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath, T value)
throws IOException {
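    // ZNode layout written below:
    //   rootPath/LAST_WRITE              - monotonically increasing write-version counter
    //   rootPath/LAST_SUCCESSFUL_WRITE   - version of the latest fully written data (used by reads)
    //   rootPath/<version>/<bucketIndex> - buckets of the compressed, serialized record
    //   rootPath/<version>/METADATA      - bucket size and total data size for that version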
DataUpdater<byte[]> lastWriteVersionUpdater = dataInZk -> {
if (dataInZk == null || dataInZk.length == 0) {
// No last write version exists, so start with 0
return "0".getBytes();
}
// Last write exists, so increment and write it back
// **String conversion is necessary to make it display in ZK (zooinspector)**
String lastWriteVersionStr = new String(dataInZk);
long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
lastWriteVersion++;
return String.valueOf(lastWriteVersion).getBytes();
};
// 1. Increment lastWriteVersion using DataUpdater
ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor.doUpdate(
rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, AccessOption.PERSISTENT);
if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
throw new HelixException(
String.format("Failed to write the write version at path: %s!", rootPath));
}
// Successfully reserved a version number
byte[] binaryVersion = (byte[]) result._updatedValue;
String versionStr = new String(binaryVersion);
final long version = Long.parseLong(versionStr);
// 2. Write to the incremented last write version
String versionedDataPath = rootPath + "/" + versionStr;
// Take the ZNRecord and serialize it (get byte[])
byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
// Compress the byte[]
byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
// Compute N - number of buckets
int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
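    // For example (illustrative numbers): with the default 50 KB bucket size, a 120 KB
    // compressed record is split into ceil(120 / 50) = 3 buckets of 50 KB, 50 KB, and 20 KB.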
List<String> paths = new ArrayList<>();
List<byte[]> buckets = new ArrayList<>();
int ptr = 0;
int counter = 0;
    while (counter < numBuckets) {
      paths.add(versionedDataPath + "/" + counter);
      if (counter == numBuckets - 1) {
        // The last bucket may be smaller than a full bucket, so copy only the remaining bytes
        buckets.add(Arrays.copyOfRange(compressedRecord, ptr, compressedRecord.length));
      } else {
        buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
      }
      ptr += _bucketSize;
      counter++;
    }
// 3. Include the metadata in the batch write
Map<String, String> metadata = ImmutableMap.of(BUCKET_SIZE_KEY, Integer.toString(_bucketSize),
DATA_SIZE_KEY, Integer.toString(compressedRecord.length));
byte[] binaryMetadata = OBJECT_MAPPER.writeValueAsBytes(metadata);
paths.add(versionedDataPath + "/" + METADATA_KEY);
buckets.add(binaryMetadata);
// Do an async set to ZK
boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
    // Throw an exception and fail the write if any bucket write failed
for (boolean s : success) {
if (!s) {
throw new HelixException(
String.format("Failed to write the data buckets for path: %s", rootPath));
}
}
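    // LAST_SUCCESSFUL_WRITE is only advanced below after every bucket and the metadata have been
    // written, so a concurrent reader never resolves to a partially written version.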
// 4. Update lastSuccessfulWriteVersion using Updater
DataUpdater<byte[]> lastSuccessfulWriteVersionUpdater = dataInZk -> {
if (dataInZk == null || dataInZk.length == 0) {
        // No last successful write version exists, so write the version from this write
return versionStr.getBytes();
}
// Last successful write exists so check if it's smaller than my number
String lastWriteVersionStr = new String(dataInZk);
long lastWriteVersion = Long.parseLong(lastWriteVersionStr);
if (lastWriteVersion < version) {
// Smaller, so I can overwrite
return versionStr.getBytes();
} else {
        // Greater or equal, so this write has fallen behind. Return null and do not write
return null;
}
};
if (!_zkBaseDataAccessor.update(rootPath + "/" + LAST_SUCCESSFUL_WRITE_KEY,
lastSuccessfulWriteVersionUpdater, AccessOption.PERSISTENT)) {
throw new HelixException(String
.format("Failed to write the last successful write metadata at path: %s!", rootPath));
}
// 5. Update the timer for GC
updateGCTimer(rootPath, versionStr);
return true;
}
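  /**
   * Reads a bucketed value back: the LAST_SUCCESSFUL_WRITE ZNode selects the version to read,
   * that version's METADATA ZNode supplies the bucket and data sizes, and the bucket ZNodes are
   * then fetched, reassembled, decompressed, and deserialized into the requested HelixProperty
   * subtype.
   */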
@Override
public <T extends HelixProperty> HelixProperty compressedBucketRead(String path,
Class<T> helixPropertySubType) {
return helixPropertySubType.cast(compressedBucketRead(path));
}
@Override
public void compressedBucketDelete(String path) {
if (!_zkBaseDataAccessor.remove(path, AccessOption.PERSISTENT)) {
throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", path));
}
}
@Override
public void disconnect() {
if (!_zkClient.isClosed()) {
_zkClient.close();
}
}
private HelixProperty compressedBucketRead(String path) {
// 1. Get the version to read
byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
null, AccessOption.PERSISTENT);
if (binaryVersionToRead == null) {
throw new ZkNoNodeException(
String.format("Last successful write ZNode does not exist for path: %s", path));
}
String versionToRead = new String(binaryVersionToRead);
// 2. Get the metadata map
byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
null, AccessOption.PERSISTENT);
if (binaryMetadata == null) {
throw new ZkNoNodeException(
String.format("Metadata ZNode does not exist for path: %s", path));
}
Map metadata;
try {
metadata = OBJECT_MAPPER.readValue(binaryMetadata, Map.class);
} catch (IOException e) {
throw new HelixException(String.format("Failed to deserialize path metadata: %s!", path), e);
}
// 3. Read the data
Object bucketSizeObj = metadata.get(BUCKET_SIZE_KEY);
Object dataSizeObj = metadata.get(DATA_SIZE_KEY);
if (bucketSizeObj == null) {
throw new HelixException(
String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path));
}
if (dataSizeObj == null) {
throw new HelixException(
String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path));
}
int bucketSize = Integer.parseInt((String) bucketSizeObj);
int dataSize = Integer.parseInt((String) dataSizeObj);
    // Compute N - number of buckets, using the bucket size recorded in the metadata when the
    // data was written (it may differ from this accessor's configured _bucketSize)
    int numBuckets = (dataSize + bucketSize - 1) / bucketSize;
byte[] compressedRecord = new byte[dataSize];
String dataPath = path + "/" + versionToRead;
List<String> paths = new ArrayList<>();
for (int i = 0; i < numBuckets; i++) {
paths.add(dataPath + "/" + i);
}
// Async get
List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
// Combine buckets into one byte array
int copyPtr = 0;
    for (int i = 0; i < numBuckets; i++) {
      if (i == numBuckets - 1) {
        // The last bucket may be smaller than a full bucket, so copy only the remaining bytes
        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize - copyPtr);
      } else {
        System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize);
        copyPtr += bucketSize;
      }
    }
// Decompress the byte array
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord);
byte[] serializedRecord;
try {
serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream);
} catch (IOException e) {
throw new HelixException(String.format("Failed to decompress path: %s!", path), e);
}
// Deserialize the record to retrieve the original
ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord);
return new HelixProperty(originalRecord);
}
@Override
public void close() {
disconnect();
}
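  /**
   * Schedules a one-shot cleanup task that fires versionTTL milliseconds after a successful
   * write and deletes all versions strictly older than the version just written. Newer versions
   * are left to their own timers so the TTL is honored for every write.
   * @param rootPath root path of the bucketed data
   * @param currentVersion the version that was just written
   */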
private void updateGCTimer(String rootPath, String currentVersion) {
TimerTask gcTask = new TimerTask() {
@Override
public void run() {
deleteStaleVersions(rootPath, currentVersion);
}
};
// Schedule the gc task with TTL
GC_THREAD.schedule(gcTask, _versionTTL, TimeUnit.MILLISECONDS);
}
/**
* Deletes all stale versions.
   * @param rootPath root path of the bucketed data
   * @param currentVersion the version that was just written; it and newer versions are kept
*/
private void deleteStaleVersions(String rootPath, String currentVersion) {
// Get all children names under path
List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
if (children == null || children.isEmpty()) {
// The whole path has been deleted so return immediately
return;
}
filterChildrenNames(children, currentVersion);
List<String> pathsToDelete = getPathsToDelete(rootPath, children);
for (String pathToDelete : pathsToDelete) {
// TODO: Should be batch delete but it doesn't work. It's okay since this runs async
_zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
}
}
/**
* Filter out non-version children names and non-stale versions.
   * @param children list of child ZNode names, filtered in place
   * @param currentVersion the version that was just written; it and newer versions are kept
*/
private void filterChildrenNames(List<String> children, String currentVersion) {
// Leave out metadata
children.remove(LAST_SUCCESSFUL_WRITE_KEY);
children.remove(LAST_WRITE_KEY);
// Leave out currentVersion and above
// This is because we want to honor the TTL for newer versions
children.remove(currentVersion);
long currentVer = Long.parseLong(currentVersion);
    // removeIf avoids ConcurrentModificationException from removing entries while iterating
    children.removeIf(child -> {
      try {
        long version = Long.parseLong(child);
        // Keep only versions strictly older than the current one; newer versions keep their TTL
        return version >= currentVer;
      } catch (NumberFormatException e) {
        // Ignore ZNode names that aren't parseable as versions
        LOG.debug("Found an invalid ZNode: {}", child);
        return true;
      }
    });
}
/**
* Generates all stale paths to delete.
   * @param path root path of the bucketed data
   * @param staleVersions list of stale version names under the root path
   * @return full ZK paths of all stale versions to delete
*/
private List<String> getPathsToDelete(String path, List<String> staleVersions) {
List<String> pathsToDelete = new ArrayList<>();
staleVersions.forEach(ver -> pathsToDelete.add(path + "/" + ver));
return pathsToDelete;
}
}