blob: ac1c4c06a9c7f9ab46319210f96160d50d3c64b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.appdata.schemas;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.appdata.gpo.Serde;
import org.apache.apex.malhar.lib.appdata.gpo.SerdeObjectPayloadFix;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.objects.Object2IntLinkedOpenHashMap;
/**
* This class manages the storage of fields in app data. It is used in {@link GPOMutable} objects
* to map field names to values in order to respond to queries, it also serves as a schema which is
* used in the serialization of {@link GPOMutable} objects in order to ensure consistent serialization
* and deserialization of data.
* @since 3.0.0
*/
public class FieldsDescriptor implements Serializable
{
private static final long serialVersionUID = 201506251228L;
/**
* This is a list of all of the fields managed by this {@link FieldsDescriptor} object.
*/
private List<String> fieldList;
/**
* This is a list of all of the types managed by this {@link FieldsDescriptor} object.
*/
private List<Type> typesList;
/**
* This is a set of all the fields managed by this {@link FieldsDescriptor} object.
*/
private Fields fields;
/**
* This is a set of all of the types managed by this {@link FieldsDescriptor} object.
*/
private Set<Type> types;
private Map<Type, Object2IntLinkedOpenHashMap<String>> typeToFieldToIndex;
/**
* A Map from the type to the list of fields corresponding to that type. The list of fields
* corresponding to a type also represents the order in which the fields are stored in
* both a {@link GPOMutable} object and the order of fields when {@link GPOMutable} objects
* are serialized.
*/
private Map<Type, List<String>> typeToFields;
/**
* This is the field to type mapping for this {@link FieldsDesciptor} object.
*/
private Map<String, Type> fieldToType;
/**
* This is the set of all the compressed types for this {@link FieldsDescriptor} object.
*/
private Set<Type> compressedTypes;
/**
* This is a map from the type of a field to the number of values to store for fields of
* that type. Typically the number of values to store for a type is the same as the number
* of fields of that type, but if type compression is enabled for that field then the number
* of values stored for that type is 1.
*/
private Object2IntLinkedOpenHashMap<Type> typeToSize;
private Serde[] serdes;
private SerdeObjectPayloadFix serdePayloadFix;
/**
* This constructor is used for serialization.
*/
@SuppressWarnings("unused")
private FieldsDescriptor()
{
//For kryo
}
/**
* This creates a {@link FieldsDescriptor} with the given field to type map.
* @param fieldToType A mapping from field names to the type of the field.
*/
public FieldsDescriptor(Map<String, Type> fieldToType)
{
setFieldToType(fieldToType);
compressedTypes = Sets.newHashSet();
initialize();
}
/**
* This creates a {@link FieldsDescriptor} with the given field to type map.
* @param fieldToType A mapping from field names to the type of the field.
* @param fieldToSerdeObject A mapping from field names to the corresponding serde object.
*/
public FieldsDescriptor(Map<String, Type> fieldToType, Map<String, Serde> fieldToSerdeObject)
{
setFieldToType(fieldToType);
compressedTypes = Sets.newHashSet();
initialize();
List<String> fieldNames = typeToFields.get(Type.OBJECT);
if (fieldNames == null) {
throw new IllegalArgumentException("There are no fields of type " + Type.OBJECT + " in this fieldsdescriptor");
} else {
serdes = new Serde[fieldNames.size()];
//Insert serdes in corresponding order
for (int index = 0; index < fieldNames.size(); index++) {
String fieldName = fieldNames.get(index);
Serde serdeObject = fieldToSerdeObject.get(fieldName);
if (serdeObject == null) {
throw new IllegalArgumentException("The field " + fieldName + " doesn't have a serde object.");
}
serdes[index] = serdeObject;
}
}
}
public FieldsDescriptor(Map<String, Type> fieldToType, Map<String, Serde> fieldToSerdeObject,
SerdeObjectPayloadFix serdePayloadFix)
{
this(fieldToType, fieldToSerdeObject);
this.serdePayloadFix = serdePayloadFix;
}
/**
* This creates a field descriptor with the given field to type map.
* <br/><br/>
* <b>Note:</b> Compressed types are currently ignored and will be implemented in the future.
* @param fieldToType A mapping from field names to the type of the field.
* @param compressedTypes The types, which will have compressed values.
*/
public FieldsDescriptor(Map<String, Type> fieldToType, Set<Type> compressedTypes)
{
setFieldToType(fieldToType);
setCompressedTypes(compressedTypes);
initialize();
}
/**
* This is a helper method, which initializes all the data structures for a
* {@link FieldsDescriptor} object.
*/
private void initialize()
{
//TODO make this maps immutable
typeToFieldToIndex = Maps.newHashMap();
typeToFields = Maps.newHashMap();
for (Map.Entry<String, Type> entry : fieldToType.entrySet()) {
String field = entry.getKey();
Type type = entry.getValue();
List<String> fieldsList = typeToFields.get(type);
if (fieldsList == null) {
fieldsList = Lists.newArrayList();
typeToFields.put(type, fieldsList);
}
fieldsList.add(field);
}
//ensure consistent ordering of fields
for (Map.Entry<Type, List<String>> entry : typeToFields.entrySet()) {
Type type = entry.getKey();
List<String> tempFields = entry.getValue();
Collections.sort(tempFields);
Object2IntLinkedOpenHashMap<String> fieldToIndex = new Object2IntLinkedOpenHashMap<>();
for (int index = 0; index < tempFields.size(); index++) {
String field = tempFields.get(index);
if (compressedTypes.contains(type)) {
fieldToIndex.put(field, 0);
} else {
fieldToIndex.put(field, index);
}
}
typeToFieldToIndex.put(type, fieldToIndex);
}
//Types
if (!typeToFields.isEmpty()) {
types = EnumSet.copyOf(typeToFields.keySet());
} else {
types = Sets.newHashSet();
}
//Types list
typesList = Lists.newArrayList();
typesList.addAll(types);
//Field List
fieldList = Lists.newArrayList();
fieldList.addAll(fieldToType.keySet());
((ArrayList<String>)fieldList).trimToSize();
Collections.sort(fieldList);
//Array Sizes
typeToSize = new Object2IntLinkedOpenHashMap<>();
for (Map.Entry<Type, List<String>> entry : typeToFields.entrySet()) {
Type type = entry.getKey();
if (compressedTypes.contains(type)) {
getTypeToSize().put(type, 1);
} else {
getTypeToSize().put(type, entry.getValue().size());
}
}
}
/**
* Gets the type to field to index map.
* @return The type to field to index map.
*/
public Map<Type, Object2IntLinkedOpenHashMap<String>> getTypeToFieldToIndex()
{
return typeToFieldToIndex;
}
/**
* This is a helper method which sets and validates the field to type
* map.
* @param fieldToType The field to type map to set for this {@link FieldsDescriptor}
* object.
*/
private void setFieldToType(Map<String, Type> fieldToType)
{
for (Map.Entry<String, Type> entry : fieldToType.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
this.fieldToType = Maps.newLinkedHashMap(fieldToType);
}
/**
* This is a helper method to set and validate compressed types.
* @param compressedTypes The compressed types to set.
*/
private void setCompressedTypes(Set<Type> compressedTypes)
{
for (Type type : compressedTypes) {
Preconditions.checkNotNull(type);
}
this.compressedTypes = Sets.newHashSet(compressedTypes);
}
/**
* Gets the field to type map for this {@link FieldsDescriptor} object.
* @return The type map for this {@link FieldsDescriptor} object.
*/
public Map<String, Type> getFieldToType()
{
return fieldToType;
}
/**
* The type of the given field.
* @param field The field whose type needs to be looked up.
* @return The type of the given field.
*/
public Type getType(String field)
{
return fieldToType.get(field);
}
/**
* Gets the set of fields managed by this {@link FieldsDescriptor} object.
* @return The set of fields managed by this {@link FieldsDescriptor object.
*/
public Fields getFields()
{
if (fields == null) {
fields = new Fields(fieldToType.keySet());
}
return fields;
}
/**
* This method creates a new {@link FieldsDescriptor} object which only includes
* the given set of fields.
* @param fields The fields to include in the new {@link FieldsDescriptor}.
* @return The fields to include in the new {@link FieldsDescriptor}.
*/
public FieldsDescriptor getSubset(Fields fields)
{
Map<String, Type> newFieldToType = Maps.newHashMap();
for (String field : fields.getFields()) {
Type type = fieldToType.get(field);
newFieldToType.put(field, type);
}
return new FieldsDescriptor(newFieldToType);
}
/**
* Returns a map from type to a list of all the fields with that type.
* @return A map from type to a list of all the fields with that type.
*/
public Map<Type, List<String>> getTypeToFields()
{
return typeToFields;
}
/**
* Returns set of types of all the fields managed by this {@link FieldsDescriptor} object.
* @return The set of types of all the fields managed by this {@link FieldsDescriptor} object.
*/
public Set<Type> getTypes()
{
return types;
}
/**
* Returns the list of types of all the fields managed by this {@link FieldsDescriptor} object.
* The content of this list will be the same as the set, this method is provided because iterating
* over lists is faster than iterating over sets.
* @return The list of types of all the fields managed by this {@link FieldsDescriptor} object.
*/
public List<Type> getTypesList()
{
return typesList;
}
/**
* Returns a list of all of the fields represented by the {@link FieldsDescriptor} object.
* This method is provided because it is faster to iterate over lists.
* @return the fieldList
*/
public List<String> getFieldList()
{
return fieldList;
}
/**
* Gets the types whose corresponding fields will be compressed to share the same value.
* @return The types whose corresponding fields will be compressed to share the same value.
*/
public Set<Type> getCompressedTypes()
{
return compressedTypes;
}
/**
* Gets the mapping from the type of a field to the number of values to store for that type.
* The number of of values to store for a type is usually the number of fields of that type,
* but if there is type compression enabled for a type, the number of values to store for
* that type will be 1.
* @return A map from a type to the number of values to store for that type.
*/
public Object2IntLinkedOpenHashMap<Type> getTypeToSize()
{
return typeToSize;
}
public Serde[] getSerdes()
{
return serdes;
}
public SerdeObjectPayloadFix getSerdePayloadFix()
{
return this.serdePayloadFix;
}
@Override
public int hashCode()
{
int hash = 7;
hash = 53 * hash + (this.fieldToType != null ? this.fieldToType.hashCode() : 0);
hash = 53 * hash + (this.compressedTypes != null ? this.compressedTypes.hashCode() : 0);
return hash;
}
@Override
public boolean equals(Object obj)
{
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final FieldsDescriptor other = (FieldsDescriptor)obj;
if (this.fieldToType != other.fieldToType && (this.fieldToType == null || !this.fieldToType.equals(
other.fieldToType))) {
return false;
}
if (this.compressedTypes != other.compressedTypes && (this.compressedTypes == null || !this.compressedTypes.equals(
other.compressedTypes))) {
return false;
}
return true;
}
@Override
public String toString()
{
return "FieldsDescriptor{" + "fieldToType=" + fieldToType + ", compressedTypes=" + compressedTypes + '}';
}
private static final Logger LOG = LoggerFactory.getLogger(FieldsDescriptor.class);
}