| package org.apache.helix; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeMap; |
| |
| import org.apache.helix.ZNRecordDelta.MergeOperation; |
| import org.apache.helix.manager.zk.serializer.JacksonPayloadSerializer; |
| import org.apache.helix.manager.zk.serializer.PayloadSerializer; |
| import org.codehaus.jackson.annotate.JsonCreator; |
| import org.codehaus.jackson.annotate.JsonIgnore; |
| import org.codehaus.jackson.annotate.JsonIgnoreProperties; |
| import org.codehaus.jackson.annotate.JsonProperty; |
| import org.codehaus.jackson.map.annotate.JsonSerialize; |
| import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Generic Record Format to store data at a Node This can be used to store |
| * simpleFields mapFields listFields |
| */ |
| @JsonIgnoreProperties(ignoreUnknown = true) |
| @JsonSerialize(include = Inclusion.NON_NULL) |
| public class ZNRecord { |
| static Logger _logger = LoggerFactory.getLogger(ZNRecord.class); |
| private final String id; |
| |
| @JsonIgnore(true) |
| public static final String LIST_FIELD_BOUND = "listField.bound"; |
| |
| @JsonIgnore(true) |
| public static final int SIZE_LIMIT = 1000 * 1024; // leave a margin out of 1M |
| |
| // We don't want the _deltaList to be serialized and deserialized |
| private List<ZNRecordDelta> _deltaList = new ArrayList<ZNRecordDelta>(); |
| |
| private Map<String, String> simpleFields; |
| private Map<String, Map<String, String>> mapFields; |
| private Map<String, List<String>> listFields; |
| private byte[] rawPayload; |
| |
| private PayloadSerializer _serializer; |
| |
| // the version field of zookeeper Stat |
| private int _version; |
| |
| private long _creationTime; |
| |
| private long _modifiedTime; |
| |
| private long _ephemeralOwner; |
| |
| /** |
| * Initialize with an identifier |
| * @param id |
| */ |
| @JsonCreator |
| public ZNRecord(@JsonProperty("id") String id) { |
| this.id = id; |
| simpleFields = new TreeMap<>(); |
| mapFields = new TreeMap<>(); |
| listFields = new TreeMap<>(); |
| rawPayload = null; |
| _serializer = new JacksonPayloadSerializer(); |
| } |
| |
| /** |
| * Initialize with a pre-populated ZNRecord |
| * @param record |
| */ |
| public ZNRecord(ZNRecord record) { |
| this(record, record.getId()); |
| } |
| |
| /** |
| * Initialize with a pre-populated ZNRecord, overwriting the identifier |
| * @param record |
| * @param id |
| */ |
| public ZNRecord(ZNRecord record, String id) { |
| this(id); |
| simpleFields.putAll(record.getSimpleFields()); |
| mapFields.putAll(record.getMapFields()); |
| listFields.putAll(record.getListFields()); |
| if (record.rawPayload != null) { |
| rawPayload = new byte[record.rawPayload.length]; |
| System.arraycopy(record.rawPayload, 0, rawPayload, 0, record.rawPayload.length); |
| } else { |
| rawPayload = null; |
| } |
| _version = record.getVersion(); |
| _creationTime = record.getCreationTime(); |
| _modifiedTime = record.getModifiedTime(); |
| _ephemeralOwner = record.getEphemeralOwner(); |
| } |
| |
| /** |
| * Set a custom {@link PayloadSerializer} to allow including arbitrary data |
| * @param serializer |
| */ |
| @JsonIgnore(true) |
| public void setPayloadSerializer(PayloadSerializer serializer) { |
| _serializer = serializer; |
| } |
| |
| /** |
| * Set the list of updates to this ZNRecord |
| * @param deltaList |
| */ |
| @JsonIgnore(true) |
| public void setDeltaList(List<ZNRecordDelta> deltaList) { |
| _deltaList = deltaList; |
| } |
| |
| /** |
| * Get the list of updates to this ZNRecord |
| * @return list of {@link ZNRecordDelta} |
| */ |
| @JsonIgnore(true) |
| public List<ZNRecordDelta> getDeltaList() { |
| return _deltaList; |
| } |
| |
| /** |
| * Get all plain key, value fields |
| * @return Map of simple fields |
| */ |
| @JsonProperty |
| public Map<String, String> getSimpleFields() { |
| return simpleFields; |
| } |
| |
| /** |
| * Set all plain key, value fields |
| * @param simpleFields |
| */ |
| @JsonProperty |
| public void setSimpleFields(Map<String, String> simpleFields) { |
| this.simpleFields = simpleFields; |
| } |
| |
| /** |
| * Get all fields whose values are key, value properties |
| * @return all map fields |
| */ |
| @JsonProperty |
| public Map<String, Map<String, String>> getMapFields() { |
| return mapFields; |
| } |
| |
| /** |
| * Set all fields whose values are key, value properties |
| * @param mapFields |
| */ |
| @JsonProperty |
| public void setMapFields(Map<String, Map<String, String>> mapFields) { |
| this.mapFields = mapFields; |
| } |
| |
| /** |
| * Get all fields whose values are a list of values |
| * @return all list fields |
| */ |
| @JsonProperty |
| public Map<String, List<String>> getListFields() { |
| return listFields; |
| } |
| |
| /** |
| * Set all fields whose values are a list of values |
| * @param listFields |
| */ |
| @JsonProperty |
| public void setListFields(Map<String, List<String>> listFields) { |
| this.listFields = listFields; |
| } |
| |
| /** |
| * Set a simple key, value field |
| * @param k |
| * @param v |
| */ |
| @JsonProperty |
| public void setSimpleField(String k, String v) { |
| simpleFields.put(k, v); |
| } |
| |
| @JsonProperty |
| public String getId() { |
| return id; |
| } |
| |
| /** |
| * Set arbitrary data serialized as a byte array payload. Consider using |
| * {@link #setPayload(Object)} instead |
| * @param payload |
| */ |
| @JsonProperty |
| public void setRawPayload(byte[] payload) { |
| rawPayload = payload; |
| } |
| |
| /** |
| * Get arbitrary data serialized as a byte array payload. Consider using |
| * {@link #getPayload(Class)} instead |
| * @return |
| */ |
| @JsonProperty |
| public byte[] getRawPayload() { |
| return rawPayload; |
| } |
| |
| /** |
| * Set a typed payload that will be serialized and persisted. |
| * @param payload |
| */ |
| @JsonIgnore(true) |
| public <T> void setPayload(T payload) { |
| if (_serializer != null && payload != null) { |
| rawPayload = _serializer.serialize(payload); |
| } else { |
| rawPayload = null; |
| } |
| } |
| |
| /** |
| * Get a typed deserialized payload |
| * @param clazz |
| * @return |
| */ |
| @JsonIgnore(true) |
| public <T> T getPayload(Class<T> clazz) { |
| if (_serializer != null && rawPayload != null) { |
| return _serializer.deserialize(clazz, rawPayload); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Set a single String --> Map field |
| * @param k |
| * @param v |
| */ |
| public void setMapField(String k, Map<String, String> v) { |
| mapFields.put(k, v); |
| } |
| |
| /** |
| * Set a single String --> List field |
| * @param k |
| * @param v |
| */ |
| public void setListField(String k, List<String> v) { |
| listFields.put(k, v); |
| } |
| |
| /** |
| * Get a single String field |
| * @param k |
| * @return String field |
| */ |
| public String getSimpleField(String k) { |
| return simpleFields.get(k); |
| } |
| |
| /** |
| * Get a single Map field |
| * @param k |
| * @return String --> String map |
| */ |
| public Map<String, String> getMapField(String k) { |
| return mapFields.get(k); |
| } |
| |
| /** |
| * Get a single List field |
| * @param k |
| * @return String list |
| */ |
| public List<String> getListField(String k) { |
| return listFields.get(k); |
| } |
| |
| /** |
| * Set a single simple int field |
| * @param k |
| * @param v |
| */ |
| public void setIntField(String k, int v) { |
| setSimpleField(k, Integer.toString(v)); |
| } |
| |
| /** |
| * Get a single int field |
| * @param k |
| * @param defaultValue |
| * @return the int value, or defaultValue if not present |
| */ |
| public int getIntField(String k, int defaultValue) { |
| int v = defaultValue; |
| String valueStr = getSimpleField(k); |
| if (valueStr != null) { |
| try { |
| v = Integer.parseInt(valueStr); |
| } catch (NumberFormatException e) { |
| _logger.warn("", e); |
| } |
| } |
| return v; |
| } |
| |
| /** |
| * Set a single simple long field |
| * @param k |
| * @param v |
| */ |
| public void setLongField(String k, long v) { |
| setSimpleField(k, Long.toString(v)); |
| } |
| |
| /** |
| * Get a single long field |
| * @param k |
| * @param defaultValue |
| * @return the long value, or defaultValue if not present |
| */ |
| public long getLongField(String k, long defaultValue) { |
| long v = defaultValue; |
| String valueStr = getSimpleField(k); |
| if (valueStr != null) { |
| try { |
| v = Long.parseLong(valueStr); |
| } catch (NumberFormatException e) { |
| _logger.warn("", e); |
| } |
| } |
| return v; |
| } |
| |
| /** |
| * Set a single simple double field |
| * @param k |
| * @param v |
| */ |
| public void setDoubleField(String k, double v) { |
| setSimpleField(k, Double.toString(v)); |
| } |
| |
| /** |
| * Get a single double field |
| * @param k |
| * @param defaultValue |
| * @return the double value, or defaultValue if not present |
| */ |
| public double getDoubleField(String k, double defaultValue) { |
| double v = defaultValue; |
| String valueStr = getSimpleField(k); |
| if (valueStr != null) { |
| try { |
| v = Double.parseDouble(valueStr); |
| } catch (NumberFormatException e) { |
| _logger.warn("", e); |
| } |
| } |
| return v; |
| } |
| |
| /** |
| * Set a single simple boolean field |
| * @param k |
| * @param v |
| */ |
| public void setBooleanField(String k, boolean v) { |
| setSimpleField(k, Boolean.toString(v)); |
| } |
| |
| /** |
| * Get a single boolean field |
| * @param k |
| * @param defaultValue |
| * @return the boolean field, or defaultValue if not present |
| */ |
| public boolean getBooleanField(String k, boolean defaultValue) { |
| boolean v = defaultValue; |
| String valueStr = getSimpleField(k); |
| if (valueStr != null) { |
| // Boolean.parseBoolean() doesn't throw an exception if the string isn't a valid boolean. |
| // Thus, a direct comparison is necessary to make sure the value is actually "true" or |
| // "false" |
| if (valueStr.equalsIgnoreCase(Boolean.TRUE.toString())) { |
| v = true; |
| } else if (valueStr.equalsIgnoreCase(Boolean.FALSE.toString())) { |
| v = false; |
| } |
| } |
| return v; |
| } |
| |
| /** |
| * Set a single simple Enum field |
| * @param k |
| * @param v |
| */ |
| public <T extends Enum<T>> void setEnumField(String k, T v) { |
| setSimpleField(k, v.toString()); |
| } |
| |
| /** |
| * Get a single Enum field |
| * @param k |
| * @param enumType |
| * @param defaultValue |
| * @return the Enum field of enumType, or defaultValue if not present |
| */ |
| public <T extends Enum<T>> T getEnumField(String k, Class<T> enumType, T defaultValue) { |
| T v = defaultValue; |
| String valueStr = getSimpleField(k); |
| if (valueStr != null) { |
| try { |
| v = Enum.valueOf(enumType, valueStr); |
| } catch (NullPointerException e) { |
| _logger.warn("", e); |
| } catch (IllegalArgumentException e) { |
| _logger.warn("", e); |
| } |
| } |
| return v; |
| } |
| |
| /** |
| * Get a single String field |
| * @param k |
| * @param defaultValue |
| * @return the String value, or defaultValue if not present |
| */ |
| public String getStringField(String k, String defaultValue) { |
| String v = defaultValue; |
| String valueStr = getSimpleField(k); |
| if (valueStr != null) { |
| v = valueStr; |
| } |
| return v; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuffer sb = new StringBuffer(); |
| sb.append(id + ", "); |
| if (simpleFields != null) { |
| sb.append(simpleFields); |
| } |
| if (mapFields != null) { |
| sb.append(mapFields); |
| } |
| if (listFields != null) { |
| sb.append(listFields); |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * merge functionality is used to merge multiple znrecord into a single one. |
| * This will make use of the id of each ZNRecord and append it to every key |
| * thus making key unique. This is needed to optimize on the watches. |
| * @param record |
| */ |
| public void merge(ZNRecord record) { |
| if (record == null) { |
| return; |
| } |
| |
| if (record.getDeltaList().size() > 0) { |
| _logger.info("Merging with delta list, recordId = " + id + " other:" + record.getId()); |
| merge(record.getDeltaList()); |
| return; |
| } |
| simpleFields.putAll(record.simpleFields); |
| for (String key : record.mapFields.keySet()) { |
| Map<String, String> map = mapFields.get(key); |
| if (map != null) { |
| map.putAll(record.mapFields.get(key)); |
| } else { |
| mapFields.put(key, record.mapFields.get(key)); |
| } |
| } |
| for (String key : record.listFields.keySet()) { |
| List<String> list = listFields.get(key); |
| if (list != null) { |
| list.addAll(record.listFields.get(key)); |
| } else { |
| listFields.put(key, record.listFields.get(key)); |
| } |
| } |
| } |
| |
| /** |
| * Replace functionality is used to update this ZNRecord with the given ZNRecord. The value of a |
| * field in this record will be replaced with the value of the same field in given record if it |
| * presents. If there is new field in given ZNRecord but not in this record, add that field into |
| * this record. |
| * The list fields and map fields will be replaced as a single entry. |
| * |
| * @param record |
| */ |
| public void update(ZNRecord record) { |
| if (record != null) { |
| simpleFields.putAll(record.simpleFields); |
| listFields.putAll(record.listFields); |
| mapFields.putAll(record.mapFields); |
| } |
| } |
| |
| /** |
| * Merge in a {@link ZNRecordDelta} corresponding to its merge policy |
| * @param delta |
| */ |
| void merge(ZNRecordDelta delta) { |
| if (delta.getMergeOperation() == MergeOperation.ADD) { |
| merge(delta.getRecord()); |
| } else if (delta.getMergeOperation() == MergeOperation.SUBTRACT) { |
| subtract(delta.getRecord()); |
| } else if (delta.getMergeOperation() == MergeOperation.UPDATE) { |
| update(delta.getRecord()); |
| } |
| } |
| |
| /** |
| * Batch merge of {@link ZNRecordDelta} |
| * @see #merge(ZNRecordDelta) |
| * @param deltaList |
| */ |
| void merge(List<ZNRecordDelta> deltaList) { |
| for (ZNRecordDelta delta : deltaList) { |
| merge(delta); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (!(obj instanceof ZNRecord)) { |
| return false; |
| } |
| ZNRecord that = (ZNRecord) obj; |
| if (this.getSimpleFields().size() != that.getSimpleFields().size()) { |
| return false; |
| } |
| if (this.getMapFields().size() != that.getMapFields().size()) { |
| return false; |
| } |
| if (this.getListFields().size() != that.getListFields().size()) { |
| return false; |
| } |
| if (!this.getSimpleFields().equals(that.getSimpleFields())) { |
| return false; |
| } |
| if (!this.getMapFields().equals(that.getMapFields())) { |
| return false; |
| } |
| if (!this.getListFields().equals(that.getListFields())) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Subtract value from this ZNRecord |
| * Note: does not support subtract in each list in list fields or map in |
| * mapFields |
| * @param value |
| */ |
| public void subtract(ZNRecord value) { |
| for (String key : value.getSimpleFields().keySet()) { |
| simpleFields.remove(key); |
| } |
| |
| for (String key : value.getListFields().keySet()) { |
| listFields.remove(key); |
| } |
| |
| for (String key : value.getMapFields().keySet()) { |
| Map<String, String> map = value.getMapField(key); |
| if (map == null) { |
| mapFields.remove(key); |
| } else { |
| Map<String, String> nestedMap = mapFields.get(key); |
| if (nestedMap != null) { |
| for (String mapKey : map.keySet()) { |
| nestedMap.remove(mapKey); |
| } |
| if (nestedMap.size() == 0) { |
| mapFields.remove(key); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Get the version of this record |
| * @return version number |
| */ |
| @JsonIgnore(true) |
| public int getVersion() { |
| return _version; |
| } |
| |
| /** |
| * Set the version of this record |
| * @param version |
| */ |
| @JsonIgnore(true) |
| public void setVersion(int version) { |
| _version = version; |
| } |
| |
| /** |
| * Get the time that this record was created |
| * @return UNIX timestamp |
| */ |
| @JsonIgnore(true) |
| public long getCreationTime() { |
| return _creationTime; |
| } |
| |
| /** |
| * Set the time that this record was created |
| * @param creationTime |
| */ |
| @JsonIgnore(true) |
| public void setCreationTime(long creationTime) { |
| _creationTime = creationTime; |
| } |
| |
| /** |
| * Get the time that this record was last modified |
| * @return UNIX timestamp |
| */ |
| @JsonIgnore(true) |
| public long getModifiedTime() { |
| return _modifiedTime; |
| } |
| |
| /** |
| * Set the time that this record was last modified |
| * @param modifiedTime |
| */ |
| @JsonIgnore(true) |
| public void setModifiedTime(long modifiedTime) { |
| _modifiedTime = modifiedTime; |
| } |
| |
| /** |
| * Get the session Id of ephemeral node owner |
| */ |
| @JsonIgnore(true) |
| public long getEphemeralOwner() { |
| return _ephemeralOwner; |
| } |
| |
| /** |
| * Set the session Id of ephemeral node owner |
| * @param ephemeralOwner |
| */ |
| @JsonIgnore(true) |
| public void setEphemeralOwner(long ephemeralOwner) { |
| _ephemeralOwner = ephemeralOwner; |
| } |
| } |