blob: 51f16ac08b01a00c3933be99206fb0b0ec8d2de4 [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.GroupCommit;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordAssembler;
import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordUpdater;
import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.data.Stat;
public class ZKHelixDataAccessor implements HelixDataAccessor {
private static Logger LOG = LoggerFactory.getLogger(ZKHelixDataAccessor.class);
private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
final InstanceType _instanceType;
private final String _clusterName;
private final Builder _propertyKeyBuilder;
private final GroupCommit _groupCommit = new GroupCommit();
public ZKHelixDataAccessor(String clusterName, BaseDataAccessor<ZNRecord> baseDataAccessor) {
this(clusterName, null, baseDataAccessor);
}
public ZKHelixDataAccessor(String clusterName, InstanceType instanceType,
BaseDataAccessor<ZNRecord> baseDataAccessor) {
_clusterName = clusterName;
_instanceType = instanceType;
_baseDataAccessor = baseDataAccessor;
_propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
}
/* Copy constructor */
public ZKHelixDataAccessor(ZKHelixDataAccessor dataAccessor) {
_clusterName = dataAccessor._clusterName;
_instanceType = dataAccessor._instanceType;
_baseDataAccessor = dataAccessor._baseDataAccessor;
_propertyKeyBuilder = new PropertyKey.Builder(_clusterName);
}
@Override
public boolean createStateModelDef(StateModelDefinition stateModelDef) {
String path = PropertyPathBuilder.stateModelDef(_clusterName, stateModelDef.getId());
HelixProperty property =
getProperty(new PropertyKey.Builder(_clusterName).stateModelDef(stateModelDef.getId()));
// Set new StateModelDefinition if it is different from old one.
if (property != null) {
// StateModelDefinition need to be updated
if (!new StateModelDefinition(property.getRecord()).equals(stateModelDef)) {
return stateModelDef.isValid() && _baseDataAccessor
.set(path, stateModelDef.getRecord(), AccessOption.PERSISTENT);
}
} else {
// StateModeDefinition does not exist
return stateModelDef.isValid() && _baseDataAccessor
.create(path, stateModelDef.getRecord(), AccessOption.PERSISTENT);
}
// StateModelDefinition exists but not need to be updated
return true;
}
@Override
public boolean createControllerMessage(Message message) {
return _baseDataAccessor.create(PropertyPathBuilder.controllerMessage(_clusterName,
message.getMsgId()),
message.getRecord(), AccessOption.PERSISTENT);
}
@Override
public boolean createControllerLeader(LiveInstance leader) {
return _baseDataAccessor.create(PropertyPathBuilder.controllerLeader(_clusterName), leader.getRecord(),
AccessOption.EPHEMERAL);
}
@Override
public boolean createPause(PauseSignal pauseSignal) {
return _baseDataAccessor.create(
PropertyPathBuilder.pause(_clusterName), pauseSignal.getRecord(), AccessOption.PERSISTENT);
}
@Override
public boolean createMaintenance(MaintenanceSignal maintenanceSignal) {
return _baseDataAccessor.set(PropertyPathBuilder.maintenance(_clusterName),
maintenanceSignal.getRecord(), AccessOption.PERSISTENT);
}
@Override
public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value) {
PropertyType type = key.getType();
if (!value.isValid()) {
throw new HelixMetaDataAccessException("The ZNRecord for " + type + " is not valid.");
}
String path = key.getPath();
int options = constructOptions(type);
boolean success = false;
switch (type) {
case IDEALSTATES:
case EXTERNALVIEW:
// check if bucketized
if (value.getBucketSize() > 0) {
// set parent node
ZNRecord metaRecord = new ZNRecord(value.getId());
metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
success = _baseDataAccessor.set(path, metaRecord, options);
if (success) {
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
List<String> paths = new ArrayList<String>();
List<ZNRecord> bucketizedRecords = new ArrayList<ZNRecord>();
for (String bucketName : map.keySet()) {
paths.add(path + "/" + bucketName);
bucketizedRecords.add(map.get(bucketName));
}
// TODO: set success accordingly
_baseDataAccessor.setChildren(paths, bucketizedRecords, options);
}
} else {
success = _baseDataAccessor.set(path, value.getRecord(), options);
}
break;
default:
success = _baseDataAccessor.set(path, value.getRecord(), options);
break;
}
return success;
}
@Override
public <T extends HelixProperty> boolean updateProperty(PropertyKey key, T value) {
return updateProperty(key, new ZNRecordUpdater(value.getRecord()), value);
}
@Override
public <T extends HelixProperty> boolean updateProperty(PropertyKey key, DataUpdater<ZNRecord> updater, T value) {
PropertyType type = key.getType();
String path = key.getPath();
int options = constructOptions(type);
boolean success = false;
switch (type) {
case CURRENTSTATES:
success = _groupCommit.commit(_baseDataAccessor, options, path, value.getRecord(), true);
break;
case STATUSUPDATES:
if (LOG.isTraceEnabled()) {
LOG.trace("Update status. path: " + key.getPath() + ", record: " + value.getRecord());
}
break;
default:
success = _baseDataAccessor.update(path, updater, options);
break;
}
return success;
}
@Deprecated
@Override
public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys) {
return getProperty(keys, false);
}
@Override
public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys,
boolean throwException) throws HelixMetaDataAccessException {
if (keys == null || keys.size() == 0) {
return Collections.emptyList();
}
List<T> childValues = new ArrayList<T>();
// read all records
List<String> paths = new ArrayList<>();
List<Stat> stats = new ArrayList<>();
for (PropertyKey key : keys) {
paths.add(key.getPath());
stats.add(new Stat());
}
List<ZNRecord> children = _baseDataAccessor.get(paths, stats, 0, throwException);
// check if bucketized
for (int i = 0; i < keys.size(); i++) {
PropertyKey key = keys.get(i);
ZNRecord record = children.get(i);
Stat stat = stats.get(i);
PropertyType type = key.getType();
String path = key.getPath();
int options = constructOptions(type);
if (record != null) {
record.setCreationTime(stat.getCtime());
record.setModifiedTime(stat.getMtime());
record.setVersion(stat.getVersion());
record.setEphemeralOwner(stat.getEphemeralOwner());
}
switch (type) {
case CURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
// check if bucketized
if (record != null) {
HelixProperty property = new HelixProperty(record);
int bucketSize = property.getBucketSize();
if (bucketSize > 0) {
// @see HELIX-574
// clean up list and map fields in case we write to parent node by mistake
property.getRecord().getMapFields().clear();
property.getRecord().getListFields().clear();
List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options, 1, 0);
ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
// merge with parent node value
if (assembledRecord != null) {
record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
record.getListFields().putAll(assembledRecord.getListFields());
record.getMapFields().putAll(assembledRecord.getMapFields());
}
}
}
break;
default:
break;
}
@SuppressWarnings("unchecked")
T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
childValues.add(t);
}
return childValues;
}
@Override
public <T extends HelixProperty> T getProperty(PropertyKey key) {
PropertyType type = key.getType();
String path = key.getPath();
int options = constructOptions(type);
ZNRecord record = null;
try {
Stat stat = new Stat();
record = _baseDataAccessor.get(path, stat, options);
if (record != null) {
record.setCreationTime(stat.getCtime());
record.setModifiedTime(stat.getMtime());
record.setVersion(stat.getVersion());
record.setEphemeralOwner(stat.getEphemeralOwner());
}
} catch (ZkNoNodeException e) {
// OK
}
switch (type) {
case CURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
// check if bucketized
if (record != null) {
HelixProperty property = new HelixProperty(record);
int bucketSize = property.getBucketSize();
if (bucketSize > 0) {
// @see HELIX-574
// clean up list and map fields in case we write to parent node by mistake
property.getRecord().getMapFields().clear();
property.getRecord().getListFields().clear();
List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options);
ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
// merge with parent node value
if (assembledRecord != null) {
record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
record.getListFields().putAll(assembledRecord.getListFields());
record.getMapFields().putAll(assembledRecord.getMapFields());
}
}
}
break;
default:
break;
}
@SuppressWarnings("unchecked")
T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
return t;
}
@Override
public HelixProperty.Stat getPropertyStat(PropertyKey key) {
PropertyType type = key.getType();
String path = key.getPath();
int options = constructOptions(type);
try {
Stat stat = _baseDataAccessor.getStat(path, options);
if (stat != null) {
return new HelixProperty.Stat(stat.getVersion(), stat.getCtime(), stat.getMtime(), stat.getEphemeralOwner());
}
} catch (ZkNoNodeException e) {
}
return null;
}
@Override
public List<HelixProperty.Stat> getPropertyStats(List<PropertyKey> keys) {
if (keys == null || keys.size() == 0) {
return Collections.emptyList();
}
List<HelixProperty.Stat> propertyStats = new ArrayList<>(keys.size());
List<String> paths = new ArrayList<>(keys.size());
for (PropertyKey key : keys) {
paths.add(key.getPath());
}
Stat[] zkStats = _baseDataAccessor.getStats(paths, 0);
for (int i = 0; i < keys.size(); i++) {
Stat zkStat = zkStats[i];
HelixProperty.Stat propertyStat = null;
if (zkStat != null) {
propertyStat =
new HelixProperty.Stat(zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), zkStat.getEphemeralOwner());
}
propertyStats.add(propertyStat);
}
return propertyStats;
}
@Override
public boolean removeProperty(PropertyKey key) {
PropertyType type = key.getType();
String path = key.getPath();
int options = constructOptions(type);
return _baseDataAccessor.remove(path, options);
}
@Override
public List<String> getChildNames(PropertyKey key) {
PropertyType type = key.getType();
String parentPath = key.getPath();
int options = constructOptions(type);
List<String> childNames = _baseDataAccessor.getChildNames(parentPath, options);
if (childNames == null) {
childNames = Collections.emptyList();
}
return childNames;
}
@Deprecated
@Override
public <T extends HelixProperty> List<T> getChildValues(PropertyKey key) {
return getChildValues(key, false);
}
@Override
public <T extends HelixProperty> List<T> getChildValues(PropertyKey key, boolean throwException) {
PropertyType type = key.getType();
String parentPath = key.getPath();
int options = constructOptions(type);
List<T> childValues = new ArrayList<T>();
List<ZNRecord> children;
if (throwException) {
children = _baseDataAccessor.getChildren(parentPath, null, options, 1, 0);
} else {
children = _baseDataAccessor.getChildren(parentPath, null, options);
}
if (children != null) {
for (ZNRecord record : children) {
switch (type) {
case CURRENTSTATES:
case IDEALSTATES:
case EXTERNALVIEW:
if (record != null) {
HelixProperty property = new HelixProperty(record);
int bucketSize = property.getBucketSize();
if (bucketSize > 0) {
// TODO: fix this if record.id != pathName
String childPath = parentPath + "/" + record.getId();
List<ZNRecord> childRecords;
if (throwException) {
childRecords = _baseDataAccessor.getChildren(childPath, null, options, 1, 0);
} else {
childRecords = _baseDataAccessor.getChildren(childPath, null, options);
}
ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords);
// merge with parent node value
if (assembledRecord != null) {
record.getSimpleFields().putAll(assembledRecord.getSimpleFields());
record.getListFields().putAll(assembledRecord.getListFields());
record.getMapFields().putAll(assembledRecord.getMapFields());
}
}
}
break;
default:
break;
}
if (record != null) {
@SuppressWarnings("unchecked")
T t = (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
childValues.add(t);
}
}
}
return childValues;
}
@Deprecated
@Override
public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) {
return getChildValuesMap(key, false);
}
@Override
public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key,
boolean throwException) {
PropertyType type = key.getType();
String parentPath = key.getPath();
int options = constructOptions(type);
List<T> children = getChildValues(key, throwException);
Map<String, T> childValuesMap = new HashMap<String, T>();
for (T t : children) {
childValuesMap.put(t.getRecord().getId(), t);
}
return childValuesMap;
}
@Override
public Builder keyBuilder() {
return _propertyKeyBuilder;
}
private int constructOptions(PropertyType type) {
int options = 0;
if (type.isPersistent()) {
options = options | AccessOption.PERSISTENT;
} else {
options = options | AccessOption.EPHEMERAL;
}
return options;
}
@Override
public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys, List<T> children) {
// TODO: add validation
int options = -1;
List<String> paths = new ArrayList<String>();
List<ZNRecord> records = new ArrayList<ZNRecord>();
for (int i = 0; i < keys.size(); i++) {
PropertyKey key = keys.get(i);
PropertyType type = key.getType();
String path = key.getPath();
paths.add(path);
HelixProperty value = children.get(i);
records.add(value.getRecord());
options = constructOptions(type);
}
return _baseDataAccessor.createChildren(paths, records, options);
}
@Override
public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys, List<T> children) {
int options = -1;
List<String> paths = new ArrayList<String>();
List<ZNRecord> records = new ArrayList<ZNRecord>();
List<List<String>> bucketizedPaths =
new ArrayList<List<String>>(Collections.<List<String>> nCopies(keys.size(), null));
List<List<ZNRecord>> bucketizedRecords =
new ArrayList<List<ZNRecord>>(Collections.<List<ZNRecord>> nCopies(keys.size(), null));
for (int i = 0; i < keys.size(); i++) {
PropertyKey key = keys.get(i);
PropertyType type = key.getType();
String path = key.getPath();
paths.add(path);
options = constructOptions(type);
HelixProperty value = children.get(i);
switch (type) {
case EXTERNALVIEW:
if (value.getBucketSize() == 0) {
records.add(value.getRecord());
} else {
ZNRecord metaRecord = new ZNRecord(value.getId());
metaRecord.setSimpleFields(value.getRecord().getSimpleFields());
records.add(metaRecord);
ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(value.getBucketSize());
Map<String, ZNRecord> map = bucketizer.bucketize(value.getRecord());
List<String> childBucketizedPaths = new ArrayList<String>();
List<ZNRecord> childBucketizedRecords = new ArrayList<ZNRecord>();
for (String bucketName : map.keySet()) {
childBucketizedPaths.add(path + "/" + bucketName);
childBucketizedRecords.add(map.get(bucketName));
}
bucketizedPaths.set(i, childBucketizedPaths);
bucketizedRecords.set(i, childBucketizedRecords);
}
break;
case STATEMODELDEFS:
if (value.isValid()) {
records.add(value.getRecord());
}
break;
default:
records.add(value.getRecord());
break;
}
}
// set non-bucketized nodes or parent nodes of bucketized nodes
boolean success[] = _baseDataAccessor.setChildren(paths, records, options);
// set bucketized nodes
List<String> allBucketizedPaths = new ArrayList<String>();
List<ZNRecord> allBucketizedRecords = new ArrayList<ZNRecord>();
for (int i = 0; i < keys.size(); i++) {
if (success[i] && bucketizedPaths.get(i) != null) {
allBucketizedPaths.addAll(bucketizedPaths.get(i));
allBucketizedRecords.addAll(bucketizedRecords.get(i));
}
}
// TODO: set success accordingly
_baseDataAccessor.setChildren(allBucketizedPaths, allBucketizedRecords, options);
return success;
}
@Override
public BaseDataAccessor<ZNRecord> getBaseDataAccessor() {
return _baseDataAccessor;
}
@Override
public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
List<DataUpdater<ZNRecord>> updaters, int options) {
return _baseDataAccessor.updateChildren(paths, updaters, options);
}
}