blob: ff5b2e6551e5e41c918c076ad03927f7934ee88c [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.bucket;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang.mutable.MutableLong;
import com.datatorrent.lib.counters.BasicCounters;
/**
 * Creates and manages {@link Bucket}s for an operator.
 *
 * <p>Only a limited number of buckets can be held in memory at a particular time. The manager is responsible for
 * loading a bucket when the operator requests it and for off-loading the least recently used buckets to optimize
 * memory usage.
 *
 * <p>Loading of buckets and saving of new bucket events to the store is triggered by the operator. Typically an
 * operator would:
 * <ol>
 * <li>Fetch the bucket key of an event by calling {@link #getBucketKeyFor(Bucketable)}.</li>
 * <li>Invoke {@link #getBucket(long)}. If this returns {@code null}, or the events from disk are not loaded yet, the
 * operator can ask the manager to load the bucket by calling {@link #loadBucketData(long)}. This is not a blocking
 * call.</li>
 * <li>Once the manager loads a bucket, it informs the {@link Listener} by calling
 * {@link Listener#bucketLoaded(Bucket)}. If some buckets were off-loaded during the process, the
 * {@link Listener#bucketOffLoaded(long)} callback is triggered.</li>
 * <li>Add new events to a bucket by invoking {@link #newEvent(long, Bucketable)}. These events are maintained in a
 * check-pointed state.</li>
 * <li>To ensure that all the load requests are completed at a given point in time, trigger
 * {@link #blockUntilAllRequestsServiced()}.</li>
 * <li>Trigger {@link #endWindow(long)}, which tells the manager to persist un-written data.</li>
 * </ol>
 *
 * @param <T> event type
 * @since 0.9.4
 */
public interface BucketManager<T extends Bucketable>
{
  /**
   * Sets the {@link BucketStore} from which bucket events are loaded and to which un-written events are persisted.
   *
   * @param bucketStore the bucket store; must not be {@code null}.
   */
  void setBucketStore(@Nonnull BucketStore<T> bucketStore);

  /**
   * Returns the {@link BucketStore} used by this manager.
   *
   * @return the bucket store.
   */
  BucketStore<T> getBucketStore();

  /**
   * Starts the service.
   *
   * @param listener {@link Listener} which will be informed when buckets are loaded and off-loaded.
   */
  void startService(Listener<T> listener);

  /**
   * Shuts down the service.
   */
  void shutdownService();

  /**
   * Calculates the bucket key of an event.
   * A negative value indicates an invalid event.
   *
   * @param event event
   * @return bucket key for the event; negative if the event is invalid.
   */
  long getBucketKeyFor(T event);

  /**
   * Returns the bucket in memory corresponding to a bucket key, or {@code null} if the bucket has not been created
   * yet. A bucket is created:
   * <ul>
   * <li>when the operator requests to load the bucket from the store; or</li>
   * <li>in {@link #startService(Listener)}, if there were un-written bucket events which were check-pointed by the
   * engine.</li>
   * </ul>
   *
   * @param bucketKey key of the bucket.
   * @return the bucket; {@code null} if the bucket is not yet created.
   */
  @Nullable
  Bucket<T> getBucket(long bucketKey);

  /**
   * Loads the events belonging to the bucket from the store.
   * The events loaded belong to the windows earlier than the current window.
   *
   * <p>This is not a blocking call; the {@link Listener} is informed through
   * {@link Listener#bucketLoaded(Bucket)} once the bucket is loaded.
   *
   * @param bucketKey key of the bucket.
   */
  void loadBucketData(long bucketKey);

  /**
   * Adds the event to the un-written section of the bucket corresponding to the bucket key.
   *
   * @param bucketKey key of the bucket.
   * @param event     new event.
   */
  void newEvent(long bucketKey, T event);

  /**
   * Performs end-of-window operations, which include tracking the committed window and
   * persisting all un-written events in the store.
   *
   * @param window window number.
   */
  void endWindow(long window);

  /**
   * Blocks the calling thread until all the load requests of this window have been serviced.
   *
   * @throws InterruptedException if the calling thread is interrupted while waiting.
   */
  void blockUntilAllRequestsServiced() throws InterruptedException;

  /**
   * Constructs a new {@link BucketManager} with only the settings and not the data.
   *
   * @return newly created manager without any data.
   */
  BucketManager<T> cloneWithProperties();

  /**
   * Sets the counters in which this manager records its statistics.
   * NOTE(review): presumably keyed by {@link CounterKeys} — confirm against implementations.
   *
   * @param stats counters to update; must not be {@code null}.
   */
  void setBucketCounters(@Nonnull BasicCounters<MutableLong> stats);

  /**
   * Collects the un-written events of all the old managers and distributes the data to the new managers.
   * The partition an event belongs to depends on the event key:
   * <pre>{@code partition = eventKey.hashCode() & partitionMask}</pre>
   *
   * @param oldManagers             {@link BucketManager}s of all old partitions.
   * @param partitionKeysToManagers mapping of partition keys to {@link BucketManager}s of new partitions.
   * @param partitionMask           partition mask used to find which partition an event belongs to.
   */
  void definePartitions(List<BucketManager<T>> oldManagers, Map<Integer, BucketManager<T>> partitionKeysToManagers,
                        int partitionMask);

  /**
   * Callback interface for {@link BucketManager} load and off-load operations.
   *
   * @param <T> type of the values which can be bucketed.
   */
  interface Listener<T extends Bucketable>
  {
    /**
     * Invoked when a bucket is loaded from the store.
     *
     * @param loadedBucket bucket that was loaded.
     */
    void bucketLoaded(Bucket<T> loadedBucket);

    /**
     * Invoked when a bucket is removed from memory.
     * If the listener has cached the bucket, it should use this callback to remove it from the cache.
     *
     * @param bucketKey key of the bucket which was off-loaded.
     */
    void bucketOffLoaded(long bucketKey);
  }

  /**
   * Keys of the counters describing the state of a {@link BucketManager}.
   */
  enum CounterKeys
  {
    BUCKETS_IN_MEMORY,
    EVICTED_BUCKETS,
    DELETED_BUCKETS,
    EVENTS_COMMITTED_LAST_WINDOW,
    EVENTS_IN_MEMORY
  }
}