blob: e3bd80b7454e047b353da4cfb067275cf333f863 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*========================================================================
*/
package com.gemstone.gemfire.cache.hdfs.internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.lru.Sizeable;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.offheap.StoredObject;
import com.gemstone.gemfire.internal.offheap.annotations.Retained;
import com.gemstone.gemfire.internal.util.BlobHelper;
/**
* Gateway event extended for HDFS functionality
*
* @author Hemant Bhanawat
*/
public class HDFSGatewayEventImpl extends GatewaySenderEventImpl {
private static final long serialVersionUID = 4642852957292192406L;
protected transient boolean keyIsSerialized = false;
protected byte[] serializedKey = null;
protected VersionTag versionTag;
public HDFSGatewayEventImpl(){
}
@Retained
public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
Object substituteValue)
throws IOException {
super(operation, event, substituteValue);
initializeHDFSGatewayEventObject(event);
}
@Retained
public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
Object substituteValue, boolean initialize, int bucketId) throws IOException {
super(operation, event,substituteValue, initialize, bucketId);
initializeHDFSGatewayEventObject(event);
}
@Retained
public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
Object substituteValue, boolean initialize) throws IOException {
super(operation, event, substituteValue, initialize);
initializeHDFSGatewayEventObject(event);
}
protected HDFSGatewayEventImpl(HDFSGatewayEventImpl offHeapEvent) {
super(offHeapEvent);
this.keyIsSerialized = offHeapEvent.keyIsSerialized;
this.serializedKey = offHeapEvent.serializedKey;
this.versionTag = offHeapEvent.versionTag;
}
@Override
protected GatewaySenderEventImpl makeCopy() {
return new HDFSGatewayEventImpl(this);
}
private void initializeHDFSGatewayEventObject(EntryEvent event)
throws IOException {
serializeKey();
versionTag = ((EntryEventImpl)event).getVersionTag();
if (versionTag != null && versionTag.getMemberID() == null) {
versionTag.setMemberID(((LocalRegion)getRegion()).getVersionMember());
}
}
private void serializeKey() throws IOException {
if (!keyIsSerialized && isInitialized())
{
this.serializedKey = CacheServerHelper.serialize(this.key);
keyIsSerialized = true;
}
}
/**MergeGemXDHDFSToGFE This function needs to enabled if similar functionality is added to gatewaysendereventimpl*/
/*@Override
protected StoredObject obtainOffHeapValueBasedOnOp(EntryEventImpl event,
boolean hasNonWanDispatcher) {
return event.getOffHeapNewValue();
}*/
/**MergeGemXDHDFSToGFE This function needs to enabled if similar functionality is added to gatewaysendereventimpl*/
/*@Override
protected Object obtainHeapValueBasedOnOp(EntryEventImpl event,
boolean hasNonWanDispatcher) {
return event.getRawNewValue(shouldApplyDelta());
}*/
@Override
protected boolean shouldApplyDelta() {
return true;
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
DataSerializer.writeObject(this.versionTag, out);
}
@Override
protected void serializeKey(DataOutput out) throws IOException {
DataSerializer.writeByteArray((byte[])this.serializedKey, out);
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
this.versionTag = (VersionTag)DataSerializer.readObject(in);
}
@Override
protected void deserializeKey(DataInput in) throws IOException,
ClassNotFoundException {
this.serializedKey = DataSerializer.readByteArray(in);
this.key = BlobHelper.deserializeBlob(this.serializedKey,
InternalDataSerializer.getVersionForDataStreamOrNull(in), null);
keyIsSerialized = true;
}
@Override
public int getDSFID() {
return HDFS_GATEWAY_EVENT_IMPL;
}
public byte[] getSerializedKey() {
return this.serializedKey;
}
public VersionTag getVersionTag() {
return this.versionTag;
}
/**
* Returns the size on HDFS of this event
* @param writeOnly
*/
public int getSizeOnHDFSInBytes(boolean writeOnly) {
if (writeOnly)
return UnsortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,
getSerializedValueSize(), this.versionTag);
else
return SortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,
getSerializedValueSize(), this.versionTag);
}
}