blob: d51023c2d9d8a3c4725e07b1562f5b69c7ae7d12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.examples.wikisearch.iterator;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.accumulo.examples.wikisearch.parser.EventFields;
import org.apache.accumulo.examples.wikisearch.parser.EventFields.FieldValue;
import org.apache.commons.collections.map.LRUMap;
import org.apache.hadoop.io.Text;
public class EvaluatingIterator extends AbstractEvaluatingIterator {
public static final String NULL_BYTE_STRING = "\u0000";
LRUMap visibilityMap = new LRUMap();
public EvaluatingIterator() {
super();
}
public EvaluatingIterator(AbstractEvaluatingIterator other, IteratorEnvironment env) {
super(other, env);
}
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
return new EvaluatingIterator(this, env);
}
@Override
public PartialKey getKeyComparator() {
return PartialKey.ROW_COLFAM;
}
@Override
public Key getReturnKey(Key k) {
// If we were using column visibility, then we would get the merged visibility here and use it in the key.
// Remove the COLQ from the key and use the combined visibility
Key r = new Key(k.getRowData().getBackingArray(), k.getColumnFamilyData().getBackingArray(), NULL_BYTE, k.getColumnVisibility().getBytes(),
k.getTimestamp(), k.isDeleted(), false);
return r;
}
@Override
public void fillMap(EventFields event, Key key, Value value) {
// If we were using column visibility, we would have to merge them here.
// Pull the datatype from the colf in case we need to do anything datatype specific.
// String colf = key.getColumnFamily().toString();
// String datatype = colf.substring(0, colf.indexOf(NULL_BYTE_STRING));
// For the partitioned table, the field name and field value are stored in the column qualifier
// separated by a \0.
String colq = key.getColumnQualifier().toString();// .toLowerCase();
int idx = colq.indexOf(NULL_BYTE_STRING);
String fieldName = colq.substring(0, idx);
String fieldValue = colq.substring(idx + 1);
event.put(fieldName, new FieldValue(getColumnVisibility(key), fieldValue.getBytes()));
}
/**
* @param key
* @return
*/
public ColumnVisibility getColumnVisibility(Key key) {
ColumnVisibility result = (ColumnVisibility) visibilityMap.get(key.getColumnVisibility());
if (result != null)
return result;
result = new ColumnVisibility(key.getColumnVisibility().getBytes());
visibilityMap.put(key.getColumnVisibility(), result);
return result;
}
/**
* Don't accept this key if the colf starts with 'fi'
*/
@Override
public boolean isKeyAccepted(Key key) throws IOException {
if (key.getColumnFamily().toString().startsWith("fi")) {
Key copy = new Key(key.getRow(), new Text("fi\01"));
Collection<ByteSequence> columnFamilies = Collections.emptyList();
this.iterator.seek(new Range(copy, copy), columnFamilies, true);
if (this.iterator.hasTop())
return isKeyAccepted(this.iterator.getTopKey());
return true;
}
return true;
}
}