blob: 87b4da2e3ecc1f29936c340796a2eaafa683560e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.wikisearch.iterator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.examples.wikisearch.parser.EventFields;
import org.apache.accumulo.examples.wikisearch.parser.QueryEvaluator;
import org.apache.commons.jexl2.parser.ParseException;
import org.apache.log4j.Logger;
import com.esotericsoftware.kryo.Kryo;
/**
*
* This iterator aggregates rows together using the specified key comparator. Subclasses will provide their own implementation of fillMap which will fill the
* supplied EventFields object with field names (key) and field values (value). After all fields have been put into the aggregated object (by aggregating all
* columns with the same key), the EventFields object will be compared against the supplied expression. If the expression returns true, then the return key and
* return value can be retrieved via getTopKey() and getTopValue().
*
* Optionally, the caller can name expressions (each of the form "field operator value") that should not be evaluated against the event. For example, if the
* query is "A == 'foo' and B == 'bar'", but for some reason B may not be in the data, then setting the UNEVALUTED_EXPRESSIONS option
* ("unevaluated.expressions") to "B == 'bar'" will allow the events to be evaluated against the remainder of the expression and still return as true.
*
* By default this iterator will return all Events in the shard. If the START_DATE and END_DATE are specified, then this iterator will evaluate the timestamp of
* the key against the start and end dates. If the event date is not within the range of start to end, then it is skipped.
*
* This iterator will return up the stack an EventFields object serialized using Kryo in the cell Value.
*
*/
public abstract class AbstractEvaluatingIterator implements SortedKeyValueIterator<Key,Value>, OptionDescriber {
private static Logger log = Logger.getLogger(AbstractEvaluatingIterator.class);
// Empty byte array constant; it is not referenced anywhere in this class.
protected static final byte[] NULL_BYTE = new byte[0];
// Option key carrying the query expression; validateOptions() returns false when the key is absent.
public static final String QUERY_OPTION = "expr";
// Option key carrying a comma separated list of expressions that init() rewrites out of the query.
// NOTE(review): the constant name is misspelled ("UNEVALUTED"); it is public, so renaming it would break callers.
public static final String UNEVALUTED_EXPRESSIONS = "unevaluated.expressions";
// Key comparison granularity obtained from getKeyComparator() in init(); decides which source keys belong to the same event.
private PartialKey comparator = null;
// Source iterator: assigned in init(), or deep-copied from another instance by the copy constructor.
protected SortedKeyValueIterator<Key,Value> iterator;
// Copy of the key of the first source entry of the group currently being aggregated.
private Key currentKey = new Key();
// Key/value pair exposed through getTopKey()/getTopValue(); both are null when no matching event is available.
private Key returnKey;
private Value returnValue;
// Query text taken from QUERY_OPTION; init() may rewrite it when skipExpressions is populated.
private String expression;
// Evaluator built from the (possibly rewritten) expression in init().
private QueryEvaluator evaluator;
// Aggregation buffer filled by fillMap(); cleared by reset() before each aggregation attempt.
private EventFields event = null;
// Kryo instance used to serialize the event into the returned Value. It is static, i.e. shared by every instance.
// NOTE(review): whether this shared instance is safe under concurrent use is not visible here - confirm.
private static Kryo kryo = new Kryo();
// Range handed to the source on the most recent seek(), with the start key timestamp maximized.
private Range seekRange = null;
// Expressions parsed from UNEVALUTED_EXPRESSIONS by validateOptions(); null when the option was not supplied.
private Set<String> skipExpressions = null;
/**
 * Copy constructor intended for subclasses implementing deepCopy. The copy gets a deep copy of the source iterator and its own aggregation buffer, and
 * inherits the configuration that init() established on {@code other} (key comparator, query expression, skipped expressions and a freshly built
 * evaluator), so it can be seeked without init() being called on it again.
 *
 * @param other
 *          the iterator to copy; its source iterator must already be set
 * @param env
 *          the environment handed to the source's deepCopy
 * @throws IllegalArgumentException
 *           if the evaluator cannot be rebuilt from the copied expression
 */
protected AbstractEvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
  iterator = other.iterator.deepCopy(env);
  // A private buffer: sharing other.event would let reset() on one iterator wipe the fields the other one is still aggregating.
  event = new EventFields();
  comparator = other.comparator;
  expression = other.expression;
  if (other.skipExpressions != null) {
    skipExpressions = new HashSet<String>(other.skipExpressions);
  }
  // Only rebuild the evaluator when the original had one, i.e. when init() already ran on it.
  if (other.evaluator != null) {
    try {
      evaluator = new QueryEvaluator(expression);
    } catch (ParseException e) {
      throw new IllegalArgumentException("Failed to parse query", e);
    }
  }
}

