blob: 5a9368f5ce6eb023c6615d2d54691d6b4d4ebfc3 [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.bucket;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import com.google.common.collect.Maps;
/**
* <p>
* The bucket data-structure contains all the events which belong to the same bucket.
* </p>
* <p>
* Events in a bucket are divided in 2 sections: <br/>
* <ul>
* <li> Written:
* Known events which are loaded from a persistent store {@link BucketStore}
* </li>
* <li> Un-written: New events which are not persisted yet.</li>
* </ul>
* A bucket differentiates between these events by keeping them in 2 separate collections.
* </p>
* <p>
* Buckets can be modified only by {@link BucketManager}.
* Operators that work with buckets can only query it but not modify it.
* </p>
*
* @param <T> type of bucket events
* @since 0.9.4
*/
public class Bucket<T extends Bucketable>
{
public final long bucketKey;
private Map<Object, T> unwrittenEvents;
private transient Map<Object, T> writtenEvents;
private transient long lastUpdateTime;
private transient boolean isDataOnDiskLoaded;
@SuppressWarnings("unused")
private Bucket()
{
bucketKey = -1L;
}
Bucket(long bucketKey)
{
this.bucketKey = bucketKey;
this.isDataOnDiskLoaded = false;
this.lastUpdateTime = System.currentTimeMillis();
}
void setWrittenEvents(@Nonnull Map<Object, T> writtenEvents)
{
this.writtenEvents = writtenEvents;
isDataOnDiskLoaded = true;
}
void setUnwrittenEvents(@Nonnull Map<Object, T> unwrittenEvents)
{
this.unwrittenEvents = unwrittenEvents;
}
void transferDataFromMemoryToStore()
{
if (writtenEvents == null) {
writtenEvents = unwrittenEvents;
}
else {
writtenEvents.putAll(unwrittenEvents);
}
unwrittenEvents = null;
}
void updateAccessTime()
{
lastUpdateTime = System.currentTimeMillis();
}
void addNewEvent(Object eventKey, T event)
{
if (unwrittenEvents == null) {
unwrittenEvents = Maps.newHashMap();
}
unwrittenEvents.put(eventKey, event);
}
Map<Object, T> getWrittenEvents()
{
return Collections.unmodifiableMap(writtenEvents);
}
Map<Object, T> getUnwrittenEvents()
{
return Collections.unmodifiableMap(unwrittenEvents);
}
long lastUpdateTime()
{
return lastUpdateTime;
}
/**
* Given an event key, fetches the event from written section of the bucket.
*
* @param key event key
* @return event corresponding to the event key if it is present in the written portion; null otherwise.
*/
@Nullable
public T getValueFromWrittenPart(Object key)
{
if (writtenEvents == null) {
return null;
}
return writtenEvents.get(key);
}
/**
* Given an event key, fetches the event from un-written section of bucket.
*
* @param key event key
* @return event corresponding to the event key if it is present in the unwritten portion; null otherwise.
*/
@Nullable
public T getValueFromUnwrittenPart(Object key)
{
if (unwrittenEvents == null) {
return null;
}
return unwrittenEvents.get(key);
}
/**
* Calculates the number of persisted events.
*
* @return number of events which are have been written to a persistent store.
*/
public int countOfWrittenEvents()
{
if (writtenEvents == null) {
return 0;
}
return writtenEvents.size();
}
/**
* Calculates the number of events which are not persisted yet.
*
* @return number of events which have not been written to a persistent store.
*/
public int countOfUnwrittenEvents()
{
if (unwrittenEvents == null) {
return 0;
}
return unwrittenEvents.size();
}
/**
* Returns whether the bucket data persisted on the disk is loaded.
*
* @return true if persisted data is loaded; false otherwise.
*/
public boolean isDataOnDiskLoaded()
{
return isDataOnDiskLoaded;
}
/**
* Finds whether the bucket contains the event.
*
* @param event the {@link Bucketable} to search for in the bucket.
* @return true if bucket has the event; false otherwise.
*/
public boolean containsEvent(T event)
{
if (unwrittenEvents != null && unwrittenEvents.containsKey(event.getEventKey())) {
return true;
}
return writtenEvents != null && writtenEvents.containsKey(event.getEventKey());
}
@Override
public String toString()
{
return "Bucket {" + bucketKey + "}";
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof Bucket)) {
return false;
}
return bucketKey == ((Bucket<?>)o).bucketKey;
}
@Override
public int hashCode()
{
return (int)(bucketKey ^ (bucketKey >>> 32));
}
}