blob: 611ee5c9f854f8dd726cce226617ba15d54a7ff5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.state.managed;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.fileaccess.FileAccess;
import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.datatorrent.api.Context;
import com.datatorrent.netlet.util.Slice;
/**
* In this implementation of {@link AbstractManagedStateImpl} the buckets in memory are time-buckets.
*
* @since 3.4.0
*/
public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implements BucketedState
{
  //time buckets purged by the assigner; drained every endWindow to release their in-memory state
  private final transient LinkedBlockingQueue<Long> purgedTimeBuckets = Queues.newLinkedBlockingQueue();
  //buckets evicted from the active map but not yet torn down (may still have async reads in flight)
  private final transient Set<Bucket> bucketsForTeardown = Sets.newHashSet();

  public ManagedTimeUnifiedStateImpl()
  {
    //files are grouped per operator id instead of per bucket id; see TimeUnifiedBucketsFileSystem
    bucketsFileSystem = new TimeUnifiedBucketsFileSystem();
  }

  @Override
  public long getNumBuckets()
  {
    //buckets in memory are time buckets, so the assigner is the authority on the count
    return timeBucketAssigner.getNumBuckets();
  }

  /**
   * Saves the key/value pair against the time bucket derived from {@code time}.
   * A put for an already-expired time is silently dropped by the assigner (it returns -1
   * and {@code putInBucket} is still invoked with that bucket — NOTE(review): presumably
   * the superclass ignores bucket -1; confirm against {@link AbstractManagedStateImpl}).
   *
   * @param time  event time used to derive the time bucket
   * @param key   state key; must not be null
   * @param value state value; must not be null
   */
  @Override
  public void put(long time, @NotNull Slice key, @NotNull Slice value)
  {
    long timeBucket = timeBucketAssigner.getTimeBucket(time);
    //bucket id and time bucket coincide in this implementation
    putInBucket(timeBucket, timeBucket, key, value);
  }

  /**
   * Synchronously fetches the value of {@code key} from the time bucket derived from {@code time}.
   *
   * @param time event time used to derive the time bucket
   * @param key  state key; must not be null
   * @return the value, or {@link BucketedState#EXPIRED} when {@code time} maps to a purged bucket
   */
  @Override
  public Slice getSync(long time, @NotNull Slice key)
  {
    long timeBucket = timeBucketAssigner.getTimeBucket(time);
    if (timeBucket == -1) {
      //time is expired so return expired slice.
      return BucketedState.EXPIRED;
    }
    return getValueFromBucketSync(timeBucket, timeBucket, key);
  }

  /**
   * Asynchronously fetches the value of {@code key} from the time bucket derived from {@code time}.
   *
   * @param time event time used to derive the time bucket
   * @param key  state key; must not be null
   * @return a future holding the value, or an immediate {@link BucketedState#EXPIRED} future
   *         when {@code time} maps to a purged bucket
   */
  @Override
  public Future<Slice> getAsync(long time, @NotNull Slice key)
  {
    long timeBucket = timeBucketAssigner.getTimeBucket(time);
    if (timeBucket == -1) {
      //time is expired so return expired slice.
      return Futures.immediateFuture(BucketedState.EXPIRED);
    }
    return getValueFromBucketAsync(timeBucket, timeBucket, key);
  }

  @Override
  public void endWindow()
  {
    super.endWindow();
    Long purgedTimeBucket;
    //collect all the purged time buckets and evict their in-memory state from the active map
    while (null != (purgedTimeBucket = purgedTimeBuckets.poll())) {
      long purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
      //single lookup instead of containsKey + two gets; null also covers an absent entry
      Bucket purgedBucket = buckets.get(purgedTimeBucketIdx);
      if (purgedBucket != null && purgedBucket.getBucketId() == purgedTimeBucket) {
        bucketsForTeardown.add(purgedBucket);
        buckets.remove(purgedTimeBucketIdx);
      }
    }
    //tear down every evicted bucket that has no pending asynchronous queries
    Iterator<Bucket> bucketIterator = bucketsForTeardown.iterator();
    while (bucketIterator.hasNext()) {
      Bucket bucket = bucketIterator.next();
      if (!tasksPerBucketId.containsKey(bucket.getBucketId())) {
        //no pending asynchronous queries for this bucket id
        bucket.teardown();
        bucketIterator.remove();
      }
    }
  }

  @Override
  protected void handleBucketConflict(long bucketId, long newBucketId)
  {
    Preconditions.checkArgument(buckets.get(bucketId).getBucketId() < newBucketId, "new time bucket should have a value"
        + " greater than the old time bucket");
    //Time buckets are purged periodically so here a bucket conflict is expected: the older
    //time bucket is scheduled for teardown and replaced by the newer one in the same slot.
    bucketsForTeardown.add(buckets.get(bucketId));
    buckets.put(bucketId, newBucket(newBucketId));
    buckets.get(bucketId).setup(this);
  }

  @Override
  public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
  {
    //remember the purge so endWindow can evict the corresponding in-memory bucket
    purgedTimeBuckets.add(timeBucket);
    super.purgeTimeBucketsLessThanEqualTo(timeBucket);
  }

  @Override
  public void setup(Context.OperatorContext context)
  {
    //default to an unbounded assigner when the user has not configured one
    if (timeBucketAssigner == null) {
      UnboundedTimeBucketAssigner unboundedTimeBucketAssigner = new UnboundedTimeBucketAssigner();
      setTimeBucketAssigner(unboundedTimeBucketAssigner);
    }
    super.setup(context);
  }

  /**
   * This uses operator id instead of bucket id as the name of parent folder of time-buckets. This is because
   * multiple partitions may work on same time-buckets. Every override below therefore ignores the
   * {@code bucketId} argument and routes to the operator-id directory.
   */
  private static class TimeUnifiedBucketsFileSystem extends BucketsFileSystem
  {
    @Override
    protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().getWriter(managedStateContext.getOperatorContext().getId(), fileName);
    }

    @Override
    protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().getReader(managedStateContext.getOperatorContext().getId(), fileName);
    }

    @Override
    protected void rename(long bucketId, String fromName, String toName) throws IOException
    {
      managedStateContext.getFileAccess().rename(managedStateContext.getOperatorContext().getId(), fromName, toName);
    }

    @Override
    protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().getOutputStream(managedStateContext.getOperatorContext().getId(),
          fileName);
    }

    @Override
    protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().getInputStream(managedStateContext.getOperatorContext().getId(),
          fileName);
    }

    @Override
    protected boolean exists(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().exists(managedStateContext.getOperatorContext().getId(),
          fileName);
    }

    @Override
    protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
    {
      return managedStateContext.getFileAccess().listFiles(managedStateContext.getOperatorContext().getId());
    }

    @Override
    protected void delete(long bucketId, String fileName) throws IOException
    {
      managedStateContext.getFileAccess().delete(managedStateContext.getOperatorContext().getId(), fileName);
    }

    @Override
    protected void deleteBucket(long bucketId) throws IOException
    {
      managedStateContext.getFileAccess().deleteBucket(managedStateContext.getOperatorContext().getId());
    }

    @Override
    protected void addBucketName(long bucketId)
    {
      long operatorId = managedStateContext.getOperatorContext().getId();
      //Set.add is idempotent, so no need for a separate contains() check
      bucketNamesOnFS.add(operatorId);
    }
  }

  //final per SLF4J convention; transient dropped because it is meaningless on a static field
  private static final Logger LOG = LoggerFactory.getLogger(ManagedTimeUnifiedStateImpl.class);
}