/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.hdht;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.common.util.Slice;
import com.datatorrent.contrib.hdht.HDHTFileAccess.HDSFileReader;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
/**
 * Reader for the historical data store.
 * Implements asynchronous reads from backing files and query refresh.
*
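 * <p>
 * Minimal subclass sketch (illustrative only; {@code MyReader} and its
 * {@code query} method are examples, not part of the API; a concrete
 * {@link HDHTFileAccess} implementation must be set via {@link #setFileStore}):
 * <pre>{@code
 * public class MyReader extends HDHTReader
 * {
 *   @Override
 *   protected void emitQueryResult(HDSQuery query)
 *   {
 *     // deliver query.result, e.g. emit it on an output port
 *   }
 *
 *   public void query(long bucketKey, byte[] keyBytes)
 *   {
 *     HDSQuery q = new HDSQuery();
 *     q.bucketKey = bucketKey;
 *     q.key = new Slice(keyBytes);
 *     q.keepAliveCount = 10; // refresh the result for up to 10 windows
 *     addQuery(q);
 *   }
 * }
 * }</pre>
 *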
* @displayName HDHT Reader
* @category Input
* @tags hds, input operator
*/
public class HDHTReader implements Operator, HDHT.Reader
{
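  /** Name of the write-ahead log file within a bucket. */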
public static final String FNAME_WAL = "_WAL";
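  /** Name of the file holding the bucket meta data (file index). */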
public static final String FNAME_META = "_META";
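  /**
   * Parameters and result of an asynchronous read. {@link #result} and
   * {@link #processed} are written by the reader thread and read by the
   * operator thread, hence volatile.
   */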
public static class HDSQuery
{
public long bucketKey;
public Slice key;
public int keepAliveCount;
public volatile byte[] result;
public volatile boolean processed;
    @Override
    public String toString()
{
return "HDSQuery{" +
"bucketKey=" + bucketKey +
", key=" + key +
", keepAliveCount=" + keepAliveCount +
", result=" + Arrays.toString(result) +
", processed=" + processed +
'}';
}
}
private static final Logger LOG = LoggerFactory.getLogger(HDHTReader.class);
protected final transient Kryo kryo = new Kryo();
@NotNull
protected Comparator<Slice> keyComparator = new DefaultKeyComparator();
@Valid
@NotNull
protected HDHTFileAccess store;
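  /**
   * Load the meta data (file index) for the given bucket from the store.
   * Returns an empty meta data instance when none has been persisted yet.
   */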
  public BucketMeta loadBucketMeta(long bucketKey)
  {
    BucketMeta bucketMeta = null;
    InputStream is = null;
    try {
      is = store.getInputStream(bucketKey, FNAME_META);
      bucketMeta = (BucketMeta)kryo.readClassAndObject(new Input(is));
    } catch (IOException e) {
      // no meta data persisted yet, start with an empty index
      bucketMeta = new BucketMeta(keyComparator);
    } finally {
      IOUtils.closeQuietly(is);
    }
    return bucketMeta;
  }
/**
* Map containing all current queries. Accessed by operator and reader threads.
*/
protected final ConcurrentMap<Slice, HDSQuery> queries = Maps.newConcurrentMap();
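  /** Cached per-bucket file index and open file readers. */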
private final transient Map<Long, BucketReader> buckets = Maps.newHashMap();
@VisibleForTesting
protected transient ExecutorService queryExecutor;
private volatile transient Exception executorError;
public HDHTReader()
{
}
/**
 * Comparator used to order keys within a bucket, the secondary level of organization.
 * In most cases it is implemented with a time stamp as the leading key component.
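 *
 * <p>
 * Illustrative sketch of such a comparator (it assumes, for the sake of the
 * example, that every key starts with an 8 byte big-endian, non-negative time
 * stamp; this layout is an assumption, not a requirement of this class):
 * <pre>{@code
 * Comparator<Slice> timeFirst = new Comparator<Slice>()
 * {
 *   @Override
 *   public int compare(Slice o1, Slice o2)
 *   {
 *     // order by the leading 8 time stamp bytes, then by the full key on ties
 *     int c = WritableComparator.compareBytes(o1.buffer, o1.offset, 8, o2.buffer, o2.offset, 8);
 *     return (c != 0) ? c : new DefaultKeyComparator().compare(o1, o2);
 *   }
 * };
 * }</pre>
 *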
* @return The key comparator.
*/
public Comparator<Slice> getKeyComparator()
{
return keyComparator;
}
public void setKeyComparator(Comparator<Slice> keyComparator)
{
this.keyComparator = keyComparator;
}
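  /**
   * File system abstraction used to access the meta data and data files.
   */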
public HDHTFileAccess getFileStore()
{
return store;
}
public void setFileStore(HDHTFileAccess fileStore)
{
this.store = fileStore;
}
@Override
public void setup(OperatorContext context)
{
this.store.init();
if (queryExecutor == null) {
queryExecutor = Executors.newSingleThreadScheduledExecutor(new NameableThreadFactory(this.getClass().getSimpleName()+"-Reader"));
}
}
@Override
public void teardown()
{
for (BucketReader bucket : this.buckets.values()) {
IOUtils.closeQuietly(bucket);
}
IOUtils.closeQuietly(store);
queryExecutor.shutdown();
}
@Override
public void beginWindow(long windowId)
{
}
@Override
public void endWindow()
{
Iterator<Map.Entry<Slice, HDSQuery>> it = this.queries.entrySet().iterator();
while (it.hasNext()) {
HDSQuery query = it.next().getValue();
if (!query.processed) {
processQuery(query);
}
// could be processed directly
if (query.processed) {
emitQueryResult(query);
if (--query.keepAliveCount < 0) {
//LOG.debug("Removing expired query {}", query);
it.remove(); // query expired
}
}
}
if (executorError != null) {
throw new RuntimeException("Error processing queries.", this.executorError);
}
}
  /**
   * Fetch the result for the given query from persistent storage.
   * Subclasses can override this to serve the result from a write cache.
   */
protected void processQuery(final HDSQuery query)
{
Runnable readerRunnable = new Runnable() {
@Override
public void run()
{
try {
LOG.debug("Processing {}", query);
query.result = get(query.bucketKey, query.key);
query.processed = true;
} catch (Exception e) {
executorError = e;
}
}
};
this.queryExecutor.execute(readerRunnable);
}
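  /**
   * Get or create the reader for the given bucket. Reloads the bucket meta
   * data (file index) when it was invalidated by a write.
   */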
protected BucketReader getReader(long bucketKey)
{
BucketReader br = this.buckets.get(bucketKey);
if (br == null) {
this.buckets.put(bucketKey, br = new BucketReader());
}
// meta data can be invalidated on write without removing unaffected readers
if (br.bucketMeta == null) {
LOG.debug("Reading {} {}", bucketKey, FNAME_META);
br.bucketMeta = loadBucketMeta(bucketKey);
}
return br;
}
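  /**
   * Invalidate the cached meta data of the bucket and close the readers for
   * the given files, leaving readers for unaffected files open.
   */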
protected void invalidateReader(long bucketKey, Set<String> fileNames)
{
BucketReader bucket = this.buckets.get(bucketKey);
if (bucket != null) {
bucket.bucketMeta = null; // force index reload
for (String name : fileNames) {
LOG.debug("Closing reader {}", name);
IOUtils.closeQuietly(bucket.readers.remove(name));
}
}
}
  // scratch slice that receives the key from next() when only the value is needed
  private static final Slice GET_KEY = new Slice(null, 0, 0);
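  /**
   * Synchronous read of the value for the given key, served from the data
   * files. Retries a bounded number of times when the bucket meta data was
   * invalidated by a concurrent write; returns null when the key is absent.
   */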
@Override
public byte[] get(long bucketKey, Slice key) throws IOException
{
    for (int i = 0; i < 10; i++) {
BucketReader bucket = getReader(bucketKey);
BucketMeta bucketMeta = bucket.bucketMeta;
if (bucketMeta == null) {
// meta data invalidated
continue;
}
      Map.Entry<Slice, BucketFileMeta> floorEntry = bucketMeta.files.floorEntry(key);
if (floorEntry == null) {
// no file for this key
return null;
}
try {
HDSFileReader reader = bucket.readers.get(floorEntry.getValue().name);
if (reader == null) {
LOG.debug("Opening file {} {}", bucketKey, floorEntry.getValue().name);
bucket.readers.put(floorEntry.getValue().name, reader = store.getReader(bucketKey, floorEntry.getValue().name));
}
        Slice value = new Slice(null, 0, 0);
        if (reader.seek(key)) {
          reader.next(GET_KEY, value);
        }
        if (value.buffer == null) {
          // key not found in file
          return null;
        }
        if (value.offset == 0 && value.length == value.buffer.length) {
          return value.buffer;
        }
        // this is inefficient, should return Slice
        return Arrays.copyOfRange(value.buffer, value.offset, value.offset + value.length);
} catch (IOException e) {
// check for meta file update
this.buckets.remove(bucketKey);
bucket.close();
bucket = getReader(bucketKey);
Map.Entry<Slice, BucketFileMeta> newEntry = bucket.bucketMeta.files.floorEntry(key);
        if (newEntry != null && newEntry.getValue().name.equals(floorEntry.getValue().name)) {
// file still the same - error unrelated to rewrite
throw e;
}
// retry
LOG.debug("Retry after meta data change bucket {} from {} to {}", bucketKey, floorEntry, newEntry);
continue;
}
}
return null;
}
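  /**
   * Register a query for asynchronous processing. If a query for the same key
   * is already pending, the larger of the two keep-alive counts is retained.
   */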
protected void addQuery(HDSQuery query)
{
HDSQuery existingQuery = this.queries.get(query.key);
if (existingQuery != null) {
query.keepAliveCount = Math.max(query.keepAliveCount, existingQuery.keepAliveCount);
}
this.queries.put(query.key, query);
}
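  /**
   * Emit the result of a processed query. No-op by default; subclasses
   * override this to deliver results, typically on an output port.
   */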
protected void emitQueryResult(HDSQuery query)
{
}
/**
* Default key comparator that performs lexicographical comparison of the byte arrays.
*/
public static class DefaultKeyComparator implements Comparator<Slice>
{
@Override
public int compare(Slice o1, Slice o2)
{
return WritableComparator.compareBytes(o1.buffer, o1.offset, o1.length, o2.buffer, o2.offset, o2.length);
}
}
public static class BucketFileMeta
{
/**
* Name of file (relative to bucket)
*/
public String name;
/**
* Lower bound sequence key
*/
public Slice startKey;
@Override
public String toString()
{
return "BucketFileMeta [name=" + name + ", startKey=" + startKey + "]";
}
}
  /**
   * Meta data about a bucket, persisted in the store.
   * Flushed on compaction.
   */
public static class BucketMeta
{
protected BucketMeta(Comparator<Slice> cmp)
{
files = new TreeMap<Slice, BucketFileMeta>(cmp);
}
@SuppressWarnings("unused")
private BucketMeta()
{
// for serialization only
files = null;
}
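    /**
     * Create and register the meta data for a new data file starting at the
     * given key.
     */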
protected BucketFileMeta addFile(long bucketKey, Slice startKey)
{
BucketFileMeta bfm = new BucketFileMeta();
bfm.name = Long.toString(bucketKey) + '-' + this.fileSeq++;
if (startKey.length != startKey.buffer.length) {
// normalize key for serialization
startKey = new Slice(startKey.toByteArray());
}
bfm.startKey = startKey;
files.put(startKey, bfm);
return bfm;
}
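    /** Sequence number used to generate data file names within the bucket. */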
int fileSeq;
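    /** Id of the last window whose writes have been committed to the store. */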
long committedWid;
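    /** Data file index: start key of each file mapped to its meta data. */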
final TreeMap<Slice, BucketFileMeta> files;
}
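  /**
   * Per-bucket cache of the file index and the open data file readers.
   */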
private static class BucketReader implements Closeable
{
BucketMeta bucketMeta;
final HashMap<String, HDSFileReader> readers = Maps.newHashMap();
@Override
public void close() throws IOException
{
for (HDSFileReader reader : readers.values()) {
reader.close();
}
}
}
}