blob: 1d7b7b100da34b61bc11b6508a3cf11f53b3504b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.api;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import com.google.common.base.Throwables;
import static com.datatorrent.api.StreamingApplication.APEX_PREFIX;
/**
* Attribute represents the attribute which can be set on various components in the system.
*
* @param <T> type of the value which can be stored against the attribute.
* @since 2.0.0
*/
public class Attribute<T> implements Serializable
{
public final T defaultValue;
public final String name;
public final StringCodec<T> codec;
public Attribute(StringCodec<T> codec)
{
this(null, null, codec);
}
public Attribute(T defaultValue)
{
this(null, defaultValue, null);
}
public Attribute(T defaultValue, StringCodec<T> codec)
{
this(null, defaultValue, codec);
}
private Attribute(String name, T defaultValue, StringCodec<T> codec)
{
this.name = name;
this.defaultValue = defaultValue;
this.codec = codec;
}
@Override
public int hashCode()
{
return name.hashCode();
}
@Override
public boolean equals(Object obj)
{
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
@SuppressWarnings(value = "unchecked")
final Attribute<T> other = (Attribute<T>)obj;
return !((this.name == null) ? (other.name != null) : !this.name.equals(other.name));
}
public String getName()
{
return "attr" + name.substring(name.lastIndexOf('.'));
}
public String getLongName()
{
return APEX_PREFIX + getSimpleName().replaceAll("_",".").toLowerCase();
}
public String getSimpleName()
{
return name.substring(name.lastIndexOf('.') + 1);
}
@Override
public String toString()
{
return "Attribute{" + "defaultValue=" + defaultValue + ", name=" + name + ", codec=" + codec + '}';
}
private static final long serialVersionUID = 201310111904L;
/**
* Parameterized and scoped context attribute map that supports serialization.
* Derived from io.netty.util.AttributeMap
*
* @since 0.3.2
*/
public interface AttributeMap extends Cloneable
{
/**
* Return the attribute value for the given key. If the map does not have an
* entry for the key, null is returned.
*
* @param <T>
* @param key
* @return AttributeValue
*/
<T> T get(Attribute<T> key);
/**
* Return true if value for an attribute is set. It returns false if the
* value for the attribute was never set.
*
* @param key attribute key
* @return true if the value for the attribute was set, false otherwise.
*/
boolean contains(Attribute<?> key);
/**
* Assign value for a particular attributes.
*
* @param <T> Type of the value
* @param key Attribute which is being assigned the value
* @param value Value which is being assigned.
* @return Previous value against the attribute or null if it was not assigned.
*/
<T> T put(Attribute<T> key, T value);
Set<Map.Entry<Attribute<?>, Object>> entrySet();
/**
* Clone the current map.
* @return a shallow copy of this AtrributeMap.
* @throws java.lang.CloneNotSupportedException
*/
AttributeMap clone() throws CloneNotSupportedException;
/**
* DefaultAttributeMap is the default implementation of AttributeMap. It's backed by a map internally.
*/
class DefaultAttributeMap implements AttributeMap, Serializable
{
private HashMap<Attribute<?>, Object> map;
public DefaultAttributeMap()
{
this(new HashMap<Attribute<?>, Object>());
}
private DefaultAttributeMap(HashMap<Attribute<?>, Object> map)
{
super();
this.map = map;
}
@Override
@SuppressWarnings(value = "unchecked")
public <T> T get(Attribute<T> key)
{
return (T)map.get(key);
}
@Override
public String toString()
{
return this.map.toString();
}
@Override
@SuppressWarnings("unchecked")
public DefaultAttributeMap clone() throws CloneNotSupportedException
{
DefaultAttributeMap clone = (DefaultAttributeMap)super.clone();
clone.map = (HashMap<Attribute<?>, Object>)map.clone();
return clone;
}
@Override
@SuppressWarnings(value = "unchecked")
public <T> T put(Attribute<T> key, T value)
{
return (T)map.put(key, value);
}
@Override
public Set<Entry<Attribute<?>, Object>> entrySet()
{
return map.entrySet();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof DefaultAttributeMap)) {
return false;
}
DefaultAttributeMap that = (DefaultAttributeMap)o;
return !(map != null ? !map.equals(that.map) : that.map != null);
}
@Override
public int hashCode()
{
return map != null ? map.hashCode() : 0;
}
@Override
public boolean contains(Attribute<?> key)
{
return map.containsKey(key);
}
private static final long serialVersionUID = 201306051022L;
}
/**
* This class inspects and initializes the attributes with their field names so that they can be used
* from properties files.
*
* Engine uses it internally to initialize the Interfaces that may have Attributes defined in them.
*/
class AttributeInitializer
{
static final HashMap<Class<?>, Set<Attribute<Object>>> map = new HashMap<>();
public static Map<Attribute<Object>, Object> getAllAttributes(Context context, Class<?> clazz)
{
Map<Attribute<Object>, Object> result = new HashMap<>();
try {
for (Field f: clazz.getDeclaredFields()) {
if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
@SuppressWarnings(value = "unchecked")
Attribute<Object> attribute = (Attribute<Object>)f.get(null);
result.put(attribute, context.getValue(attribute));
}
}
} catch (Exception ex) {
throw Throwables.propagate(ex);
}
return result;
}
public static Set<Attribute<Object>> getAttributes(Class<?> clazz)
{
return map.get(clazz);
}
/**
* Initialize the static attributes defined in the class.
*
* @param clazz class whose static attributes need to be initialized.
* @return 0 if the clazz was already initialized, identity hash code of the clazz otherwise.
*/
/* both for Enum2String */
@SuppressWarnings(value = {"unchecked", "rawtypes"})
public static long initialize(final Class<?> clazz)
{
if (map.containsKey(clazz)) {
return 0;
}
Set<Attribute<Object>> set = new HashSet<>();
try {
for (Field f: clazz.getDeclaredFields()) {
if (Modifier.isStatic(f.getModifiers()) && Attribute.class.isAssignableFrom(f.getType())) {
Attribute<Object> attribute = (Attribute<Object>)f.get(null);
if (attribute.name == null) {
Field nameField = Attribute.class.getDeclaredField("name");
nameField.setAccessible(true);
nameField.set(attribute, clazz.getCanonicalName() + '.' + f.getName());
nameField.setAccessible(false);
}
/* Handle trivial cases here even though this may spoil API users. */
if (attribute.codec == null) {
StringCodec<?> codec = null;
if (attribute.defaultValue != null) {
Class<?> klass = attribute.defaultValue.getClass();
codec = StringCodec.Factory.getInstance(klass);
}
if (codec != null) {
Field codecField = Attribute.class.getDeclaredField("codec");
codecField.setAccessible(true);
codecField.set(attribute, codec);
codecField.setAccessible(false);
}
}
set.add(attribute);
}
}
} catch (Exception ex) {
throw Throwables.propagate(ex);
}
map.put(clazz, set);
return (long)clazz.getModifiers() << 32 | clazz.hashCode();
}
}
}
}