blob: d2700d6ff9495f386a33f1ab24c2ccc9b49b31a8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.reference.lookup.accesstracker;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnel;
import com.google.common.hash.PrimitiveSink;
import org.apache.metron.reference.lookup.LookupKey;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
public class BloomAccessTracker implements AccessTracker {
private static final long serialVersionUID = 1L;
public static final String EXPECTED_INSERTIONS_KEY = "expected_insertions";
public static final String FALSE_POSITIVE_RATE_KEY = "false_positive_rate";
public static final String NAME_KEY = "name";
private static class LookupKeyFunnel implements Funnel<LookupKey> {
@Override
public void funnel(LookupKey lookupKey, PrimitiveSink primitiveSink) {
primitiveSink.putBytes(lookupKey.toBytes());
}
@Override
public boolean equals(Object obj) {
return this.getClass().equals(obj.getClass());
}
}
private static Funnel<LookupKey> LOOKUPKEY_FUNNEL = new LookupKeyFunnel();
BloomFilter<LookupKey> filter;
String name;
int expectedInsertions;
double falsePositiveRate;
int numInsertions = 0;
public BloomAccessTracker(String name, int expectedInsertions, double falsePositiveRate) {
this.name = name;
this.expectedInsertions = expectedInsertions;
this.falsePositiveRate = falsePositiveRate;
filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
}
public BloomAccessTracker() {}
public BloomAccessTracker(Map<String, Object> config) {
configure(config);
}
protected BloomFilter<LookupKey> getFilter() {
return filter;
}
@Override
public void logAccess(LookupKey key) {
numInsertions++;
filter.put(key);
}
@Override
public void configure(Map<String, Object> config) {
expectedInsertions = toInt(config.get(EXPECTED_INSERTIONS_KEY));
falsePositiveRate = toDouble(config.get(FALSE_POSITIVE_RATE_KEY));
name = config.get(NAME_KEY).toString();
filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
}
@Override
public boolean hasSeen(LookupKey key) {
return filter.mightContain(key);
}
@Override
public void reset() {
filter = BloomFilter.create(LOOKUPKEY_FUNNEL, expectedInsertions, falsePositiveRate);
}
private static double toDouble(Object o) {
if(o instanceof String) {
return Double.parseDouble((String)o);
}
else if(o instanceof Number) {
return ((Number) o).doubleValue();
}
else {
throw new IllegalStateException("Unable to convert " + o + " to a double.");
}
}
private static int toInt(Object o) {
if(o instanceof String) {
return Integer.parseInt((String)o);
}
else if(o instanceof Number) {
return ((Number) o).intValue();
}
else {
throw new IllegalStateException("Unable to convert " + o + " to a double.");
}
}
@Override
public String getName() {
return name;
}
@Override
public AccessTracker union(AccessTracker tracker) {
if(filter == null) {
throw new IllegalStateException("Unable to union access tracker, because this tracker is not initialized.");
}
if(tracker instanceof BloomAccessTracker ) {
filter.putAll(((BloomAccessTracker)tracker).getFilter());
return this;
}
else {
throw new IllegalStateException("Unable to union access tracker, because it's not of the right type (BloomAccessTracker)");
}
}
@Override
public boolean isFull() {
return numInsertions >= expectedInsertions;
}
@Override
public void cleanup() throws IOException {
}
}