blob: 290c635c4199bfb97928d6e19a3883ce9487a472 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler.spark.function;
import org.apache.commons.collections4.IteratorUtils;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.statistics.OnlineStatisticsProvider;
import org.json.simple.JSONObject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class HBaseWriterFunctionTest {
Properties profilerProperties;
@BeforeEach
public void setup() {
profilerProperties = getProfilerProperties();
// create a mock table for HBase
String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
MockHBaseTableProvider.addToCache(tableName, columnFamily);
}
@Test
public void testWrite() throws Exception {
JSONObject message = getMessage();
String entity = (String) message.get("ip_src_addr");
long timestamp = (Long) message.get("timestamp");
ProfileConfig profile = getProfile();
// setup the profile measurements that will be written
List<ProfileMeasurement> measurements = createMeasurements(1, entity, timestamp, profile);
// setup the function to test
HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
// write the measurements
Iterator<Integer> results = function.call(measurements.iterator());
// validate the result
List<Integer> counts = IteratorUtils.toList(results);
assertEquals(1, counts.size());
assertEquals(1, counts.get(0).intValue());
}
@Test
public void testWriteMany() throws Exception {
JSONObject message = getMessage();
String entity = (String) message.get("ip_src_addr");
long timestamp = (Long) message.get("timestamp");
ProfileConfig profile = getProfile();
// setup the profile measurements that will be written
List<ProfileMeasurement> measurements = createMeasurements(10, entity, timestamp, profile);
// setup the function to test
HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
// write the measurements
Iterator<Integer> results = function.call(measurements.iterator());
// validate the result
List<Integer> counts = IteratorUtils.toList(results);
assertEquals(1, counts.size());
assertEquals(10, counts.get(0).intValue());
}
@Test
public void testWriteNone() throws Exception {
// there are no profile measurements to write
List<ProfileMeasurement> measurements = new ArrayList<>();
// setup the function to test
HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
// write the measurements
Iterator<Integer> results = function.call(measurements.iterator());
// validate the result
List<Integer> counts = IteratorUtils.toList(results);
assertEquals(1, counts.size());
assertEquals(0, counts.get(0).intValue());
}
/**
* Create a list of measurements for testing.
*
* @param count The number of messages to create.
* @param entity The entity.
* @param timestamp The timestamp.
* @param profile The profile definition.
* @return
*/
private List<ProfileMeasurement> createMeasurements(int count, String entity, long timestamp, ProfileConfig profile) {
List<ProfileMeasurement> measurements = new ArrayList<>();
for(int i=0; i<count; i++) {
measurements.add(new ProfileMeasurement()
.withProfileName(profile.getProfile())
.withEntity(entity)
.withPeriod(timestamp, 15, TimeUnit.MINUTES)
.withProfileValue(new OnlineStatisticsProvider()));
}
return measurements;
}
/**
* Returns a telemetry message to use for testing.
*/
private JSONObject getMessage() {
JSONObject message = new JSONObject();
message.put("ip_src_addr", "192.168.1.1");
message.put("status", "red");
message.put("timestamp", System.currentTimeMillis());
return message;
}
/**
* Returns profiler properties to use for testing.
*/
private Properties getProfilerProperties() {
return new Properties();
}
/**
* Returns a profile definition to use for testing.
*/
private ProfileConfig getProfile() {
return new ProfileConfig()
.withProfile("profile1")
.withForeach("ip_src_addr")
.withUpdate("count", "count + 1")
.withResult("count");
}
}