blob: 85f18223e1b0dad442fca00c6d9442310fe3968b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.dimensions.aggregator;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* <p>
* This registry is used by generic dimensions computation operators and dimension stores in order to support
* plugging different
* aggregators into the operator. Subclasses of
* {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema} use this registry
* to support pluggable aggregators when doing dimensions computation, and Subclasses of
* AppDataSingleSchemaDimensionStoreHDHT use this class as well.
* </p>
* <p>
* The primary purpose of an {@link AggregatorRegistry} is to provide a mapping from aggregator names to aggregators,
* and to provide mappings from aggregator IDs to aggregators. These mappings are necessary in order to correctly
* process schemas, App Data queries, and store aggregated data.
* </p>
*
* @since 3.1.0
*/
public class AggregatorRegistry implements Serializable
{
private static final long serialVersionUID = 20154301642L;
/**
* This is a map from {@link IncrementalAggregator} names to {@link IncrementalAggregator}s used by the
* default {@link AggregatorRegistry}.
*/
private static final transient Map<String, IncrementalAggregator> DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR;
/**
* This is a map from {@link OTFAggregator} names to {@link OTFAggregator}s used by the default
* {@link AggregatorRegistry}.
*/
private static final transient Map<String, OTFAggregator> DEFAULT_NAME_TO_OTF_AGGREGATOR;
//Build the default maps
static {
DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR = Maps.newHashMap(AggregatorIncrementalType.NAME_TO_AGGREGATOR);
DEFAULT_NAME_TO_OTF_AGGREGATOR = Maps.newHashMap(AggregatorOTFType.NAME_TO_AGGREGATOR);
}
/**
* This is a default aggregator registry that can be used in operators.
*/
public static final AggregatorRegistry DEFAULT_AGGREGATOR_REGISTRY = new AggregatorRegistry(
DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR, DEFAULT_NAME_TO_OTF_AGGREGATOR,
AggregatorIncrementalType.NAME_TO_ORDINAL);
/**
* create an new instance of AggregatorRegistry instead of of share same one in case one application has multiple
* schema
* @return new created AggregatorRegistry instance;
*/
public static final AggregatorRegistry newDefaultAggregatorRegistry()
{
AggregatorRegistry aggregatorRegistry = new AggregatorRegistry(
DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR, DEFAULT_NAME_TO_OTF_AGGREGATOR,
AggregatorIncrementalType.NAME_TO_ORDINAL);
aggregatorRegistry.setup();
return aggregatorRegistry;
}
/**
* This is a flag indicating whether or not this {@link AggregatorRegistry} has been setup before or not.
*/
private transient boolean setup = false;
/**
* This is a map from the class of an {@link IncrementalAggregator} to the name of that
* {@link IncrementalAggregator}.
*/
private transient Map<Class<? extends IncrementalAggregator>, String> classToIncrementalAggregatorName;
/**
* This is a map from the name of an {@link OTFAggregator} to the list of the names of all
* {@link IncrementalAggregator} that are child aggregators of that {@link OTFAggregator}.
*/
private transient Map<String, List<String>> otfAggregatorToIncrementalAggregators;
/**
* This is a map from the aggregator ID of an
* {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}.
*/
private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator;
protected transient Map<Integer, AbstractTopBottomAggregator> topBottomAggregatorIDToAggregator;
/**
* This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}.
*/
private Map<String, IncrementalAggregator> nameToIncrementalAggregator;
/**
* This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}.
*/
private Map<String, OTFAggregator> nameToOTFAggregator;
/**
* the map from TOPN and BOTTOM aggregator to name
*/
private Map<String, AbstractTopBottomAggregator> nameToTopBottomAggregator = Maps.newHashMap();
/**
* This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}.
*/
private Map<String, Integer> incrementalAggregatorNameToID;
protected Map<String, Integer> topBottomAggregatorNameToID = Maps.newHashMap();
protected static Set<String> topBottomAggregatorNames;
/**
* This is a helper method used to autogenerate the IDs for each {@link IncrementalAggregator}
*
* @param nameToAggregator A mapping from the name of an {@link IncrementalAggregator} to the
* {@link IncrementalAggregator}.
* @return A mapping from the name of an {@link IncrementalAggregator} to the ID assigned to that
* {@link IncrementalAggregator}.
*/
private static Map<String, Integer> autoGenIds(Map<String, IncrementalAggregator> nameToAggregator)
{
Map<String, Integer> staticAggregatorNameToID = Maps.newHashMap();
for (Map.Entry<String, IncrementalAggregator> entry : nameToAggregator.entrySet()) {
staticAggregatorNameToID.put(entry.getKey(), stringHash(entry.getValue().getClass().getName()));
}
return staticAggregatorNameToID;
}
/**
* This is a helper method for computing the hash of the string. This is intended to be a static unchanging
* method since the computed hash is used for aggregator IDs which are used for persistence.
* <p>
* <b>Note:</b> Do not change this function it will cause corruption for users updating existing data stores.
* </p>
*
* @return The hash of the given string.
*/
private static int stringHash(String string)
{
int hash = 5381;
for (int index = 0;
index < string.length();
index++) {
int character = (int)string.charAt(index);
hash = hash * 33 + character;
}
return hash;
}
/**
* This constructor is present for Kryo serialization
*/
private AggregatorRegistry()
{
//for kryo
}
/**
* <p>
* This creates an {@link AggregatorRegistry} which assigns the given names to the given
* {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor also auto-generates
* the IDs associated with each {@link IncrementalAggregator} by computing the hashcode of the
* fully qualified class name of each {@link IncrementalAggregator}.
* </p>
* <p>
* <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the
* only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored.
* </p>
*
* @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator},
* where the string is the name of an
* {@link IncrementalAggregator} and the value is the {@link IncrementalAggregator}
* with that name.
* @param nameToOTFAggregator This is a map from {@link String} to {@link OTFAggregator}, where the string
* is the name of
* an {@link OTFAggregator} and the value is the {@link OTFAggregator} with that
* name.
*/
public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
Map<String, OTFAggregator> nameToOTFAggregator)
{
this(nameToIncrementalAggregator,
nameToOTFAggregator,
autoGenIds(nameToIncrementalAggregator));
}
/**
* <p>
* This creates an {@link AggregatorRegistry} which assigns the given names to the given
* {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor assigns IDs to each
* {@link IncrementalAggregator} by using the provided map from incremental aggregator names to IDs.
* </p>
* <p>
* <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the
* only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored.
* </p>
*
* @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator},
* where the string is the name of an
* {@link IncrementalAggregator} and the value is the
* {@link IncrementalAggregator}
* with that name.
* @param nameToOTFAggregator This is a map from {@link String} to {@link OTFAggregator}, where the
* string is the name of
* an {@link OTFAggregator} and the value is the {@link OTFAggregator} with
* that name.
* @param incrementalAggregatorNameToID This is a map from the name of an {@link IncrementalAggregator} to the ID
* for that
* {@link IncrementalAggregator}.
*/
public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
Map<String, OTFAggregator> nameToOTFAggregator,
Map<String, Integer> incrementalAggregatorNameToID)
{
setNameToIncrementalAggregator(nameToIncrementalAggregator);
setNameToOTFAggregator(nameToOTFAggregator);
setIncrementalAggregatorNameToID(incrementalAggregatorNameToID);
validate();
}
/**
* This is a helper method which is used to do validation on the maps provided to the constructor of this class.
*/
private void validate()
{
for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
for (Map.Entry<String, Integer> entry : topBottomAggregatorNameToID.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
for (Map.Entry<String, AbstractTopBottomAggregator> entry : nameToTopBottomAggregator.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
}
/**
* This method is called to initialize various internal datastructures of the {@link AggregatorRegistry}.
* This method should be called before the {@link AggregatorRegistry} is used.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void setup()
{
if (setup) {
//If the AggregatorRegistry was already setup. Don't set it up again.
return;
}
setup = true;
classToIncrementalAggregatorName = Maps.newHashMap();
for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
classToIncrementalAggregatorName.put((Class)entry.getValue().getClass(), entry.getKey());
}
incrementalAggregatorIDToAggregator = Maps.newHashMap();
for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
String aggregatorName = entry.getKey();
int aggregatorID = entry.getValue();
incrementalAggregatorIDToAggregator.put(aggregatorID,
nameToIncrementalAggregator.get(aggregatorName));
}
otfAggregatorToIncrementalAggregators = Maps.newHashMap();
for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
String name = entry.getKey();
List<String> staticAggregators = Lists.newArrayList();
OTFAggregator dotfAggregator = nameToOTFAggregator.get(name);
for (Class clazz : dotfAggregator.getChildAggregators()) {
staticAggregators.add(classToIncrementalAggregatorName.get(clazz));
}
otfAggregatorToIncrementalAggregators.put(name, staticAggregators);
}
}
public void buildTopBottomAggregatorIDToAggregator()
{
topBottomAggregatorIDToAggregator = Maps.newHashMap();
for (Map.Entry<String, Integer> entry : topBottomAggregatorNameToID.entrySet()) {
String aggregatorName = entry.getKey();
int aggregatorID = entry.getValue();
topBottomAggregatorIDToAggregator.put(aggregatorID,
nameToTopBottomAggregator.get(aggregatorName));
}
}
/**
* This is a helper method which sets and validated the given mapping from an {@link IncrementalAggregator}'s name
* to an {@link IncrementalAggregator}.
*
* @param nameToIncrementalAggregator The mapping from an {@link IncrementalAggregator}'s name to an
* {@link IncrementalAggregator}.
*/
private void setNameToIncrementalAggregator(Map<String, IncrementalAggregator> nameToIncrementalAggregator)
{
this.nameToIncrementalAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToIncrementalAggregator));
}
/**
* This is a helper method which sets and validates the given mapping from an {@link OTFAggregator}'s name to
* an {@link OTFAggregator}.
*
* @param nameToOTFAggregator The mapping from an {@link OTFAggregator}'s name to an {@link OTFAggregator}.
*/
private void setNameToOTFAggregator(Map<String, OTFAggregator> nameToOTFAggregator)
{
this.nameToOTFAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToOTFAggregator));
}
/**
* Checks if the given aggregatorName is the name of an {@link IncrementalAggregator} or {@link OTFAggregator}
* registered to this registry.
*
* @param aggregatorName The aggregator name to check.
* @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered to
* this registry. False otherwise.
*/
public boolean isAggregator(String aggregatorName)
{
if ( classToIncrementalAggregatorName.values().contains(aggregatorName) ||
nameToOTFAggregator.containsKey(aggregatorName)) {
return true;
}
//the composite probably send whole aggregator name
String aggregatorType = aggregatorName.split("-")[0];
return (AggregatorTopBottomType.valueOf(aggregatorType) != null);
}
/**
* Checks if the given aggregator name is the name of an {@link IncrementalAggregator} registered
* to this registry.
*
* @param aggregatorName The aggregator name to check.
* @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered
* to this registry. False otherwise.
*/
public boolean isIncrementalAggregator(String aggregatorName)
{
return classToIncrementalAggregatorName.values().contains(aggregatorName);
}
public boolean isOTFAggregator(String aggregatorName)
{
return nameToOTFAggregator.containsKey(aggregatorName);
}
public boolean isTopBottomAggregatorType(String aggregatorType)
{
return (AggregatorTopBottomType.valueOf(aggregatorType) != null);
}
/**
* Gets the mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}.
*
* @return The mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}.
*/
public Map<Class<? extends IncrementalAggregator>, String> getClassToIncrementalAggregatorName()
{
return classToIncrementalAggregatorName;
}
/**
* Gets the mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
*
* @return The mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
*/
public Map<Integer, IncrementalAggregator> getIncrementalAggregatorIDToAggregator()
{
return incrementalAggregatorIDToAggregator;
}
public Map<Integer, AbstractTopBottomAggregator> getTopBottomAggregatorIDToAggregator()
{
return topBottomAggregatorIDToAggregator;
}
/**
* This a helper method which sets and validates the mapping from {@link IncrementalAggregator} name to
* {@link IncrementalAggregator} ID.
*
* @param incrementalAggregatorNameToID The mapping from {@link IncrementalAggregator} name to
* {@link IncrementalAggregator} ID.
*/
private void setIncrementalAggregatorNameToID(Map<String, Integer> incrementalAggregatorNameToID)
{
Preconditions.checkNotNull(incrementalAggregatorNameToID);
for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
Preconditions.checkNotNull(entry.getKey());
Preconditions.checkNotNull(entry.getValue());
}
this.incrementalAggregatorNameToID = Maps.newHashMap(incrementalAggregatorNameToID);
}
/**
* This returns a map from the names of an {@link IncrementalAggregator}s to the corresponding ID of the
* {@link IncrementalAggregator}.
*
* @return Returns a map from the names of an {@link IncrementalAggregator} to the corresponding ID of the
* {@link IncrementalAggregator}.
*/
public Map<String, Integer> getIncrementalAggregatorNameToID()
{
return incrementalAggregatorNameToID;
}
public Map<String, Integer> getTopBottomAggregatorNameToID()
{
return topBottomAggregatorNameToID;
}
/**
* Returns the name to {@link OTFAggregator} mapping, where the key is the name of the {@link OTFAggregator}.
*
* @return The name to {@link OTFAggregator} mapping.
*/
public Map<String, OTFAggregator> getNameToOTFAggregators()
{
return nameToOTFAggregator;
}
public Map<String, AbstractTopBottomAggregator> getNameToTopBottomAggregator()
{
return nameToTopBottomAggregator;
}
/**
* Returns the mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
* that {@link OTFAggregator}.
*
* @return The mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
* that {@link OTFAggregator}.
*/
public Map<String, List<String>> getOTFAggregatorToIncrementalAggregators()
{
return otfAggregatorToIncrementalAggregators;
}
/**
* Returns the name to {@link IncrementalAggregator} mapping, where the key is the name of the {@link OTFAggregator}.
*
* @return The name to {@link IncrementalAggregator} mapping.
*/
public Map<String, IncrementalAggregator> getNameToIncrementalAggregator()
{
return nameToIncrementalAggregator;
}
private static final Logger lOG = LoggerFactory.getLogger(AggregatorRegistry.class);
}