blob: 59e1568fc8ff4671ff35eba8e49babdbb9d5f407 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.types;
import org.apache.flink.annotation.Public;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemoryUtils;
import org.apache.flink.util.InstantiationUtil;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.Serializable;
import java.io.UTFDataFormatException;
import java.nio.ByteOrder;
/**
* The Record represents a multi-valued data record. The record is a tuple of arbitrary values. It
* implements a sparse tuple model, meaning that the record can contain many fields which are
* actually null and not represented in the record. It has internally a bitmap marking which fields
* are set and which are not.
*
* <p>For efficient data exchange, a record that is read from any source holds its data in
* serialized binary form. Fields are deserialized lazily upon first access. Modified fields are
* cached and the modifications are incorporated into the binary representation upon the next
* serialization or any explicit call to the {@link #updateBinaryRepresenation()} method.
*
* <p>IMPORTANT NOTE: Records must be used as mutable objects and be reused across user function
* calls in order to achieve performance. The record is a heavy-weight object, designed to minimize
* calls to the individual fields' serialization and deserialization methods. It holds quite a bit
* of state consumes a comparably large amount of memory (&gt; 200 bytes in a 64 bit JVM) due to
* several pointers and arrays.
*
* <p>This class is NOT thread-safe!
*/
@Public
public final class Record implements Value, CopyableValue<Record> {
private static final long serialVersionUID = 1L;
private static final int NULL_INDICATOR_OFFSET =
Integer.MIN_VALUE; // value marking a field as null
private static final int MODIFIED_INDICATOR_OFFSET =
Integer.MIN_VALUE + 1; // value marking field as modified
private static final int DEFAULT_FIELD_LEN_ESTIMATE = 8; // length estimate for bin array
// --------------------------------------------------------------------------------------------
private final InternalDeSerializer serializer =
new InternalDeSerializer(); // DataInput and DataOutput abstraction
private byte[] binaryData; // the buffer containing the binary representation
private byte[] switchBuffer; // the buffer containing the binary representation
private int[] offsets; // the offsets to the binary representations of the fields
private int[] lengths; // the lengths of the fields
private Value[]
readFields; // the cache for objects into which the binary representations are read
private Value[]
writeFields; // the cache for objects into which the binary representations are read
private int binaryLen; // the length of the contents in the binary buffer that is valid
private int numFields; // the number of fields in the record
private int firstModifiedPos; // position of the first modification (since (de)serialization)
// --------------------------------------------------------------------------------------------
/** Required nullary constructor for instantiation by serialization logic. */
public Record() {}
/**
* Creates a new record containing only a single field, which is the given value.
*
* @param value The value for the single field of the record.
*/
public Record(Value value) {
setField(0, value);
}
/**
* Creates a new record containing exactly two fields, which are the given values.
*
* @param val1 The value for the first field.
* @param val2 The value for the second field.
*/
public Record(Value val1, Value val2) {
makeSpace(2);
setField(0, val1);
setField(1, val2);
}
/**
* Creates a new record, containing the given number of fields. The fields are initially all
* nulls.
*
* @param numFields The number of fields for the record.
*/
public Record(int numFields) {
setNumFields(numFields);
}
// --------------------------------------------------------------------------------------------
// Basic Accessors
// --------------------------------------------------------------------------------------------
/**
* Gets the number of fields currently in the record. This also includes null fields.
*
* @return The number of fields in the record.
*/
public int getNumFields() {
return this.numFields;
}
/**
* Sets the number of fields in the record. If the new number of fields is longer than the
* current number of fields, then null fields are appended. If the new number of fields is
* smaller than the current number of fields, then the last fields are truncated.
*
* @param numFields The new number of fields.
*/
public void setNumFields(final int numFields) {
final int oldNumFields = this.numFields;
// check whether we increase or decrease the fields
if (numFields > oldNumFields) {
makeSpace(numFields);
for (int i = oldNumFields; i < numFields; i++) {
this.offsets[i] = NULL_INDICATOR_OFFSET;
}
markModified(oldNumFields);
} else {
// decrease the number of fields
// we do not remove the values from the cache, as the objects (if they are there) will
// most likely
// be reused when the record is re-filled
markModified(numFields);
}
this.numFields = numFields;
}
/**
* Reserves space for at least the given number of fields in the internal arrays.
*
* @param numFields The number of fields to reserve space for.
*/
public void makeSpace(int numFields) {
final int oldNumFields = this.numFields;
// increase the number of fields in the arrays
if (this.offsets == null) {
this.offsets = new int[numFields];
} else if (this.offsets.length < numFields) {
int[] newOffs = new int[Math.max(numFields + 1, oldNumFields << 1)];
System.arraycopy(this.offsets, 0, newOffs, 0, oldNumFields);
this.offsets = newOffs;
}
if (this.lengths == null) {
this.lengths = new int[numFields];
} else if (this.lengths.length < numFields) {
int[] newLens = new int[Math.max(numFields + 1, oldNumFields << 1)];
System.arraycopy(this.lengths, 0, newLens, 0, oldNumFields);
this.lengths = newLens;
}
if (this.readFields == null) {
this.readFields = new Value[numFields];
} else if (this.readFields.length < numFields) {
Value[] newFields = new Value[Math.max(numFields + 1, oldNumFields << 1)];
System.arraycopy(this.readFields, 0, newFields, 0, oldNumFields);
this.readFields = newFields;
}
if (this.writeFields == null) {
this.writeFields = new Value[numFields];
} else if (this.writeFields.length < numFields) {
Value[] newFields = new Value[Math.max(numFields + 1, oldNumFields << 1)];
System.arraycopy(this.writeFields, 0, newFields, 0, oldNumFields);
this.writeFields = newFields;
}
}
/**
* Gets the field at the given position from the record. This method checks internally, if this
* instance of the record has previously returned a value for this field. If so, it reuses the
* object, if not, it creates one from the supplied class.
*
* @param <T> The type of the field.
* @param fieldNum The logical position of the field.
* @param type The type of the field as a class. This class is used to instantiate a value
* object, if none had previously been instantiated.
* @return The field at the given position, or null, if the field was null.
* @throws IndexOutOfBoundsException Thrown, if the field number is negative or larger or equal
* to the number of fields in this record.
*/
@SuppressWarnings("unchecked")
public <T extends Value> T getField(final int fieldNum, final Class<T> type) {
// range check
if (fieldNum < 0 || fieldNum >= this.numFields) {
throw new IndexOutOfBoundsException(
fieldNum + " for range [0.." + (this.numFields - 1) + "]");
}
// get offset and check for null
final int offset = this.offsets[fieldNum];
if (offset == NULL_INDICATOR_OFFSET) {
return null;
} else if (offset == MODIFIED_INDICATOR_OFFSET) {
// value that has been set is new or modified
return (T) this.writeFields[fieldNum];
}
final int limit = offset + this.lengths[fieldNum];
// get an instance, either from the instance cache or create a new one
final Value oldField = this.readFields[fieldNum];
final T field;
if (oldField != null && oldField.getClass() == type) {
field = (T) oldField;
} else {
field = InstantiationUtil.instantiate(type, Value.class);
this.readFields[fieldNum] = field;
}
// deserialize
deserialize(field, offset, limit, fieldNum);
return field;
}
/**
* Gets the field at the given position. The method tries to deserialize the fields into the
* given target value. If the fields has been changed since the last (de)serialization, or is
* null, them the target value is left unchanged and the changed value (or null) is returned.
*
* <p>In all cases, the returned value contains the correct data (or is correctly null).
*
* @param fieldNum The position of the field.
* @param target The value to deserialize the field into.
* @return The value with the contents of the requested field, or null, if the field is null.
*/
@SuppressWarnings("unchecked")
public <T extends Value> T getField(int fieldNum, T target) {
// range check
if (fieldNum < 0 || fieldNum >= this.numFields) {
throw new IndexOutOfBoundsException();
}
if (target == null) {
throw new NullPointerException("The target object may not be null");
}
// get offset and check for null
final int offset = this.offsets[fieldNum];
if (offset == NULL_INDICATOR_OFFSET) {
return null;
} else if (offset == MODIFIED_INDICATOR_OFFSET) {
// value that has been set is new or modified
// bring the binary in sync so that the deserialization gives the correct result
return (T) this.writeFields[fieldNum];
}
final int limit = offset + this.lengths[fieldNum];
deserialize(target, offset, limit, fieldNum);
return target;
}
/**
* Gets the field at the given position. If the field at that position is null, then this method
* leaves the target field unchanged and returns false.
*
* @param fieldNum The position of the field.
* @param target The value to deserialize the field into.
* @return True, if the field was deserialized properly, false, if the field was null.
*/
public boolean getFieldInto(int fieldNum, Value target) {
// range check
if (fieldNum < 0 || fieldNum >= this.numFields) {
throw new IndexOutOfBoundsException();
}
// get offset and check for null
int offset = this.offsets[fieldNum];
if (offset == NULL_INDICATOR_OFFSET) {
return false;
} else if (offset == MODIFIED_INDICATOR_OFFSET) {
// value that has been set is new or modified
// bring the binary in sync so that the deserialization gives the correct result
updateBinaryRepresenation();
offset = this.offsets[fieldNum];
}
final int limit = offset + this.lengths[fieldNum];
deserialize(target, offset, limit, fieldNum);
return true;
}
/**
* Gets the fields at the given positions into an array. If at any position a field is null,
* then this method returns false. All fields that have been successfully read until the failing
* read are correctly contained in the record. All other fields are not set.
*
* @param positions The positions of the fields to get.
* @param targets The values into which the content of the fields is put.
* @return True if all fields were successfully read, false if some read failed.
*/
public boolean getFieldsInto(int[] positions, Value[] targets) {
for (int i = 0; i < positions.length; i++) {
if (!getFieldInto(positions[i], targets[i])) {
return false;
}
}
return true;
}
/**
* Gets the fields at the given positions into an array. If at any position a field is null,
* then this method throws a @link NullKeyFieldException. All fields that have been successfully
* read until the failing read are correctly contained in the record. All other fields are not
* set.
*
* @param positions The positions of the fields to get.
* @param targets The values into which the content of the fields is put.
* @throws NullKeyFieldException in case of a failing field read.
*/
public void getFieldsIntoCheckingNull(int[] positions, Value[] targets) {
for (int i = 0; i < positions.length; i++) {
if (!getFieldInto(positions[i], targets[i])) {
throw new NullKeyFieldException(i);
}
}
}
/**
* Deserializes the given object from the binary string, starting at the given position. If the
* deserialization asks for more that <code>limit - offset</code> bytes, than an exception is
* thrown.
*
* @param <T> The generic type of the value to be deserialized.
* @param target The object to deserialize the data into.
* @param offset The offset in the binary string.
* @param limit The limit in the binary string.
*/
private <T extends Value> void deserialize(T target, int offset, int limit, int fieldNumber) {
final InternalDeSerializer serializer = this.serializer;
serializer.memory = this.binaryData;
serializer.position = offset;
serializer.end = limit;
try {
target.read(serializer);
} catch (Exception e) {
throw new DeserializationException(
"Error reading field " + fieldNumber + " as " + target.getClass().getName(), e);
}
}
/**
* Sets the field at the given position to the given value. If the field position is larger or
* equal than the current number of fields in the record, than the record is expanded to host as
* many columns.
*
* <p>The value is kept as a reference in the record until the binary representation is
* synchronized. Until that point, all modifications to the value's object will change the value
* inside the record.
*
* <p>The binary representation is synchronized the latest when the record is emitted. It may be
* triggered manually at an earlier point, but it is generally not necessary and advisable.
* Because the synchronization triggers the serialization on all modified values, it may be an
* expensive operation.
*
* @param fieldNum The position of the field, starting at zero.
* @param value The new value.
*/
public void setField(int fieldNum, Value value) {
// range check
if (fieldNum < 0) {
throw new IndexOutOfBoundsException();
}
// if the field number is beyond the size, the tuple is expanded
if (fieldNum >= this.numFields) {
setNumFields(fieldNum + 1);
}
internallySetField(fieldNum, value);
}
/** @param value */
public void addField(Value value) {
final int num = this.numFields;
setNumFields(num + 1);
internallySetField(num, value);
}
private void internallySetField(int fieldNum, Value value) {
// check if we modify an existing field
this.offsets[fieldNum] = value != null ? MODIFIED_INDICATOR_OFFSET : NULL_INDICATOR_OFFSET;
this.writeFields[fieldNum] = value;
markModified(fieldNum);
}
private void markModified(int field) {
if (this.firstModifiedPos > field) {
this.firstModifiedPos = field;
}
}
private boolean isModified() {
return this.firstModifiedPos != Integer.MAX_VALUE;
}
/**
* Removes the field at the given position.
*
* <p>This method should be used carefully. Be aware that as the field is actually removed from
* the record, the total number of fields is modified, and all fields to the right of the field
* removed shift one position to the left.
*
* @param fieldNum The position of the field to be removed, starting at zero.
* @throws IndexOutOfBoundsException Thrown, when the position is not between 0 (inclusive) and
* the number of fields (exclusive).
*/
public void removeField(int fieldNum) {
// range check
if (fieldNum < 0 || fieldNum >= this.numFields) {
throw new IndexOutOfBoundsException();
}
int lastIndex = this.numFields - 1;
if (fieldNum < lastIndex) {
int len = lastIndex - fieldNum;
System.arraycopy(this.offsets, fieldNum + 1, this.offsets, fieldNum, len);
System.arraycopy(this.lengths, fieldNum + 1, this.lengths, fieldNum, len);
System.arraycopy(this.readFields, fieldNum + 1, this.readFields, fieldNum, len);
System.arraycopy(this.writeFields, fieldNum + 1, this.writeFields, fieldNum, len);
markModified(fieldNum);
}
this.offsets[lastIndex] = NULL_INDICATOR_OFFSET;
this.lengths[lastIndex] = 0;
this.writeFields[lastIndex] = null;
setNumFields(lastIndex);
}
public final boolean isNull(int fieldNum) {
// range check
if (fieldNum < 0 || fieldNum >= this.numFields) {
throw new IndexOutOfBoundsException();
}
// get offset and check for null
final int offset = this.offsets[fieldNum];
return offset == NULL_INDICATOR_OFFSET;
}
/**
* Sets the field at the given position to <code>null</code>.
*
* @param field The field index.
* @throws IndexOutOfBoundsException Thrown, when the position is not between 0 (inclusive) and
* the number of fields (exclusive).
*/
public void setNull(int field) {
// range check
if (field < 0 || field >= this.numFields) {
throw new IndexOutOfBoundsException();
}
internallySetField(field, null);
}
/**
* Sets the fields to <code>null</code> using the given bit mask. The bits correspond to the
* individual columns: <code>(1 == nullify, 0 == keep)</code>.
*
* @param mask Bit mask, where the i-th least significant bit represents the i-th field in the
* record.
*/
public void setNull(long mask) {
for (int i = 0; i < this.numFields; i++, mask >>>= 1) {
if ((mask & 0x1) != 0) {
internallySetField(i, null);
}
}
}
/**
* Sets the fields to <code>null</code> using the given bit mask. The bits correspond to the
* individual columns: <code>(1 == nullify, 0 == keep)</code>.
*
* @param mask Bit mask, where the i-th least significant bit in the n-th bit mask represents
* the <code>(n*64) + i</code>-th field in the record.
*/
public void setNull(long[] mask) {
for (int maskPos = 0, i = 0; i < this.numFields; ) {
long currMask = mask[maskPos];
for (int k = 64; i < this.numFields && k > 0; --k, i++, currMask >>>= 1) {
if ((currMask & 0x1) != 0) {
internallySetField(i, null);
}
}
}
}
/** Clears the record. After this operation, the record will have zero fields. */
public void clear() {
if (this.numFields > 0) {
this.numFields = 0;
this.firstModifiedPos = 0;
}
}
public void concatenate(Record record) {
throw new UnsupportedOperationException();
}
/**
* Unions the other record's fields with this records fields. After the method invocation with
* record <code>B</code> as the parameter, this record <code>A</code> will contain at field
* <code>i</code>:
*
* <ul>
* <li>Field <code>i</code> from record <code>A</code>, if that field is within record <code>A
* </code>'s number of fields and is not <i>null</i>.
* <li>Field <code>i</code> from record <code>B</code>, if that field is within record <code>B
* </code>'s number of fields.
* </ul>
*
* It is not necessary that both records have the same number of fields. This record will have
* the number of fields of the larger of the two records. Naturally, if both <code>A</code> and
* <code>B</code> have field <code>i</code> set to <i>null</i>, this record will have
* <i>null</i> at that position.
*
* @param other The records whose fields to union with this record's fields.
*/
public void unionFields(Record other) {
final int minFields = Math.min(this.numFields, other.numFields);
final int maxFields = Math.max(this.numFields, other.numFields);
final int[] offsets = this.offsets.length >= maxFields ? this.offsets : new int[maxFields];
final int[] lengths = this.lengths.length >= maxFields ? this.lengths : new int[maxFields];
if (!(this.isModified() || other.isModified())) {
// handle the special (but common) case where both records have a valid binary
// representation differently
// allocate space for the switchBuffer first
final int estimatedLength = this.binaryLen + other.binaryLen;
this.serializer.memory =
(this.switchBuffer != null && this.switchBuffer.length >= estimatedLength)
? this.switchBuffer
: new byte[estimatedLength];
this.serializer.position = 0;
try {
// common loop for both records
for (int i = 0; i < minFields; i++) {
final int thisOff = this.offsets[i];
if (thisOff == NULL_INDICATOR_OFFSET) {
final int otherOff = other.offsets[i];
if (otherOff == NULL_INDICATOR_OFFSET) {
offsets[i] = NULL_INDICATOR_OFFSET;
} else {
// take field from other record
offsets[i] = this.serializer.position;
this.serializer.write(other.binaryData, otherOff, other.lengths[i]);
lengths[i] = other.lengths[i];
}
} else {
// copy field from this one
offsets[i] = this.serializer.position;
this.serializer.write(this.binaryData, thisOff, this.lengths[i]);
lengths[i] = this.lengths[i];
}
}
// add the trailing fields from one record
if (minFields != maxFields) {
final Record sourceForRemainder = this.numFields > minFields ? this : other;
int begin = -1;
int end = -1;
int offsetDelta = 0;
// go through the offsets, find the non-null fields to account for the remaining
// data
for (int k = minFields; k < maxFields; k++) {
final int off = sourceForRemainder.offsets[k];
if (off == NULL_INDICATOR_OFFSET) {
offsets[k] = NULL_INDICATOR_OFFSET;
} else {
end = sourceForRemainder.offsets[k] + sourceForRemainder.lengths[k];
if (begin == -1) {
// first non null column in the remainder
begin = sourceForRemainder.offsets[k];
offsetDelta = this.serializer.position - begin;
}
offsets[k] = sourceForRemainder.offsets[k] + offsetDelta;
}
}
// copy the remaining fields directly as binary
if (begin != -1) {
this.serializer.write(sourceForRemainder.binaryData, begin, end - begin);
}
// the lengths can be copied directly
if (lengths != sourceForRemainder.lengths) {
System.arraycopy(
sourceForRemainder.lengths,
minFields,
lengths,
minFields,
maxFields - minFields);
}
}
} catch (Exception ioex) {
throw new RuntimeException(
"Error creating field union of record data" + ioex.getMessage() == null
? "."
: ": " + ioex.getMessage(),
ioex);
}
} else {
// the general case, where at least one of the two records has a binary representation
// that is not in sync.
final int estimatedLength =
(this.binaryLen > 0
? this.binaryLen
: this.numFields * DEFAULT_FIELD_LEN_ESTIMATE)
+ (other.binaryLen > 0
? other.binaryLen
: other.numFields * DEFAULT_FIELD_LEN_ESTIMATE);
this.serializer.memory =
(this.switchBuffer != null && this.switchBuffer.length >= estimatedLength)
? this.switchBuffer
: new byte[estimatedLength];
this.serializer.position = 0;
try {
// common loop for both records
for (int i = 0; i < minFields; i++) {
final int thisOff = this.offsets[i];
if (thisOff == NULL_INDICATOR_OFFSET) {
final int otherOff = other.offsets[i];
if (otherOff == NULL_INDICATOR_OFFSET) {
offsets[i] = NULL_INDICATOR_OFFSET;
} else if (otherOff == MODIFIED_INDICATOR_OFFSET) {
// serialize modified field from other record
offsets[i] = this.serializer.position;
other.writeFields[i].write(this.serializer);
lengths[i] = this.serializer.position - offsets[i];
} else {
// take field from other record binary
offsets[i] = this.serializer.position;
this.serializer.write(other.binaryData, otherOff, other.lengths[i]);
lengths[i] = other.lengths[i];
}
} else if (thisOff == MODIFIED_INDICATOR_OFFSET) {
// serialize modified field from this record
offsets[i] = this.serializer.position;
this.writeFields[i].write(this.serializer);
lengths[i] = this.serializer.position - offsets[i];
} else {
// copy field from this one
offsets[i] = this.serializer.position;
this.serializer.write(this.binaryData, thisOff, this.lengths[i]);
lengths[i] = this.lengths[i];
}
}
// add the trailing fields from one record
if (minFields != maxFields) {
final Record sourceForRemainder = this.numFields > minFields ? this : other;
// go through the offsets, find the non-null fields
for (int k = minFields; k < maxFields; k++) {
final int off = sourceForRemainder.offsets[k];
if (off == NULL_INDICATOR_OFFSET) {
offsets[k] = NULL_INDICATOR_OFFSET;
} else if (off == MODIFIED_INDICATOR_OFFSET) {
// serialize modified field from the source record
offsets[k] = this.serializer.position;
sourceForRemainder.writeFields[k].write(this.serializer);
lengths[k] = this.serializer.position - offsets[k];
} else {
// copy field from the source record binary
offsets[k] = this.serializer.position;
final int len = sourceForRemainder.lengths[k];
this.serializer.write(sourceForRemainder.binaryData, off, len);
lengths[k] = len;
}
}
}
} catch (Exception ioex) {
throw new RuntimeException(
"Error creating field union of record data" + ioex.getMessage() == null
? "."
: ": " + ioex.getMessage(),
ioex);
}
}
serializeHeader(this.serializer, offsets, maxFields);
// set the fields
this.switchBuffer = this.binaryData;
this.binaryData = serializer.memory;
this.binaryLen = serializer.position;
this.numFields = maxFields;
this.offsets = offsets;
this.lengths = lengths;
this.firstModifiedPos = Integer.MAX_VALUE;
// make sure that the object arrays reflect the size as well
if (this.readFields == null || this.readFields.length < maxFields) {
final Value[] na = new Value[maxFields];
System.arraycopy(this.readFields, 0, na, 0, this.readFields.length);
this.readFields = na;
}
this.writeFields =
(this.writeFields == null || this.writeFields.length < maxFields)
? new Value[maxFields]
: this.writeFields;
}
/** @param target */
public void copyTo(Record target) {
updateBinaryRepresenation();
if (target.binaryData == null || target.binaryData.length < this.binaryLen) {
target.binaryData = new byte[this.binaryLen];
}
if (target.offsets == null || target.offsets.length < this.numFields) {
target.offsets = new int[this.numFields];
}
if (target.lengths == null || target.lengths.length < this.numFields) {
target.lengths = new int[this.numFields];
}
if (target.readFields == null || target.readFields.length < this.numFields) {
target.readFields = new Value[this.numFields];
}
if (target.writeFields == null || target.writeFields.length < this.numFields) {
target.writeFields = new Value[this.numFields];
}
System.arraycopy(this.binaryData, 0, target.binaryData, 0, this.binaryLen);
System.arraycopy(this.offsets, 0, target.offsets, 0, this.numFields);
System.arraycopy(this.lengths, 0, target.lengths, 0, this.numFields);
target.binaryLen = this.binaryLen;
target.numFields = this.numFields;
target.firstModifiedPos = Integer.MAX_VALUE;
}
@Override
public int getBinaryLength() {
return -1;
}
@Override
public Record copy() {
return createCopy();
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
int val = source.readUnsignedByte();
target.writeByte(val);
if (val >= MAX_BIT) {
int shift = 7;
int curr;
val = val & 0x7f;
while ((curr = source.readUnsignedByte()) >= MAX_BIT) {
target.writeByte(curr);
val |= (curr & 0x7f) << shift;
shift += 7;
}
target.writeByte(curr);
val |= curr << shift;
}
target.write(source, val);
};
/**
* Creates an exact copy of this record.
*
* @return An exact copy of this record.
*/
public Record createCopy() {
final Record rec = new Record();
copyTo(rec);
return rec;
}
/**
* Bin-copies fields from a source record to this record. The following caveats apply:
*
* <p>If the source field is in a modified state, no binary representation will exist yet. In
* that case, this method is equivalent to {@code setField(..., source.getField(..., <class>))}.
* In particular, if setValue is called on the source field Value instance, that change will
* propagate to this record.
*
* <p>If the source field has already been serialized, then the binary representation will be
* copied. Further modifications to the source field will not be observable via this record, but
* attempting to read the field from this record will cause it to be deserialized.
*
* <p>Finally, bin-copying a source field requires calling updateBinaryRepresentation on this
* instance in order to reserve space in the binaryData array. If none of the source fields are
* actually bin-copied, then updateBinaryRepresentation won't be called.
*
* @param source
* @param sourcePositions
* @param targetPositions
*/
public void copyFrom(
final Record source, final int[] sourcePositions, final int[] targetPositions) {
final int[] sourceOffsets = source.offsets;
final int[] sourceLengths = source.lengths;
final byte[] sourceBuffer = source.binaryData;
final Value[] sourceFields = source.writeFields;
boolean anyFieldIsBinary = false;
int maxFieldNum = 0;
for (int i = 0; i < sourcePositions.length; i++) {
final int sourceFieldNum = sourcePositions[i];
final int sourceOffset = sourceOffsets[sourceFieldNum];
final int targetFieldNum = targetPositions[i];
maxFieldNum = Math.max(targetFieldNum, maxFieldNum);
if (sourceOffset == NULL_INDICATOR_OFFSET) {
// set null on existing field (new fields are null by default)
if (targetFieldNum < numFields) {
internallySetField(targetFieldNum, null);
}
} else if (sourceOffset != MODIFIED_INDICATOR_OFFSET) {
anyFieldIsBinary = true;
}
}
if (numFields < maxFieldNum + 1) {
setNumFields(maxFieldNum + 1);
}
final int[] targetLengths = this.lengths;
final int[] targetOffsets = this.offsets;
// reserve space in binaryData for the binary source fields
if (anyFieldIsBinary) {
for (int i = 0; i < sourcePositions.length; i++) {
final int sourceFieldNum = sourcePositions[i];
final int sourceOffset = sourceOffsets[sourceFieldNum];
if (sourceOffset != MODIFIED_INDICATOR_OFFSET
&& sourceOffset != NULL_INDICATOR_OFFSET) {
final int targetFieldNum = targetPositions[i];
targetLengths[targetFieldNum] = sourceLengths[sourceFieldNum];
internallySetField(targetFieldNum, RESERVE_SPACE);
}
}
updateBinaryRepresenation();
}
final byte[] targetBuffer = this.binaryData;
for (int i = 0; i < sourcePositions.length; i++) {
final int sourceFieldNum = sourcePositions[i];
final int sourceOffset = sourceOffsets[sourceFieldNum];
final int targetFieldNum = targetPositions[i];
if (sourceOffset == MODIFIED_INDICATOR_OFFSET) {
internallySetField(targetFieldNum, sourceFields[sourceFieldNum]);
} else if (sourceOffset != NULL_INDICATOR_OFFSET) {
// bin-copy
final int targetOffset = targetOffsets[targetFieldNum];
final int length = targetLengths[targetFieldNum];
System.arraycopy(sourceBuffer, sourceOffset, targetBuffer, targetOffset, length);
}
}
}
// --------------------------------------------------------------------------------------------
/**
* Checks the values of this record and a given list of values at specified positions for
* equality. The values of this record are deserialized and compared against the corresponding
* search value. The position specify which values are compared. The method returns true if the
* values on all positions are equal and false otherwise.
*
* @param positions The positions of the values to check for equality.
* @param searchValues The values against which the values of this record are compared.
* @param deserializationHolders An array to hold the deserialized values of this record.
* @return True if all the values on all positions are equal, false otherwise.
*/
public final boolean equalsFields(
int[] positions, Value[] searchValues, Value[] deserializationHolders) {
for (int i = 0; i < positions.length; i++) {
final Value v = getField(positions[i], deserializationHolders[i]);
if (v == null || (!v.equals(searchValues[i]))) {
return false;
}
}
return true;
}
// --------------------------------------------------------------------------------------------
/**
* Updates the binary representation of the data, such that it reflects the state of the
* currently stored fields. If the binary representation is already up to date, nothing happens.
* Otherwise, this function triggers the modified fields to serialize themselves into the
* records buffer and afterwards updates the offset table.
*/
public void updateBinaryRepresenation() {
// check whether the binary state is in sync
final int firstModified = this.firstModifiedPos;
if (firstModified == Integer.MAX_VALUE) {
return;
}
final InternalDeSerializer serializer = this.serializer;
final int[] offsets = this.offsets;
final int numFields = this.numFields;
serializer.memory =
this.switchBuffer != null
? this.switchBuffer
: (this.binaryLen > 0
? new byte[this.binaryLen]
: new byte[numFields * DEFAULT_FIELD_LEN_ESTIMATE + 1]);
serializer.position = 0;
if (numFields > 0) {
int offset = 0;
// search backwards to find the latest preceding non-null field
if (firstModified > 0) {
for (int i = firstModified - 1; i >= 0; i--) {
if (this.offsets[i] != NULL_INDICATOR_OFFSET) {
offset = this.offsets[i] + this.lengths[i];
break;
}
}
}
// we assume that changed and unchanged fields are interleaved and serialize into
// another array
try {
if (offset > 0) {
// copy the first unchanged portion as one
serializer.write(this.binaryData, 0, offset);
}
// copy field by field
for (int i = firstModified; i < numFields; i++) {
final int co = offsets[i];
/// skip null fields
if (co == NULL_INDICATOR_OFFSET) {
continue;
}
offsets[i] = offset;
if (co == MODIFIED_INDICATOR_OFFSET) {
final Value writeField = this.writeFields[i];
if (writeField == RESERVE_SPACE) {
// RESERVE_SPACE is a placeholder indicating lengths[i] bytes should be
// reserved
final int length = this.lengths[i];
if (serializer.position >= serializer.memory.length - length - 1) {
serializer.resize(length);
}
serializer.position += length;
} else {
// serialize modified fields
this.writeFields[i].write(serializer);
}
} else {
// bin-copy unmodified fields
serializer.write(this.binaryData, co, this.lengths[i]);
}
this.lengths[i] = serializer.position - offset;
offset = serializer.position;
}
} catch (Exception e) {
throw new RuntimeException(
"Error in data type serialization: " + e.getMessage(), e);
}
}
serializeHeader(serializer, offsets, numFields);
// set the fields
this.switchBuffer = this.binaryData;
this.binaryData = serializer.memory;
this.binaryLen = serializer.position;
this.firstModifiedPos = Integer.MAX_VALUE;
}
private void serializeHeader(
final InternalDeSerializer serializer, final int[] offsets, final int numFields) {
try {
if (numFields > 0) {
int slp = serializer.position; // track the last position of the serializer
// now, serialize the lengths, the sparsity mask and the number of fields
if (numFields <= 8) {
// efficient handling of common case with up to eight fields
int mask = 0;
for (int i = numFields - 1; i > 0; i--) {
if (offsets[i] != NULL_INDICATOR_OFFSET) {
slp = serializer.position;
serializer.writeValLenIntBackwards(offsets[i]);
mask |= 0x1;
}
mask <<= 1;
}
if (offsets[0] != NULL_INDICATOR_OFFSET) {
mask |= 0x1; // add the non-null bit to the mask
} else {
// the first field is null, so some previous field was the first non-null
// field
serializer.position = slp;
}
serializer.writeByte(mask);
} else {
// general case. offsets first (in backward order)
for (int i = numFields - 1; i > 0; i--) {
if (offsets[i] != NULL_INDICATOR_OFFSET) {
slp = serializer.position;
serializer.writeValLenIntBackwards(offsets[i]);
}
}
if (offsets[0] == NULL_INDICATOR_OFFSET) {
serializer.position = slp;
}
// now the mask. we write it in chucks of 8 bit.
// the remainder %8 comes first
int col = numFields - 1;
int mask = 0;
int i = numFields & 0x7;
if (i > 0) {
for (; i > 0; i--, col--) {
mask <<= 1;
mask |= (offsets[col] != NULL_INDICATOR_OFFSET) ? 0x1 : 0x0;
}
serializer.writeByte(mask);
}
// now the eight-bit chunks
for (i = numFields >>> 3; i > 0; i--) {
mask = 0;
for (int k = 0; k < 8; k++, col--) {
mask <<= 1;
mask |= (offsets[col] != NULL_INDICATOR_OFFSET) ? 0x1 : 0x0;
}
serializer.writeByte(mask);
}
}
}
serializer.writeValLenIntBackwards(numFields);
} catch (Exception e) {
throw new RuntimeException("Error serializing Record header: " + e.getMessage(), e);
}
}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public void write(DataOutputView out) throws IOException {
// make sure everything is in a valid binary representation
updateBinaryRepresenation();
// write the length first, variably encoded, then the contents of the binary array
writeVarLengthInt(out, this.binaryLen);
out.write(this.binaryData, 0, this.binaryLen);
}
@Override
public void read(DataInputView in) throws IOException {
final int len = readVarLengthInt(in);
this.binaryLen = len;
// ensure out byte array is large enough
byte[] data = this.binaryData;
if (data == null || data.length < len) {
data = new byte[len];
this.binaryData = data;
}
// read the binary data
in.readFully(data, 0, len);
initFields(data, 0, len);
}
private void initFields(final byte[] data, final int begin, final int len) {
try {
// read number of fields, variable length encoded reverse at the back
int pos = begin + len - 2;
int numFields = data[begin + len - 1] & 0xFF;
if (numFields >= MAX_BIT) {
int shift = 7;
int curr;
numFields = numFields & 0x7f;
while ((curr = data[pos--] & 0xff) >= MAX_BIT) {
numFields |= (curr & 0x7f) << shift;
shift += 7;
}
numFields |= curr << shift;
}
this.numFields = numFields;
// ensure that all arrays are there and of sufficient size
if (this.offsets == null || this.offsets.length < numFields) {
this.offsets = new int[numFields];
}
if (this.lengths == null || this.lengths.length < numFields) {
this.lengths = new int[numFields];
}
if (this.readFields == null || this.readFields.length < numFields) {
this.readFields = new Value[numFields];
}
if (this.writeFields == null || this.writeFields.length < numFields) {
this.writeFields = new Value[numFields];
}
final int beginMasks = pos; // beginning of bitmap for null fields
final int fieldsBy8 = (numFields >>> 3) + ((numFields & 0x7) == 0 ? 0 : 1);
pos = beginMasks - fieldsBy8;
int lastNonNullField = -1;
for (int field = 0, chunk = 0; chunk < fieldsBy8; chunk++) {
int mask = data[beginMasks - chunk];
for (int i = 0; i < 8 && field < numFields; i++, field++) {
if ((mask & 0x1) == 0x1) {
// not null, so read the offset value, if we are not the first non-null
// fields
if (lastNonNullField >= 0) {
// offset value is variable length encoded
int start = data[pos--] & 0xff;
if (start >= MAX_BIT) {
int shift = 7;
int curr;
start = start & 0x7f;
while ((curr = data[pos--] & 0xff) >= MAX_BIT) {
start |= (curr & 0x7f) << shift;
shift += 7;
}
start |= curr << shift;
}
this.offsets[field] = start + begin;
this.lengths[lastNonNullField] =
start + begin - this.offsets[lastNonNullField];
} else {
this.offsets[field] = begin;
}
lastNonNullField = field;
} else {
// field is null
this.offsets[field] = NULL_INDICATOR_OFFSET;
}
mask >>= 1;
}
}
if (lastNonNullField >= 0) {
this.lengths[lastNonNullField] = pos - this.offsets[lastNonNullField] + 1;
}
this.firstModifiedPos = Integer.MAX_VALUE;
} catch (ArrayIndexOutOfBoundsException aioobex) {
StringBuilder bld = new StringBuilder(len * 4 + 64);
bld.append("Record deserialization error: Record byte signature: ");
for (int i = 0; i < len; i++) {
int num = data[i + begin] & 0xff;
bld.append(num);
if (i < len - 1) {
bld.append(',');
}
}
throw new RuntimeException(bld.toString(), aioobex);
}
}
/**
* Writes this record to the given output view. This method is similar to {@link
* org.apache.flink.core.io.IOReadableWritable#write(org.apache.flink.core.memory.DataOutputView)},
* but it returns the number of bytes written.
*
* @param target The view to write the record to.
* @return The number of bytes written.
* @throws IOException Thrown, if an error occurred in the view during writing.
*/
public long serialize(DataOutputView target) throws IOException {
updateBinaryRepresenation();
long bytesForLen = 1;
int len = this.binaryLen;
while (len >= MAX_BIT) {
target.write(len | MAX_BIT);
len >>= 7;
bytesForLen++;
}
target.write(len);
target.write(this.binaryData, 0, this.binaryLen);
return bytesForLen + this.binaryLen;
}
/**
* @param source
* @throws IOException
*/
public void deserialize(DataInputView source) throws IOException {
read(source);
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
private static final void writeVarLengthInt(DataOutput out, int value) throws IOException {
while (value >= MAX_BIT) {
out.write(value | MAX_BIT);
value >>= 7;
}
out.write(value);
}
private static final int readVarLengthInt(DataInput in) throws IOException {
// read first byte
int val = in.readUnsignedByte();
if (val >= MAX_BIT) {
int shift = 7;
int curr;
val = val & 0x7f;
while ((curr = in.readUnsignedByte()) >= MAX_BIT) {
val |= (curr & 0x7f) << shift;
shift += 7;
}
val |= curr << shift;
}
return val;
}
private static final int MAX_BIT = 0x1 << 7;
// ------------------------------------------------------------------------
// Utility class for internal (de)serialization
// ------------------------------------------------------------------------
/**
* A special Value instance that doesn't actually (de)serialize anything, but instead indicates
* that space should be reserved in the buffer.
*/
private static final Value RESERVE_SPACE =
new Value() {
private static final long serialVersionUID = 1L;
@Override
public void write(DataOutputView out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void read(DataInputView in) throws IOException {
throw new UnsupportedOperationException();
}
};
/** Internal interface class to provide serialization for the data types. */
private static final class InternalDeSerializer
implements DataInputView, DataOutputView, Serializable {
private static final long serialVersionUID = 1L;
private byte[] memory;
private int position;
private int end;
// ----------------------------------------------------------------------------------------
// Data Input
// ----------------------------------------------------------------------------------------
@Override
public boolean readBoolean() throws IOException {
if (this.position < this.end) {
return this.memory[this.position++] != 0;
} else {
throw new EOFException();
}
}
@Override
public byte readByte() throws IOException {
if (this.position < this.end) {
return this.memory[this.position++];
} else {
throw new EOFException();
}
}
@Override
public char readChar() throws IOException {
if (this.position < this.end - 1) {
return (char)
(((this.memory[this.position++] & 0xff) << 8)
| ((this.memory[this.position++] & 0xff) << 0));
} else {
throw new EOFException();
}
}
@Override
public double readDouble() throws IOException {
return Double.longBitsToDouble(readLong());
}
@Override
public float readFloat() throws IOException {
return Float.intBitsToFloat(readInt());
}
@Override
public void readFully(byte[] b) throws IOException {
readFully(b, 0, b.length);
}
@Override
public void readFully(byte[] b, int off, int len) throws IOException {
if (len >= 0) {
if (off <= b.length - len) {
if (this.position <= this.end - len) {
System.arraycopy(this.memory, position, b, off, len);
position += len;
} else {
throw new EOFException();
}
} else {
throw new ArrayIndexOutOfBoundsException();
}
} else if (len < 0) {
throw new IllegalArgumentException("Length may not be negative.");
}
}
@Override
public int readInt() throws IOException {
if (this.position >= 0 && this.position < this.end - 3) {
@SuppressWarnings("restriction")
int value = UNSAFE.getInt(this.memory, BASE_OFFSET + this.position);
if (LITTLE_ENDIAN) {
value = Integer.reverseBytes(value);
}
this.position += 4;
return value;
} else {
throw new EOFException();
}
}
@Override
public String readLine() throws IOException {
if (this.position < this.end) {
// read until a newline is found
StringBuilder bld = new StringBuilder();
char curr = (char) readUnsignedByte();
while (position < this.end && curr != '\n') {
bld.append(curr);
curr = (char) readUnsignedByte();
}
// trim a trailing carriage return
int len = bld.length();
if (len > 0 && bld.charAt(len - 1) == '\r') {
bld.setLength(len - 1);
}
String s = bld.toString();
bld.setLength(0);
return s;
} else {
return null;
}
}
@Override
public long readLong() throws IOException {
if (position >= 0 && position < this.end - 7) {
@SuppressWarnings("restriction")
long value = UNSAFE.getLong(this.memory, BASE_OFFSET + this.position);
if (LITTLE_ENDIAN) {
value = Long.reverseBytes(value);
}
this.position += 8;
return value;
} else {
throw new EOFException();
}
}
@Override
public short readShort() throws IOException {
if (position >= 0 && position < this.end - 1) {
return (short)
((((this.memory[position++]) & 0xff) << 8)
| (((this.memory[position++]) & 0xff) << 0));
} else {
throw new EOFException();
}
}
@Override
public String readUTF() throws IOException {
int utflen = readUnsignedShort();
byte[] bytearr = new byte[utflen];
char[] chararr = new char[utflen];
int c, char2, char3;
int count = 0;
int chararr_count = 0;
readFully(bytearr, 0, utflen);
while (count < utflen) {
c = (int) bytearr[count] & 0xff;
if (c > 127) {
break;
}
count++;
chararr[chararr_count++] = (char) c;
}
while (count < utflen) {
c = (int) bytearr[count] & 0xff;
switch (c >> 4) {
case 0:
case 1:
case 2:
case 3:
case 4:
case 5:
case 6:
case 7:
/* 0xxxxxxx */
count++;
chararr[chararr_count++] = (char) c;
break;
case 12:
case 13:
/* 110x xxxx 10xx xxxx */
count += 2;
if (count > utflen) {
throw new UTFDataFormatException(
"malformed input: partial character at end");
}
char2 = (int) bytearr[count - 1];
if ((char2 & 0xC0) != 0x80) {
throw new UTFDataFormatException(
"malformed input around byte " + count);
}
chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
break;
case 14:
/* 1110 xxxx 10xx xxxx 10xx xxxx */
count += 3;
if (count > utflen) {
throw new UTFDataFormatException(
"malformed input: partial character at end");
}
char2 = (int) bytearr[count - 2];
char3 = (int) bytearr[count - 1];
if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
throw new UTFDataFormatException(
"malformed input around byte " + (count - 1));
}
chararr[chararr_count++] =
(char)
(((c & 0x0F) << 12)
| ((char2 & 0x3F) << 6)
| ((char3 & 0x3F) << 0));
break;
default:
/* 10xx xxxx, 1111 xxxx */
throw new UTFDataFormatException("malformed input around byte " + count);
}
}
// The number of chars produced may be less than utflen
return new String(chararr, 0, chararr_count);
}
@Override
public int readUnsignedByte() throws IOException {
if (this.position < this.end) {
return (this.memory[this.position++] & 0xff);
} else {
throw new EOFException();
}
}
@Override
public int readUnsignedShort() throws IOException {
if (this.position < this.end - 1) {
return ((this.memory[this.position++] & 0xff) << 8)
| ((this.memory[this.position++] & 0xff) << 0);
} else {
throw new EOFException();
}
}
@Override
public int skipBytes(int n) throws IOException {
if (this.position <= this.end - n) {
this.position += n;
return n;
} else {
n = this.end - this.position;
this.position = this.end;
return n;
}
}
@Override
public void skipBytesToRead(int numBytes) throws IOException {
if (this.end - this.position < numBytes) {
throw new EOFException("Could not skip " + numBytes + ".");
}
skipBytes(numBytes);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException("Byte array b cannot be null.");
}
if (off < 0) {
throw new IndexOutOfBoundsException("Offset cannot be negative.");
}
if (len < 0) {
throw new IndexOutOfBoundsException("Length cannot be negative.");
}
if (b.length - off < len) {
throw new IndexOutOfBoundsException(
"Byte array does not provide enough space to store requested data" + ".");
}
if (this.position >= this.end) {
return -1;
} else {
int toRead = Math.min(this.end - this.position, len);
System.arraycopy(this.memory, this.position, b, off, toRead);
this.position += toRead;
return toRead;
}
}
@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
// ----------------------------------------------------------------------------------------
// Data Output
// ----------------------------------------------------------------------------------------
@Override
public void write(int b) throws IOException {
if (this.position >= this.memory.length) {
resize(1);
}
this.memory[this.position++] = (byte) (b & 0xff);
}
@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
if (this.position > this.memory.length - len) {
resize(len);
}
System.arraycopy(b, off, this.memory, this.position, len);
this.position += len;
}
@Override
public void writeBoolean(boolean v) throws IOException {
write(v ? 1 : 0);
}
@Override
public void writeByte(int v) throws IOException {
write(v);
}
@Override
public void writeBytes(String s) throws IOException {
final int sLen = s.length();
if (this.position >= this.memory.length - sLen) {
resize(sLen);
}
for (int i = 0; i < sLen; i++) {
writeByte(s.charAt(i));
}
this.position += sLen;
}
@Override
public void writeChar(int v) throws IOException {
if (this.position >= this.memory.length - 1) {
resize(2);
}
this.memory[this.position++] = (byte) (v >> 8);
this.memory[this.position++] = (byte) v;
}
@Override
public void writeChars(String s) throws IOException {
final int sLen = s.length();
if (this.position >= this.memory.length - 2 * sLen) {
resize(2 * sLen);
}
for (int i = 0; i < sLen; i++) {
writeChar(s.charAt(i));
}
}
@Override
public void writeDouble(double v) throws IOException {
writeLong(Double.doubleToLongBits(v));
}
@Override
public void writeFloat(float v) throws IOException {
writeInt(Float.floatToIntBits(v));
}
@SuppressWarnings("restriction")
@Override
public void writeInt(int v) throws IOException {
if (this.position >= this.memory.length - 3) {
resize(4);
}
if (LITTLE_ENDIAN) {
v = Integer.reverseBytes(v);
}
UNSAFE.putInt(this.memory, BASE_OFFSET + this.position, v);
this.position += 4;
}
@SuppressWarnings("restriction")
@Override
public void writeLong(long v) throws IOException {
if (this.position >= this.memory.length - 7) {
resize(8);
}
if (LITTLE_ENDIAN) {
v = Long.reverseBytes(v);
}
UNSAFE.putLong(this.memory, BASE_OFFSET + this.position, v);
this.position += 8;
}
@Override
public void writeShort(int v) throws IOException {
if (this.position >= this.memory.length - 1) {
resize(2);
}
this.memory[this.position++] = (byte) ((v >>> 8) & 0xff);
this.memory[this.position++] = (byte) ((v >>> 0) & 0xff);
}
@Override
public void writeUTF(String str) throws IOException {
int strlen = str.length();
int utflen = 0;
int c;
/* use charAt instead of copying String to char array */
for (int i = 0; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
utflen++;
} else if (c > 0x07FF) {
utflen += 3;
} else {
utflen += 2;
}
}
if (utflen > 65535) {
throw new UTFDataFormatException("Encoded string is too long: " + utflen);
} else if (this.position > this.memory.length - utflen - 2) {
resize(utflen + 2);
}
byte[] bytearr = this.memory;
int count = this.position;
bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF);
bytearr[count++] = (byte) ((utflen >>> 0) & 0xFF);
int i = 0;
for (i = 0; i < strlen; i++) {
c = str.charAt(i);
if (!((c >= 0x0001) && (c <= 0x007F))) {
break;
}
bytearr[count++] = (byte) c;
}
for (; i < strlen; i++) {
c = str.charAt(i);
if ((c >= 0x0001) && (c <= 0x007F)) {
bytearr[count++] = (byte) c;
} else if (c > 0x07FF) {
bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
} else {
bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
bytearr[count++] = (byte) (0x80 | ((c >> 0) & 0x3F));
}
}
this.position = count;
}
private void writeValLenIntBackwards(int value) throws IOException {
if (this.position > this.memory.length - 4) {
resize(4);
}
if (value <= 0x7f) {
this.memory[this.position++] = (byte) value;
} else if (value <= 0x3fff) {
this.memory[this.position++] = (byte) (value >>> 7);
this.memory[this.position++] = (byte) (value | MAX_BIT);
} else if (value <= 0x1fffff) {
this.memory[this.position++] = (byte) (value >>> 14);
this.memory[this.position++] = (byte) ((value >>> 7) | MAX_BIT);
this.memory[this.position++] = (byte) (value | MAX_BIT);
} else if (value <= 0xfffffff) {
this.memory[this.position++] = (byte) (value >>> 21);
this.memory[this.position++] = (byte) ((value >>> 14) | MAX_BIT);
this.memory[this.position++] = (byte) ((value >>> 7) | MAX_BIT);
this.memory[this.position++] = (byte) (value | MAX_BIT);
} else {
this.memory[this.position++] = (byte) (value >>> 28);
this.memory[this.position++] = (byte) ((value >>> 21) | MAX_BIT);
this.memory[this.position++] = (byte) ((value >>> 14) | MAX_BIT);
this.memory[this.position++] = (byte) ((value >>> 7) | MAX_BIT);
this.memory[this.position++] = (byte) (value | MAX_BIT);
}
}
private void resize(int minCapacityAdd) throws IOException {
try {
final int newLen =
Math.max(this.memory.length * 2, this.memory.length + minCapacityAdd);
final byte[] nb = new byte[newLen];
System.arraycopy(this.memory, 0, nb, 0, this.position);
this.memory = nb;
} catch (NegativeArraySizeException nasex) {
throw new IOException(
"Serialization failed because the record length would exceed 2GB.");
}
}
@SuppressWarnings("restriction")
private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
@SuppressWarnings("restriction")
private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
private static final boolean LITTLE_ENDIAN =
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
@Override
public void skipBytesToWrite(int numBytes) throws IOException {
int skippedBytes = skipBytes(numBytes);
if (skippedBytes != numBytes) {
throw new EOFException("Could not skip " + numBytes + " bytes.");
}
}
@Override
public void write(DataInputView source, int numBytes) throws IOException {
if (numBytes > this.end - this.position) {
throw new IOException(
"Could not write " + numBytes + " bytes since the buffer is full.");
}
source.readFully(this.memory, this.position, numBytes);
this.position += numBytes;
}
}
}