blob: ea4d65b9d17c36e2facc92dce0241a8a40287af2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.profiler.client.stellar;
import org.apache.commons.lang3.Range;
import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.client.window.WindowProcessor;
import org.apache.metron.stellar.common.StellarProcessor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.DefaultVariableResolver;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import static org.junit.jupiter.api.Assertions.*;
public class WindowLookbackTest {
static FunctionResolver resolver;
static Context context;
@BeforeAll
public static void setup() {
resolver = new SimpleFunctionResolver()
.withClass(GetProfile.class)
.withClass(FixedLookback.class)
.withClass(WindowLookback.class);
context = new Context.Builder()
.with(Context.Capabilities.GLOBAL_CONFIG, () -> new HashMap<>())
.build();
}
@Test
public void testSpecifyingConfig() {
//we should be able to specify the config and have it take hold. If we change the
//profile duration to 1 minute instead of 15 minutes (the default), then we should see
//the correct number of profiles.
long durationMs = 60000;
Map<String, Object> config = new HashMap<>();
config.put(ProfilerClientConfig.PROFILER_PERIOD.getKey(), 1);
State state = test("1 hour", new Date(), Optional.of(config), Assertions.NOT_EMPTY, Assertions.CONTIGUOUS);
assertEquals(TimeUnit.HOURS.toMillis(1) / durationMs, state.periods.size());
}
@Test
public void testSpecifyingOnlySelector() {
String stellarStatement = "PROFILE_WINDOW('1 hour')";
Map<String, Object> variables = new HashMap<>();
StellarProcessor stellar = new StellarProcessor();
List<ProfilePeriod> periods = (List<ProfilePeriod>)stellar.parse( stellarStatement
, new DefaultVariableResolver(k -> variables.get(k),k -> variables.containsKey(k))
, resolver
, context
);
assertEquals(TimeUnit.HOURS.toMillis(1) / getDurationMs(), periods.size());
}
@Test
public void testDenseLookback() {
State state = test("1 hour", Assertions.NOT_EMPTY, Assertions.CONTIGUOUS);
assertEquals(TimeUnit.HOURS.toMillis(1) / getDurationMs(), state.periods.size());
}
@Test
public void testShiftedDenseLookback() {
State state = test("from 2 hours ago to 30 minutes ago", Assertions.NOT_EMPTY
, Assertions.CONTIGUOUS
, Assertions.INTERVALS_CONTAIN_ALL_PERIODS
);
assertEquals(TimeUnit.MINUTES.toMillis(90) / getDurationMs(), state.periods.size());
}
@Test
public void testShiftedSparseLookback() {
State state = test("30 minute window every 1 hour from 2 hours ago to 30 minutes ago", Assertions.NOT_EMPTY
, Assertions.DISCONTIGUOUS
, Assertions.INTERVALS_CONTAIN_ALL_PERIODS
);
assertEquals(TimeUnit.MINUTES.toMillis(60) / getDurationMs(), state.periods.size());
}
@Test
public void testEmptyDueToExclusions() {
test("30 minute window every 24 hours from 7 days ago including saturdays excluding weekends", Assertions.EMPTY);
}
@Test
public void testErrorInSelector() {
assertThrows(
ParseException.class,
() ->
test(
"30 minute idow every 24 hours from 7 days ago including saturdays excluding weekends",
Assertions.EMPTY));
}
long getDurationMs() {
int duration = ProfilerClientConfig.PROFILER_PERIOD.getDefault(Integer.class);
TimeUnit unit = TimeUnit.valueOf(ProfilerClientConfig.PROFILER_PERIOD_UNITS.getDefault(String.class));
return unit.toMillis(duration);
}
public State test(String windowSelector, Assertions... assertions) {
return test(windowSelector, new Date(), Optional.empty(), assertions);
}
public State test(String windowSelector, Date now, Optional<Map<String, Object>> config, Assertions... assertions) {
List<Range<Long>> windowIntervals = WindowProcessor.process(windowSelector).toIntervals(now.getTime());
String stellarStatement = "PROFILE_WINDOW('" + windowSelector + "', now"
+ (config.isPresent()?", config":"")
+ ")";
Map<String, Object> variables = new HashMap<>();
variables.put("now", now.getTime());
if(config.isPresent()) {
variables.put("config", config.get());
}
StellarProcessor stellar = new StellarProcessor();
List<ProfilePeriod> periods = (List<ProfilePeriod>)stellar.parse( stellarStatement
, new DefaultVariableResolver(k -> variables.get(k),k -> variables.containsKey(k))
, resolver
, context
);
State state = new State(windowIntervals, periods);
for(Assertions assertion : assertions) {
assertTrue(assertion.test(state), assertion.name());
}
return state;
}
private enum Assertions implements Predicate<State>{
EMPTY( state -> state.windowIntervals.isEmpty() && state.periods.isEmpty() ),
NOT_EMPTY( state -> !state.windowIntervals.isEmpty() && !state.periods.isEmpty()),
CONTIGUOUS( state -> {
if(state.periods.size() < 2) {
return true;
}
long duration = state.periods.get(1).getStartTimeMillis() - state.periods.get(0).getStartTimeMillis();
for(int i = 1;i < state.periods.size();++i) {
long left = state.periods.get(i - 1).getStartTimeMillis();
long right = state.periods.get(i).getStartTimeMillis();
if(right - left != duration) {
return false;
}
}
return true;
}),
DISCONTIGUOUS( state -> !Assertions.CONTIGUOUS.test(state)),
INTERVALS_CONTAIN_ALL_PERIODS( state -> {
List<Range<Long>> windowIntervals = state.windowIntervals;
List<ProfilePeriod> periods = state.periods;
Set<Range<Long>> foundIntervals = new HashSet<>();
for(ProfilePeriod period : periods) {
boolean found = false;
for(Range<Long> interval : windowIntervals) {
if(interval.contains(period.getStartTimeMillis())) {
foundIntervals.add(interval);
found = true;
}
}
if(!found) {
return false;
}
}
return foundIntervals.size() == windowIntervals.size();
})
;
Predicate<State> predicate;
Assertions(Predicate<State> predicate) {
this.predicate = predicate;
}
@Override
public boolean test(State s) {
return predicate.test(s);
}
}
private static class State {
List<Range<Long>> windowIntervals;
List<ProfilePeriod> periods;
public State(List<Range<Long>> windowIntervals, List<ProfilePeriod> periods) {
this.periods = periods;
this.windowIntervals = windowIntervals;
}
}
}