blob: 883e23d5cf3f7cf253f576a4fb9e47a7a4864f33 [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.demos.dimensions.benchmark;
import com.datatorrent.common.util.Slice;
import com.datatorrent.contrib.hdht.AbstractSinglePortHDHTWriter;
import com.datatorrent.contrib.hdht.MutableKeyValue;
import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
import java.io.IOException;
import java.util.Arrays;
/**
* HDSOperator
*
* @since 2.0.0
*/
public class HDSOperator extends AbstractSinglePortHDHTWriter<MutableKeyValue>
{
public boolean isReadModifyWriteMode()
{
return readModifyWriteMode;
}
public void setReadModifyWriteMode(boolean readModifyWriteMode)
{
this.readModifyWriteMode = readModifyWriteMode;
}
private boolean readModifyWriteMode = false;
public static class MutableKeyValCodec extends KryoSerializableStreamCodec<MutableKeyValue> implements HDHTCodec<MutableKeyValue>
{
@Override public byte[] getKeyBytes(MutableKeyValue mutableKeyValue)
{
return mutableKeyValue.getKey();
}
@Override public byte[] getValueBytes(MutableKeyValue mutableKeyValue)
{
return mutableKeyValue.getValue();
}
@Override public MutableKeyValue fromKeyValue(Slice key, byte[] value)
{
MutableKeyValue pair = new MutableKeyValue(null, null);
pair.setKey(key.buffer);
pair.setValue(value);
return pair;
}
@Override public int getPartition(MutableKeyValue tuple)
{
return Arrays.hashCode(tuple.getKey());
}
}
@Override protected HDHTCodec<MutableKeyValue> getCodec()
{
return new MutableKeyValCodec();
}
@Override protected void processEvent(MutableKeyValue event) throws IOException
{
if (readModifyWriteMode) {
// do get and then put to simulate read-modify-write workload.
byte[] oldval = super.get(getBucketKey(event), new Slice(event.getKey()));
if (oldval != null) {
// Modify event.
byte[] newval = event.getValue();
for (int i = 0; i < newval.length; i++)
if (i < newval.length)
oldval[i] += newval[i];
event.setValue(oldval);
}
}
super.processEvent(event);
}
}