/**
 * Creates an unconfigured iterator; init() must be called before it is used.
 */
public AbstractEvaluatingIterator() {}
/**
* Implementations will return the PartialKey value to use for comparing keys for aggregating events. init() calls this once and stores the result;
* consecutive source keys that are equal under this PartialKey are aggregated into the same event.
*
* @return the type of comparator to use
*/
public abstract PartialKey getKeyComparator();
/**
* When the query expression evaluates to true against the event, the event fields will be serialized into the Value and returned up the iterator stack.
* Implementations will need to provide a key to be used with the event. This is called after every entry of the event has been aggregated, whether or not
* the event ends up matching the query.
*
* @param k
*          the key of the first source entry that was aggregated into the event
* @return the key that should be returned with the map of values. It must not be null for an event that matches the query, otherwise the iterator fails
*         with an IOException ("Return values are inconsistent").
* @throws Exception
*           any exception thrown here is wrapped in an IOException by the aggregation step
*/
public abstract Key getReturnKey(Key k) throws Exception;
/**
* Implementations will need to fill the map with field visibilities, names, and values. When all fields have been aggregated the event will be evaluated
* against the query expression. This is invoked once for each source entry that belongs to the event being aggregated.
*
* @param event
*          Multimap of event names and fields.
* @param key
*          current Key
* @param value
*          current Value
* @throws Exception
*           any exception thrown here is wrapped in an IOException by the aggregation step
*/
public abstract void fillMap(EventFields event, Key key, Value value) throws Exception;
/**
* Provides the ability to skip keys that should not take part in aggregation. Before aggregating, the iterator asks this method about the source's top key
* and advances the source past every consecutive key for which it returns false; each key is checked individually.
*
* @param key
*          the source's current top key
* @return true if the key should be acted upon, otherwise false.
* @throws IOException
*           if the decision cannot be made
*/
public abstract boolean isKeyAccepted(Key key) throws IOException;
/**
 * Discards every field collected so far so that the aggregation buffer can be reused for the next event.
 */
public void reset() {
  this.event.clear();
}
/**
 * Collects the source's current entry, plus every following entry whose key equals it under the configured comparator, into {@code event}, and then asks
 * the subclass for the key to return with that event. The source is left positioned on the first entry that does not belong to the group.
 *
 * @param event
 *          the buffer that fillMap populates
 * @throws IOException
 *           wrapping any exception raised while filling the buffer, advancing the source or computing the return key
 */
private void aggregateRowColumn(EventFields event) throws IOException {
  currentKey.set(iterator.getTopKey());
  try {
    // The entry the source is positioned on always belongs to the group, so consume it first and test afterwards.
    do {
      fillMap(event, iterator.getTopKey(), iterator.getTopValue());
      iterator.next();
    } while (iterator.hasTop() && iterator.getTopKey().equals(currentKey, this.comparator));

    returnKey = getReturnKey(currentKey);
  } catch (Exception e) {
    throw new IOException("Error aggregating event", e);
  }
}
/**
 * Advances the source until an aggregated event satisfies the query or the source is exhausted. On return, returnKey/returnValue hold the matching event
 * (serialized with Kryo), or are both null when nothing matched.
 *
 * @throws IOException
 *           if aggregation fails, or if exactly one of returnKey/returnValue ended up null
 */
