blob: f2ba28c3e20ded06c671786165a56c049b1db09d [file] [log] [blame]
package org.apache.helix.controller.rebalancer.constraint.dataprovider;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
/**
* A capacity provider based on ZK node.
* This class support persistent through Helix Property Store.
*/
public class ZkBasedCapacityProvider implements CapacityProvider {
public static final int DEFAULT_CAPACITY_VALUE = 0;
private static final String ROOT = "/PARTICIPANT_CAPACITY";
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final String _dimensionPath;
private ParticipantCapacity _capacity;
/**
* @param propertyStore The store that will be used to persist capacity information.
* @param dimensionName Identify of the capacity attribute. For example memory, CPU.
*/
public ZkBasedCapacityProvider(ZkHelixPropertyStore<ZNRecord> propertyStore,
String dimensionName) {
_propertyStore = propertyStore;
_dimensionPath = ROOT + "/" + dimensionName;
ZNRecord existingRecord = _propertyStore.get(_dimensionPath, null, AccessOption.PERSISTENT);
if (existingRecord == null) {
// Create a capacity object using default capacity (DEFAULT_CAPACITY_VALUE).
_capacity = new ParticipantCapacity(dimensionName);
} else {
_capacity = new ParticipantCapacity(existingRecord);
}
}
/**
* @param zkAddr
* @param clusterName
* @param dimensionName Identify of the capacity attribute. For example memory, CPU.
* Need to match resource weight dimension.
*/
public ZkBasedCapacityProvider(String zkAddr, String clusterName, String dimensionName) {
this(new ZkHelixPropertyStore<ZNRecord>(zkAddr, new ZNRecordSerializer(),
PropertyPathBuilder.propertyStore(clusterName)), dimensionName);
}
/**
* Update capacity information.
*
* @param capacityMap <ParticipantName, Total Participant Capacity>
* @param usageMap <ParticipantName, Usage>
* @param defaultCapacity Default total capacity if not specified in the map
*/
public void updateCapacity(Map<String, Integer> capacityMap, Map<String, Integer> usageMap,
int defaultCapacity) {
for (String participant : capacityMap.keySet()) {
_capacity.setCapacity(participant, capacityMap.get(participant));
}
for (String participant : usageMap.keySet()) {
_capacity.setUsage(participant, usageMap.get(participant));
}
_capacity.setDefaultCapacity(defaultCapacity);
}
/**
* @return True if the capacity information is successfully wrote to ZK.
*/
public boolean persistCapacity() {
if (_capacity.isValid()) {
return _propertyStore.set(_dimensionPath, _capacity.getRecord(), AccessOption.PERSISTENT);
} else {
throw new HelixException("Invalid ParticipantCapacity: " + _capacity.getRecord().toString());
}
}
@Override
public int getParticipantCapacity(String participant) {
return _capacity.getCapacity(participant);
}
@Override
public int getParticipantUsage(String participant) {
return _capacity.getUsage(participant);
}
/**
* Data model for participant capacity.
* Per-participant capacity and usage are recorded in the mapfields.
*/
private static class ParticipantCapacity extends HelixProperty {
private static final String CAPACITY = "CAPACITY";
private static final String USAGE = "USAGE_SIZE";
enum ParticipantCapacityProperty {
DEFAULT_CAPACITY,
}
ParticipantCapacity(String dimensionName) {
super(dimensionName);
_record
.setIntField(ParticipantCapacityProperty.DEFAULT_CAPACITY.name(), DEFAULT_CAPACITY_VALUE);
}
ParticipantCapacity(ZNRecord record) {
super(record);
if (!isValid()) {
throw new HelixException("Invalid ParticipantCapacity: " + record.toString());
}
}
int getCapacity(String participant) {
Map<String, String> participantMap = _record.getMapField(participant);
if (participantMap != null && participantMap.containsKey(CAPACITY)) {
return Integer.parseInt(participantMap.get(CAPACITY));
}
return getDefaultCapacity();
}
int getUsage(String participant) {
Map<String, String> participantMap = _record.getMapField(participant);
if (participantMap != null && participantMap.containsKey(USAGE)) {
return Integer.parseInt(participantMap.get(USAGE));
}
return 0;
}
void setCapacity(String participant, int capacity) {
Map<String, String> participantMap = getOrAddParticipantMap(participant);
participantMap.put(CAPACITY, new Integer(capacity).toString());
}
void setUsage(String participant, int usage) {
Map<String, String> participantMap = getOrAddParticipantMap(participant);
participantMap.put(USAGE, new Integer(usage).toString());
}
private Map<String, String> getOrAddParticipantMap(String participant) {
Map<String, String> participantMap = _record.getMapField(participant);
if (participantMap == null) {
participantMap = new HashMap<>();
_record.setMapField(participant, participantMap);
}
return participantMap;
}
void setDefaultCapacity(int defaultCapacity) {
_record.setIntField(ParticipantCapacityProperty.DEFAULT_CAPACITY.name(), defaultCapacity);
}
private int getDefaultCapacity() {
return _record
.getIntField(ParticipantCapacityProperty.DEFAULT_CAPACITY.name(), DEFAULT_CAPACITY_VALUE);
}
@Override
public boolean isValid() {
try {
// check default capacity
int defaultCapacity = getDefaultCapacity();
if (defaultCapacity < 0) {
return false;
}
// check if any invalid capacity values
for (Map<String, String> capacityRecords : _record.getMapFields().values()) {
if ((capacityRecords.containsKey(CAPACITY)
&& Integer.parseInt(capacityRecords.get(CAPACITY)) < 0) || (
capacityRecords.containsKey(USAGE)
&& Integer.parseInt(capacityRecords.get(USAGE)) < 0)) {
return false;
}
}
return true;
} catch (Exception ex) {
return false;
}
}
}
}