blob: 405dd8b103b5a4b6e814561a5f39d10ae5950db5 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler.client.stellar;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
import static org.apache.metron.profiler.client.stellar.Util.getArg;
import static org.apache.metron.profiler.client.stellar.Util.getEffectiveConfig;
import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.client.HBaseProfilerClient;
import org.apache.metron.profiler.client.ProfilerClient;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.Stellar;
import org.apache.metron.stellar.dsl.StellarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Stellar function that can retrieve data contained within a Profile.
*
* PROFILE_GET
*
* The third argument is the list of profile periods to fetch, normally produced by
* PROFILE_FIXED or PROFILE_WINDOW.
*
* Retrieve all values for 'entity1' from 'profile1' over the past 4 hours.
*
* <code>PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))</code>
*
* Retrieve all values for 'entity1' from 'profile1' over the past 2 days.
*
* <code>PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(2, 'DAYS'))</code>
*
* Retrieve all values for 'entity1' from 'profile1' that occurred on 'weekdays' over the past month.
*
* <code>PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1, 'MONTHS'), ['weekdays'])</code>
*
* Retrieve all values for 'entity1' from 'profile1' over the past 2 days, with no 'groupBy',
* and overriding the usual global client configuration parameters for window duration.
*
* <code>PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(2, 'DAYS'), [], {'profiler.client.period.duration' : '2', 'profiler.client.period.duration.units' : 'MINUTES'})</code>
*
* Retrieve all values for 'entity1' from 'profile1' that occurred on 'weekdays' over the past month,
* overriding the usual global client configuration parameters for window duration.
*
* <code>PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1, 'MONTHS'), ['weekdays'], {'profiler.client.period.duration' : '2', 'profiler.client.period.duration.units' : 'MINUTES'})</code>
*
*/
@Stellar(
    namespace="PROFILE",
    name="GET",
    description="Retrieves a series of values from a stored profile.",
    params={
        "profile - The name of the profile.",
        "entity - The name of the entity.",
        "periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.",
        "groups - Optional - The groups to retrieve. Must correspond to the 'groupBy' " +
            "list used during profile creation. Defaults to an empty list, meaning no groups.",
        "config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
            "of the same name. Default is the empty Map, meaning no overrides."
    },
    returns="The selected profile measurements."
)
public class GetProfile implements StellarFunction {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /**
   * Cached client that can retrieve profile values. Created lazily on the first
   * call to {@link #apply(List, Context)} and rebuilt whenever the effective
   * configuration differs from {@link #cachedConfigMap}.
   */
  private ProfilerClient client;

  /**
   * Cached value of config map actually used to construct the previously cached client.
   */
  private Map<String, Object> cachedConfigMap = new HashMap<>(6);

  /**
   * Initialization. Nothing is done here; all setup happens lazily inside
   * {@link #apply(List, Context)} and is cached.
   *
   * @param context The Stellar execution context (unused).
   */
  @Override
  public void initialize(Context context) {
    // intentionally empty: the profiler client is built on first use
  }

  /**
   * Is the function initialized?
   *
   * @return Always {@code true}, because no eager initialization is required.
   */
  @Override
  public boolean isInitialized() {
    return true;
  }

  /**
   * Apply the function.
   *
   * <p>Argument layout: {@code (profile, entity, periods [, groups [, config_overrides]])}.
   * When the fourth argument is not a {@code List}, the deprecated "varargs" form is
   * assumed and every argument from index 3 onward is read as a group name; config
   * overrides cannot be supplied in that form.
   *
   * @param args The function arguments.
   * @param context The Stellar execution context, used to resolve the effective configuration.
   * @return A list holding the value of each fetched profile measurement.
   * @throws ParseException Declared by the {@link StellarFunction} contract.
   */
  @Override
  public Object apply(List<Object> args, Context context) throws ParseException {
    String profile = getArg(0, String.class, args);
    String entity = getArg(1, String.class, args);

    // getArg hands back a raw List; a missing (null) value means "no periods".
    @SuppressWarnings("unchecked")
    List<ProfilePeriod> periods = getArg(2, List.class, args);
    if (periods == null) {
      periods = new ArrayList<>(0);
    }

    // Optional arguments.
    List<Object> groups;
    // Left as a raw Map on purpose: it is handed straight to getEffectiveConfig,
    // whose parameter type is declared outside this file.
    Map configOverridesMap = null;
    if (args.size() < 4) {
      // no optional args, so default 'groups' and configOverridesMap remains null.
      groups = new ArrayList<>(0);

    } else if (args.get(3) instanceof List) {
      // correct extensible usage
      @SuppressWarnings("unchecked")
      List<Object> groupList = getArg(3, List.class, args);
      groups = groupList;
      if (args.size() >= 5) {
        configOverridesMap = getArg(4, Map.class, args);
        // Guard against a null result before calling isEmpty(); an empty map is
        // normalised to null, meaning "no overrides".
        if (configOverridesMap != null && configOverridesMap.isEmpty()) {
          configOverridesMap = null;
        }
      }

    } else {
      // Deprecated "varargs" style usage for groups_list
      // configOverridesMap cannot be specified so it remains null.
      groups = getGroupsArg(3, args);
    }

    Map<String, Object> effectiveConfig = getEffectiveConfig(context, configOverridesMap);

    // lazily create new profiler client if needed
    if (client == null || !cachedConfigMap.equals(effectiveConfig)) {
      client = createClient(effectiveConfig);
      cachedConfigMap = effectiveConfig;
    }

    Object defaultValue = null;
    if (cachedConfigMap != null) {
      defaultValue = ProfilerClientConfig.PROFILER_DEFAULT_VALUE.get(cachedConfigMap);
    }

    List<ProfileMeasurement> measurements = client.fetch(Object.class, profile, entity, groups,
        periods, Optional.ofNullable(defaultValue));

    // return only the value of each profile measurement
    List<Object> values = new ArrayList<>(measurements.size());
    for (ProfileMeasurement measurement : measurements) {
      values.add(measurement.getProfileValue());
    }
    return values;
  }