private void findTop() throws IOException {
  do {
    reset();
    // Forget whatever an earlier call produced. seek() calls this method without clearing the previous result, so without this a re-seek that leaves
    // the source with no top would keep exposing the old key/value through hasTop()/getTopKey()/getTopValue().
    returnKey = null;
    returnValue = null;
    // check if aggregation is needed
    if (iterator.hasTop()) {
      // Check to see if the current key is accepted. For example in the wiki
      // table there are field index rows. We don't want to process those in
      // some cases so return right away. Consume all of the non-accepted keys
      while (iterator.hasTop() && !isKeyAccepted(iterator.getTopKey())) {
        iterator.next();
      }
      if (iterator.hasTop()) {
        aggregateRowColumn(event);
        // Evaluate the event against the expression
        if (event.size() > 0 && this.evaluator.evaluate(event)) {
          if (log.isDebugEnabled()) {
            log.debug("Event evaluated to true, key = " + returnKey);
          }
          // NOTE(review): the buffer is sized as the event's byte size plus 20 bytes per entry, presumably as headroom for serialization overhead -
          // confirm this can never be too small.
          byte[] serializedMap = new byte[event.getByteSize() + (event.size() * 20)];
          // Wrap in ByteBuffer to work with Kryo
          ByteBuffer buf = ByteBuffer.wrap(serializedMap);
          // Serialize the EventFields object
          event.writeObjectData(kryo, buf);
          // Truncate array to the used size.
          returnValue = new Value(Arrays.copyOfRange(serializedMap, 0, buf.position()));
        } else {
          // aggregateRowColumn set returnKey; drop it again because the event did not match.
          returnKey = null;
          returnValue = null;
        }
      } else {
        if (log.isDebugEnabled()) {
          log.debug("Iterator no longer has top.");
        }
      }
    } else {
      log.debug("Iterator.hasTop() == false");
    }
  } while (returnValue == null && iterator.hasTop());
  // Sanity check. Make sure both returnValue and returnKey are null or both are not null
  if ((returnKey == null) != (returnValue == null)) {
    log.warn("Key: " + ((returnKey == null) ? "null" : returnKey.toString()));
    log.warn("Value: " + ((returnValue == null) ? "null" : returnValue.toString()));
    throw new IOException("Return values are inconsistent");
  }
}
/**
 * Returns the key of the current matching event, falling back to the source's top key when no event is pending.
 *
 * @return the pending return key if there is one, otherwise the source iterator's top key
 */
public Key getTopKey() {
  return (returnKey != null) ? returnKey : iterator.getTopKey();
}
/**
 * Returns the serialized event for the current match, falling back to the source's top value when no event is pending.
 *
 * @return the pending return value if there is one, otherwise the source iterator's top value
 */
public Value getTopValue() {
  return (returnValue != null) ? returnValue : iterator.getTopValue();
}
/**
 * Reports whether a key/value pair is available.
 *
 * @return true when a matching event is pending or the source iterator still has a top entry
 */
public boolean hasTop() {
  if (returnKey != null) {
    return true;
  }
  return iterator.hasTop();
}
/**
 * Moves to the next matching event. A pending result is discarded (the source is already positioned past it); otherwise the source is advanced by one
 * entry if it has a top. The search for the next match then resumes from the source's position.
 *
 * @throws IOException
 *           if advancing the source or locating the next match fails
 */
public void next() throws IOException {
  if (returnKey == null) {
    if (iterator.hasTop()) {
      iterator.next();
    }
  } else {
    returnKey = null;
    returnValue = null;
  }
  findTop();
}
/**
 * Copy of IteratorUtil.maximizeStartKeyTimeStamp due to IllegalAccessError
 *
 * @param range
 *          the range whose start key should be adjusted
 * @return {@code range} itself when it has no start key or the start key's timestamp is already Long.MAX_VALUE; otherwise a new range with the same end
 *         bound whose start key is a copy of the original with its timestamp set to Long.MAX_VALUE, start inclusive
 */
