blob: 4b621d1049b46aca3bdca0fa915dcf72ef597078 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.lib.utils.serde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.netlet.util.Slice;
/**
*
* keep the information of one block
*
*
* @since 3.6.0
*/
public class Block
{
public static class OutOfBlockBufferMemoryException extends RuntimeException
{
private static final long serialVersionUID = 3813792889200989131L;
}
private static final Logger logger = LoggerFactory.getLogger(Block.class);
public static final int DEFAULT_BLOCK_SIZE = 100000;
//the capacity of the block
private int capacity;
/*
* the size of the data.
*/
private volatile int size;
private int objectBeginOffset = 0;
private byte[] buffer;
/**
* whether any slices have been exposed to the caller.
*/
private boolean exposedSlices;
private Block()
{
this(DEFAULT_BLOCK_SIZE);
}
public Block(int capacity)
{
if (capacity <= 0) {
throw new IllegalArgumentException("Invalid capacity: " + capacity);
}
buffer = new byte[capacity];
this.capacity = capacity;
}
public void write(byte data)
{
checkOrReallocateBuffer(1);
buffer[size++] = data;
}
public void write(byte[] data)
{
write(data, 0, data.length);
}
public void write(byte[] data, final int offset, final int length)
{
checkOrReallocateBuffer(length);
System.arraycopy(data, offset, buffer, size, length);
size += length;
}
/**
* check the buffer size and reallocate if buffer is not enough
*
* @param length
*/
private void checkOrReallocateBuffer(int length) throws OutOfBlockBufferMemoryException
{
if (size + length <= capacity) {
return;
}
if (exposedSlices) {
throw new OutOfBlockBufferMemoryException();
}
//calculate the new capacity
capacity = (size + length) * 2;
byte[] oldBuffer = buffer;
buffer = new byte[capacity];
/**
* no slices are exposed in this block yet (this is the first object in this block).
* so we can reallocate and move the memory
*/
if (size > 0) {
System.arraycopy(oldBuffer, 0, buffer, 0, size);
}
}
/**
* Similar to toSlice, this method is used to get the information of the
* object regards the data already write to buffer. But unlike toSlice() which
* indicates all the writes of this object are already done, this method can be called at
* any time
*/
public Slice getLastObjectSlice()
{
return new Slice(buffer, objectBeginOffset, size - objectBeginOffset);
}
public void discardLastObjectData()
{
if (objectBeginOffset == 0) {
return;
}
size = objectBeginOffset;
}
public void moveLastObjectDataTo(Block newBlock)
{
if (size > objectBeginOffset) {
newBlock.write(buffer, objectBeginOffset, size - objectBeginOffset);
discardLastObjectData();
}
}
/**
* This method returns the slice that represents the serialized form.
* The process of serializing an object should be one or multiple calls of write() followed by a toSlice() call.
* A call to toSlice indicates the writes are done for this object
*
* @return
*/
public BufferSlice toSlice()
{
if (size == objectBeginOffset) {
throw new RuntimeException("data size is zero.");
}
BufferSlice slice = new BufferSlice(buffer, objectBeginOffset, size - objectBeginOffset);
//prepare for next object
objectBeginOffset = size;
exposedSlices = true;
return slice;
}
public void reset()
{
size = 0;
objectBeginOffset = 0;
exposedSlices = false;
}
/**
* check if the block has enough space for the length
*
* @param length
* @return
*/
public boolean hasEnoughSpace(int length)
{
return size + length < capacity;
}
public long size()
{
return size;
}
public long capacity()
{
return capacity;
}
public boolean isFresh()
{
return (size == 0 && objectBeginOffset == 0 && exposedSlices == false);
}
/**
* Returns whether the block is clear. The block is clear when there has not been any write calls since the last toSlice() call.
*
* @return
*/
public boolean isClear()
{
return objectBeginOffset == size;
}
public void release()
{
reset();
buffer = null;
}
}