  /**
   * Builds a new HBase-backed profiler client from the given configuration.
   *
   * @param effectiveConfig The effective (global plus overrides) configuration.
   * @return A freshly constructed client.
   */
  private ProfilerClient createClient(Map<String, Object> effectiveConfig) {
    RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(effectiveConfig);
    ColumnBuilder columnBuilder = getColumnBuilder(effectiveConfig);
    long periodDuration = getPeriodDurationInMillis(effectiveConfig);
    String tableName = PROFILER_HBASE_TABLE.get(effectiveConfig, String.class);
    Configuration hbaseConfig = HBaseConfiguration.create();
    return new HBaseProfilerClient(getTableProvider(effectiveConfig), rowKeyBuilder, columnBuilder,
        periodDuration, tableName, hbaseConfig);
  }

  /**
   * Get the groups defined by the user.
   *
   * The user can specify 0 or more groups. All arguments from the specified position
   * on are assumed to be groups. If there is no argument in the specified position,
   * then it is assumed the user did not specify any groups.
   *
   * @param startIndex The starting index of groups within the function argument list.
   * @param args The function arguments.
   * @return The groups.
   */
  private List<Object> getGroupsArg(int startIndex, List<Object> args) {
    List<Object> groups = new ArrayList<>();
    for (int i = startIndex; i < args.size(); i++) {
      String group = getArg(i, String.class, args);
      groups.add(group);
    }
    return groups;
  }

  /**
   * Creates the ColumnBuilder to use in accessing the profile data.
   *
   * @param global The global configuration.
   * @return A column builder bound to the configured column family.
   */
  private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
    String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
    return new ValueOnlyColumnBuilder(columnFamily);
  }

  /**
   * Creates the RowKeyBuilder to use in accessing the profile data.
   *
   * @param global The global configuration.
   * @return A row key builder configured with the salt divisor and profile period.
   */
  private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
    // how long is the profile period?
    long duration = PROFILER_PERIOD.get(global, Long.class);
    LOG.debug("profiler client: {}={}", PROFILER_PERIOD, duration);

    // which units are used to define the profile period?
    String configuredUnits = PROFILER_PERIOD_UNITS.get(global, String.class);
    TimeUnit units = TimeUnit.valueOf(configuredUnits);
    LOG.debug("profiler client: {}={}", PROFILER_PERIOD_UNITS, units);

    // what is the salt divisor?
    Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
    LOG.debug("profiler client: {}={}", PROFILER_SALT_DIVISOR, saltDivisor);

    return new SaltyRowKeyBuilder(saltDivisor, duration, units);
  }

  /**
   * Create the TableProvider to use when accessing HBase.
   *
   * <p>The configured class is instantiated reflectively through its no-arg
   * constructor. If that fails for any reason, an {@link HTableProvider} is used
   * instead and the failure is logged.
   *
   * @param global The global configuration.
   * @return The configured provider, or an {@link HTableProvider} as a fallback.
   */
  private TableProvider getTableProvider(Map<String, Object> global) {
    String clazzName = PROFILER_HBASE_TABLE_PROVIDER.get(global, String.class);

    TableProvider provider;
    try {
      @SuppressWarnings("unchecked")
      Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(clazzName);
      provider = clazz.getConstructor().newInstance();

    } catch (Exception e) {
      // Best-effort fallback is kept, but the cause is reported rather than swallowed.
      LOG.warn("Unable to create table provider '{}'; falling back to {}",
          clazzName, HTableProvider.class.getName(), e);
      provider = new HTableProvider();
    }

    return provider;
  }
}