static Range maximizeStartKeyTimeStamp(Range range) {
  Key originalStart = range.getStartKey();
  if (originalStart == null || originalStart.getTimestamp() == Long.MAX_VALUE) {
    // Nothing to adjust.
    return range;
  }
  Key maximizedStart = new Key(originalStart);
  maximizedStart.setTimestamp(Long.MAX_VALUE);
  return new Range(maximizedStart, true, range.getEndKey(), range.isEndKeyInclusive());
}
/**
 * Seeks the source and positions this iterator on the first matching event inside {@code range}. The source is seeked with the start key's timestamp
 * maximized so that an event is never entered part-way through; results that fall before the requested start key are then skipped.
 *
 * @param range
 *          the range requested by the caller
 * @param columnFamilies
 *          column families passed through to the source
 * @param inclusive
 *          whether {@code columnFamilies} is an inclusion or exclusion list, passed through to the source
 * @throws IOException
 *           if the source fails or an event cannot be aggregated
 */
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
  // do not want to seek to the middle of a value that should be
  // aggregated...
  seekRange = maximizeStartKeyTimeStamp(range);
  iterator.seek(seekRange, columnFamilies, inclusive);
  findTop();

  final Key requestedStart = range.getStartKey();
  if (requestedStart == null) {
    return;
  }

  // Results for the start key's group that carry a more recent timestamp than the requested start are passed over.
  while (hasTop() && getTopKey().equals(requestedStart, this.comparator) && getTopKey().getTimestamp() > requestedStart.getTimestamp()) {
    next();
  }

  // Anything still positioned before the requested start key is outside the range.
  while (hasTop() && range.beforeStartKey(getTopKey())) {
    next();
  }
}
/**
 * Configures this iterator: reads the options, installs the source, rewrites every unevaluated expression in the query to {@code field == null} and
 * builds the query evaluator.
 *
 * @param source
 *          the iterator to read from
 * @param options
 *          must contain QUERY_OPTION; may contain UNEVALUTED_EXPRESSIONS
 * @param env
 *          the iterator environment (not used here)
 * @throws IOException
 *           declared by the iterator interface
 * @throws IllegalArgumentException
 *           if the options are rejected by validateOptions, an unevaluated expression is malformed, or the query cannot be parsed
 */
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
  // Fail early with a clear message instead of handing a missing expression to the evaluator.
  if (!validateOptions(options)) {
    throw new IllegalArgumentException("Invalid iterator options; the '" + QUERY_OPTION + "' option is required");
  }
  event = new EventFields();
  this.comparator = getKeyComparator();
  this.iterator = source;
  try {
    // Replace any expressions that we should not evaluate.
    if (null != this.skipExpressions) {
      for (String skip : this.skipExpressions) {
        this.expression = neutralizeExpression(this.expression, skip);
      }
    }
    this.evaluator = new QueryEvaluator(this.expression);
  } catch (ParseException e) {
    throw new IllegalArgumentException("Failed to parse query", e);
  }
  EventFields.initializeKryo(kryo);
}

/**
 * Replaces every literal occurrence of {@code skip} (form: field&lt;sp&gt;operator&lt;sp&gt;literal) in {@code query} with {@code field == null}.
 * Blank entries leave the query untouched.
 */
private static String neutralizeExpression(String query, String skip) {
  String clause = skip.trim();
  if (clause.length() == 0) {
    return query;
  }
  int firstSpace = clause.indexOf(' ');
  if (firstSpace <= 0) {
    throw new IllegalArgumentException("Unevaluated expression must have the form 'field operator literal': " + skip);
  }
  // The field name is everything before the first space.
  String field = clause.substring(0, firstSpace);
  // String.replace matches literally; replaceAll would interpret the expression as a regular expression and the replacement as a template.
  return query.replace(clause, field + " == null");
}
/**
 * Describes the options this iterator understands.
 *
 * @return a description named after the concrete class, listing the query option and the unevaluated-expressions option
 */
public IteratorOptions describeOptions() {
  Map<String,String> namedOptions = new HashMap<String,String>();
  namedOptions.put(QUERY_OPTION, "query expression");
  namedOptions.put(UNEVALUTED_EXPRESSIONS, "comma separated list of expressions to skip");

  String iteratorName = getClass().getSimpleName();
  return new IteratorOptions(iteratorName, "evaluates event objects against an expression", namedOptions, null);
}
/**
 * Validates the options and, as a side effect relied upon by init(), stores the query expression and the set of expressions that must not be evaluated.
 *
 * @param options
 *          the iterator options
 * @return false when QUERY_OPTION is absent or mapped to null (no state is changed in that case), true otherwise
 */
public boolean validateOptions(Map<String,String> options) {
  String query = options.get(QUERY_OPTION);
  // A key that is present but mapped to null is as unusable as a missing key.
  if (query == null) {
    return false;
  }
  this.expression = query;

  // Build the set from scratch so that a previous call cannot leave stale entries behind.
  Set<String> skips = null;
  String expressionList = options.get(UNEVALUTED_EXPRESSIONS);
  if (expressionList != null && !expressionList.trim().equals("")) {
    skips = new HashSet<String>();
    for (String e : expressionList.split(",")) {
      // "a == 1, b == 2" yields " b == 2"; surrounding whitespace is not part of the expression.
      String trimmed = e.trim();
      if (trimmed.length() > 0) {
        skips.add(trimmed);
      }
    }
  }
  this.skipExpressions = skips;
  return true;
}
/**
 * Returns the query expression currently held by this iterator. After init() this is the expression with any unevaluated expressions already rewritten.
 *
 * @return the query expression, or null if none has been set
 */
public String getQueryExpression() {
  return expression;
}
}