| /* |
| * Copyright 2009-2013 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package edu.uci.ics.pregelix.dataflow.std.util; |
| |
| import java.io.DataInputStream; |
| import java.io.IOException; |
| |
| import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor; |
| import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; |
| import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference; |
| |
| public class TupleDeserializer { |
| private static String ERROR_MSG = "Out-of-bound read in your Writable implementations of types for vertex id, vertex value, edge value or message --- check your readFields and write implmenetation"; |
| private final Object[] record; |
| private final RecordDescriptor recordDescriptor; |
| private final ResetableByteArrayInputStream bbis; |
| private final DataInputStream di; |
| |
| public TupleDeserializer(RecordDescriptor recordDescriptor) { |
| this.recordDescriptor = recordDescriptor; |
| this.bbis = new ResetableByteArrayInputStream(); |
| this.di = new DataInputStream(bbis); |
| this.record = new Object[recordDescriptor.getFields().length]; |
| } |
| |
| public Object[] deserializeRecord(ITupleReference tupleRef) throws HyracksDataException { |
| try { |
| for (int i = 0; i < tupleRef.getFieldCount(); ++i) { |
| byte[] data = tupleRef.getFieldData(i); |
| int offset = tupleRef.getFieldStart(i); |
| int len = tupleRef.getFieldLength(i); |
| bbis.setByteArray(data, offset); |
| |
| int availableBefore = bbis.available(); |
| Object instance = recordDescriptor.getFields()[i].deserialize(di); |
| int availableAfter = bbis.available(); |
| if (availableBefore - availableAfter > len) { |
| throw new IllegalStateException(ERROR_MSG); |
| } |
| |
| record[i] = instance; |
| } |
| return record; |
| } catch (IOException e) { |
| throw new HyracksDataException(e); |
| } |
| } |
| |
| public Object[] deserializeRecord(IFrameTupleAccessor left, int tIndex, ITupleReference right) |
| throws HyracksDataException { |
| try { |
| /** skip vertex id field in deserialization */ |
| byte[] data = left.getBuffer().array(); |
| int tStart = left.getTupleStartOffset(tIndex) + left.getFieldSlotsLength(); |
| int leftFieldCount = left.getFieldCount(); |
| int fStart = tStart; |
| for (int i = 1; i < leftFieldCount; ++i) { |
| /** |
| * reset the input |
| */ |
| fStart = tStart + left.getFieldStartOffset(tIndex, i); |
| int fieldLength = left.getFieldLength(tIndex, i); |
| bbis.setByteArray(data, fStart); |
| |
| /** |
| * do deserialization |
| */ |
| int availableBefore = bbis.available(); |
| Object instance = recordDescriptor.getFields()[i].deserialize(di); |
| int availableAfter = bbis.available(); |
| if (availableBefore - availableAfter > fieldLength) { |
| throw new IllegalStateException(ERROR_MSG); |
| |
| } |
| record[i] = instance; |
| } |
| /** skip vertex id field in deserialization */ |
| for (int i = leftFieldCount + 1; i < record.length; ++i) { |
| byte[] rightData = right.getFieldData(i - leftFieldCount); |
| int rightOffset = right.getFieldStart(i - leftFieldCount); |
| int len = right.getFieldLength(i - leftFieldCount); |
| bbis.setByteArray(rightData, rightOffset); |
| |
| int availableBefore = bbis.available(); |
| Object instance = recordDescriptor.getFields()[i].deserialize(di); |
| int availableAfter = bbis.available(); |
| if (availableBefore - availableAfter > len) { |
| throw new IllegalStateException(ERROR_MSG); |
| } |
| record[i] = instance; |
| } |
| return record; |
| } catch (IOException e) { |
| throw new HyracksDataException(e); |
| } |
| } |
| |
| public Object[] deserializeRecord(ArrayTupleBuilder tb, ITupleReference right, boolean nullLeft) |
| throws HyracksDataException { |
| try { |
| if (nullLeft) { |
| byte[] rightData = right.getFieldData(1); |
| int rightFieldOffset = right.getFieldStart(1); |
| /** skip a halted and no message vertex without deserializing it */ |
| if ((rightData[rightFieldOffset] & 1) == 1) { |
| // halt flag is the last bit of the first byte of any vertex value |
| record[0] = null; |
| record[1] = null; |
| return record; |
| } |
| } |
| |
| byte[] data = tb.getByteArray(); |
| int[] offset = tb.getFieldEndOffsets(); |
| int start = 0; |
| /** skip vertex id fields in deserialization */ |
| for (int i = 1; i < offset.length; ++i) { |
| /** |
| * reset the input |
| */ |
| start = offset[i - 1]; |
| bbis.setByteArray(data, start); |
| int fieldLength = i == 0 ? offset[0] : offset[i] - offset[i - 1]; |
| |
| /** |
| * do deserialization |
| */ |
| int availableBefore = bbis.available(); |
| Object instance = recordDescriptor.getFields()[i].deserialize(di); |
| int availableAfter = bbis.available(); |
| if (availableBefore - availableAfter > fieldLength) { |
| throw new IllegalStateException(ERROR_MSG); |
| } |
| record[i] = instance; |
| } |
| /** skip vertex id fields in deserialization */ |
| for (int i = offset.length + 1; i < record.length; ++i) { |
| byte[] rightData = right.getFieldData(i - offset.length); |
| int rightOffset = right.getFieldStart(i - offset.length); |
| bbis.setByteArray(rightData, rightOffset); |
| int fieldLength = right.getFieldLength(i - offset.length); |
| |
| int availableBefore = bbis.available(); |
| Object instance = recordDescriptor.getFields()[i].deserialize(di); |
| int availableAfter = bbis.available(); |
| if (availableBefore - availableAfter > fieldLength) { |
| throw new IllegalStateException(ERROR_MSG); |
| } |
| record[i] = instance; |
| } |
| return record; |
| } catch (IOException e) { |
| throw new HyracksDataException(e); |
| } |
| } |
| } |