/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import com.google.protobuf.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import org.apache.tajo.tuple.offheap.RowWriter;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.BitArray;
import org.apache.tajo.util.UnsafeUtil;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static org.apache.tajo.storage.StorageUtil.*;

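/**
 * Reads and writes tuples in Tajo's raw row-oriented binary format (file extension {@code .raw}).
 * A rough usage sketch, assuming the caller has already prepared {@code conf}, {@code schema},
 * {@code meta}, a target {@code path}, and a {@code fragment} (these names are illustrative,
 * not fixed API):
 *
 * <pre>{@code
 * RawFile.RawFileAppender appender = new RawFile.RawFileAppender(conf, schema, meta, path);
 * appender.init();
 * appender.addTuple(tuple);   // repeat per tuple
 * appender.close();
 *
 * RawFile.RawFileScanner scanner = new RawFile.RawFileScanner(conf, schema, meta, fragment);
 * scanner.init();
 * Tuple t;
 * while ((t = scanner.next()) != null) {
 *   // process t
 * }
 * scanner.close();
 * }</pre>
 */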
public class RawFile {
  public static final String FILE_EXTENSION = "raw";

  private static final Log LOG = LogFactory.getLog(RawFile.class);

  public static class RawFileScanner extends FileScanner implements SeekableScanner {
    private FileChannel channel;
    private DataType[] columnTypes;

    private ByteBuffer buffer;
    private int bufferSize;
    private Tuple tuple;

    private int headerSize = 0; // Header size of a tuple
    private BitArray nullFlags;
    private static final int RECORD_SIZE = 4;
    private boolean eof = false;
    private long fileLimit; // If this.fragment represents a complete file, this value is equal to the file's size
    private long numBytesRead;
    private FileInputStream fis;
    private long recordCount;

    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
      super(conf, schema, meta, fragment);
    }

    public void init() throws IOException {
      File file;
      try {
        if (fragment.getPath().toUri().getScheme() != null) {
          file = new File(fragment.getPath().toUri());
        } else {
          file = new File(fragment.getPath().toString());
        }
      } catch (IllegalArgumentException iae) {
        throw new IOException(iae);
      }

      fis = new FileInputStream(file);
      channel = fis.getChannel();
      fileLimit = fragment.getStartKey() + fragment.getEndKey(); // getEndKey() is the fragment length, so fileLimit is at most the file size

      if (tableStats != null) {
        tableStats.setNumBytes(fragment.getEndKey());
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", total file size :" + channel.size()
            + ", fragment size :" + fragment.getEndKey() + ", fileLimit: " + fileLimit);
      }

      if (fragment.getEndKey() < 64 * StorageUnit.KB) {
        bufferSize = fragment.getEndKey().intValue();
      } else {
        bufferSize = 64 * StorageUnit.KB;
      }
      buffer = ByteBuffer.allocateDirect(bufferSize);

      columnTypes = new DataType[schema.size()];
      for (int i = 0; i < schema.size(); i++) {
        columnTypes[i] = schema.getColumn(i).getDataType();
      }

      tuple = new VTuple(columnTypes.length);
      nullFlags = new BitArray(schema.size());
      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // 4-byte record length + 2-byte null-flag length + null-flag bytes
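      // On-disk record layout (as written by RawFileAppender.addTuple() below):
      //   int32  total record length in bytes, including this header
      //   int16  length of the null-flag byte array
      //   bytes  null flags, one bit per column
      //   ...    column values (fixed-width fields, or varint-length-prefixed byte fields)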

      // initial read
      if (fragment.getStartKey() > 0) {
        channel.position(fragment.getStartKey());
      }
      numBytesRead = channel.read(buffer);
      buffer.flip();

      super.init();
    }

    @Override
    public long getNextOffset() throws IOException {
      return channel.position() - buffer.remaining();
    }

    @Override
    public void seek(long offset) throws IOException {
      long currentPos = channel.position();
      if (currentPos < offset && offset < currentPos + buffer.limit()) {
        buffer.position((int) (offset - currentPos));
      } else {
        buffer.clear();
        channel.position(offset);
        int bytesRead = channel.read(buffer);
        numBytesRead = bytesRead;
        buffer.flip();
        eof = false;
      }
    }

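    /**
     * Refills the read buffer from the channel. Unread bytes are preserved via compact(), and
     * once the fragment's byte budget ({@code fragment.getEndKey()}) is nearly consumed the
     * buffer limit is capped so that records beyond the fragment are not read.
     *
     * @return true if more data is available, false if the fragment is exhausted
     */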
    private boolean fillBuffer() throws IOException {
      if (numBytesRead >= fragment.getEndKey()) {
        eof = true;
        return false;
      }
      int currentDataSize = buffer.remaining();
      buffer.compact();
      int bytesRead = channel.read(buffer);
      if (bytesRead == -1) {
        eof = true;
        return false;
      } else {
        buffer.flip();
        long realRemaining = fragment.getEndKey() - numBytesRead;
        numBytesRead += bytesRead;
        if (realRemaining < bufferSize) {
          int newLimit = currentDataSize + (int) realRemaining;
          if (newLimit > bufferSize) {
            newLimit = bufferSize;
          }
          buffer.limit(newLimit);
        }
        return true;
      }
    }

    @Override
    public Tuple next() throws IOException {
      if (eof) return null;

      if (buffer.remaining() < headerSize) {
        if (!fillBuffer()) {
          return null;
        }
      }

      // backup the buffer state
      int bufferLimit = buffer.limit();
      int recordSize = buffer.getInt();
      int nullFlagSize = buffer.getShort();

      buffer.limit(buffer.position() + nullFlagSize);
      nullFlags.fromByteBuffer(buffer);
      // restore the original limit after reading the null flags
      buffer.limit(bufferLimit);
      //buffer.position(recordOffset + headerSize);
      if (buffer.remaining() < (recordSize - headerSize)) {
        if (!fillBuffer()) {
          return null;
        }
      }

      recordCount++;

      for (int i = 0; i < columnTypes.length; i++) {
        // check if the i'th column is null
        if (nullFlags.get(i)) {
          tuple.put(i, DatumFactory.createNullDatum());
          continue;
        }

        switch (columnTypes[i].getType()) {
          case BOOLEAN:
            tuple.put(i, DatumFactory.createBool(buffer.get()));
            break;

          case BIT:
            tuple.put(i, DatumFactory.createBit(buffer.get()));
            break;

          case CHAR:
            int realLen = readRawVarint32(buffer);
            byte[] buf = new byte[realLen];
            buffer.get(buf);
            tuple.put(i, DatumFactory.createChar(buf));
            break;

          case INT2:
            tuple.put(i, DatumFactory.createInt2(buffer.getShort()));
            break;

          case INT4:
            tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32(buffer))));
            break;

          case INT8:
            tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64(buffer))));
            break;

          case FLOAT4:
            tuple.put(i, DatumFactory.createFloat4(buffer.getFloat()));
            break;

          case FLOAT8:
            tuple.put(i, DatumFactory.createFloat8(buffer.getDouble()));
            break;

          case TEXT: {
            int len = readRawVarint32(buffer);
            byte[] strBytes = new byte[len];
            buffer.get(strBytes);
            tuple.put(i, DatumFactory.createText(strBytes));
            break;
          }

          case BLOB: {
            int len = readRawVarint32(buffer);
            byte[] rawBytes = new byte[len];
            buffer.get(rawBytes);
            tuple.put(i, DatumFactory.createBlob(rawBytes));
            break;
          }

          case PROTOBUF: {
            int len = readRawVarint32(buffer);
            byte[] rawBytes = new byte[len];
            buffer.get(rawBytes);

            ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]);
            Message.Builder builder = factory.newBuilder();
            builder.mergeFrom(rawBytes);
            tuple.put(i, factory.createDatum(builder.build()));
            break;
          }

          case INET4:
            tuple.put(i, DatumFactory.createInet4(buffer.getInt()));
            break;

          case DATE: {
            int val = buffer.getInt();
            if (val < Integer.MIN_VALUE + 1) {
              tuple.put(i, DatumFactory.createNullDatum());
            } else {
              tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val));
            }
            break;
          }
          case TIME:
          case TIMESTAMP: {
            long val = buffer.getLong();
            if (val < Long.MIN_VALUE + 1) {
              tuple.put(i, DatumFactory.createNullDatum());
            } else {
              tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val));
            }
            break;
          }
          case NULL_TYPE:
            tuple.put(i, NullDatum.get());
            break;

          default:
        }
      }

      if (!buffer.hasRemaining() && channel.position() == fileLimit) {
        eof = true;
      }
      return new VTuple(tuple);
    }

    /**
     * Fills the given row block with the next batch of records.
     *
     * @return true if at least one record was written into the row block
     * @throws java.io.IOException
     */
    @Override
    public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
      rowBlock.clear();

      RowWriter writer = rowBlock.getWriter();
      while (rowBlock.rows() < rowBlock.maxRowNum()) {

        if (buffer.remaining() < headerSize) {
          if (!fillBuffer()) {
            break;
          }
        }

        // backup the buffer state
        int bufferLimit = buffer.limit();
        int recordSize = buffer.getInt();
        int nullFlagSize = buffer.getShort();

        buffer.limit(buffer.position() + nullFlagSize);
        nullFlags.fromByteBuffer(buffer);
        // restore the original limit after reading the null flags
        buffer.limit(bufferLimit);
        if (buffer.remaining() < (recordSize - headerSize)) {
          if (!fillBuffer()) {
            break;
          }
        }

        recordCount++;
        writer.startRow();
        for (int i = 0; i < columnTypes.length; i++) {
          // check if the i'th column is null
          if (nullFlags.get(i)) {
            writer.skipField();
            continue;
          }

          switch (columnTypes[i].getType()) {
            case BOOLEAN:
              writer.putBool(buffer.get());
              break;
            case BIT:
              writer.putByte(buffer.get());
              break;
            case CHAR: {
              int realLen = readRawVarint32(buffer);
              byte[] buf = new byte[realLen];
              buffer.get(buf);
              writer.putText(buf);
              break;
            }
            case INT1:
            case INT2:
              writer.putInt2(buffer.getShort());
              break;
            case INT4:
              writer.putInt4(decodeZigZag32(readRawVarint32(buffer)));
              break;
            case INT8:
              writer.putInt8(decodeZigZag64(readRawVarint64(buffer)));
              break;
            case FLOAT4:
              writer.putFloat4(buffer.getFloat());
              break;
            case FLOAT8:
              writer.putFloat8(buffer.getDouble());
              break;
            case TEXT: {
              int len = readRawVarint32(buffer);
              byte[] strBytes = new byte[len];
              buffer.get(strBytes);
              writer.putText(strBytes);
              break;
            }
            case BLOB: {
              int len = readRawVarint32(buffer);
              byte[] rawBytes = new byte[len];
              buffer.get(rawBytes);
              writer.putBlob(rawBytes);
              break;
            }
            case PROTOBUF: {
              int len = readRawVarint32(buffer);
              byte[] rawBytes = new byte[len];
              buffer.get(rawBytes);
              writer.putBlob(rawBytes);
              break;
            }
            case INET4:
              writer.putInet4(buffer.getInt());
              break;
            case DATE: {
              int val = buffer.getInt();
              if (val < Integer.MIN_VALUE + 1) {
                writer.skipField();
              } else {
                writer.putDate(val);
              }
              break;
            }
            case TIME:
            case TIMESTAMP: {
              long val = buffer.getLong();
              if (val < Long.MIN_VALUE + 1) {
                writer.skipField();
              } else {
                writer.putTimestamp(val);
              }
              break;
            }
            case NULL_TYPE:
              writer.skipField();
              break;
            default:
              throw new IOException("Not supported type: " + columnTypes[i].getType().name());
          }
        }
        writer.endRow();
      }
      return rowBlock.rows() > 0;
    }

    @Override
    public void reset() throws IOException {
      // clear the buffer
      buffer.clear();
      // reload initial buffer
      channel.position(fragment.getStartKey());
      numBytesRead = channel.read(buffer);
      buffer.flip();
      eof = false;
    }

    @Override
    public void close() throws IOException {
      if (tableStats != null) {
        tableStats.setReadBytes(fragment.getEndKey());
        tableStats.setNumRows(recordCount);
      }

      UnsafeUtil.free(buffer);
      IOUtils.cleanup(LOG, channel, fis);
    }

    @Override
    public boolean isProjectable() {
      return false;
    }

    @Override
    public boolean isSelectable() {
      return false;
    }

    @Override
    public boolean isSplittable() {
      return false;
    }

    @Override
    public float getProgress() {
      try {
        tableStats.setNumRows(recordCount);
        long filePos = 0;
        if (channel != null) {
          filePos = channel.position();
          tableStats.setReadBytes(filePos);
        }

        if (eof || channel == null) {
          tableStats.setReadBytes(fragment.getEndKey());
          return 1.0f;
        }

        if (filePos == 0) {
          return 0.0f;
        } else {
          return Math.min(1.0f, ((float) filePos / fragment.getEndKey().floatValue()));
        }
      } catch (IOException e) {
        LOG.error(e.getMessage(), e);
        return 0.0f;
      }
    }
  }

  public static class RawFileAppender extends FileAppender {
    private FileChannel channel;
    private RandomAccessFile randomAccessFile;
    private DataType[] columnTypes;

    private ByteBuffer buffer;
    private BitArray nullFlags;
    private int headerSize = 0;
    private static final int RECORD_SIZE = 4;
    private long pos;

    private TableStatistics stats;

    public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
      super(conf, schema, meta, path);
    }

    public void init() throws IOException {
      File file;
      try {
        if (path.toUri().getScheme() != null) {
          file = new File(path.toUri());
        } else {
          file = new File(path.toString());
        }
      } catch (IllegalArgumentException iae) {
        throw new IOException(iae);
      }

      randomAccessFile = new RandomAccessFile(file, "rw");
      channel = randomAccessFile.getChannel();
      pos = 0;

      columnTypes = new DataType[schema.size()];
      for (int i = 0; i < schema.size(); i++) {
        columnTypes[i] = schema.getColumn(i).getDataType();
      }

      buffer = ByteBuffer.allocateDirect(64 * 1024);

      // compute the number of bytes representing the null flags
      nullFlags = new BitArray(schema.size());
      headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();

      if (enabledStats) {
        this.stats = new TableStatistics(this.schema);
      }

      super.init();
    }

    @Override
    public long getOffset() throws IOException {
      return pos;
    }

    private void flushBuffer() throws IOException {
      buffer.limit(buffer.position());
      buffer.flip();
      channel.write(buffer);
      buffer.clear();
    }

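    /**
     * If {@code sizeToBeWritten} does not fit into the remaining buffer space, writes all
     * complete records (the bytes before {@code recordOffset}) to the channel and compacts the
     * in-progress record to the start of the buffer.
     *
     * @return true if the buffer was flushed; the in-progress record then starts at offset 0,
     *         so the caller must reset its record offset to 0
     */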
    private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten)
        throws IOException {

      // if the buffer cannot hold the bytes to be written,
      // flush the bytes from 0 up to the start of the current record.
      if (buffer.remaining() < sizeToBeWritten) {

        int limit = buffer.position();
        buffer.limit(recordOffset);
        buffer.flip();
        channel.write(buffer);
        buffer.position(recordOffset);
        buffer.limit(limit);
        buffer.compact();

        return true;
      } else {
        return false;
      }
    }

    /**
     * ZigZag-encode a signed 32-bit value. ZigZag encoding maps signed integers to unsigned
     * values that can be efficiently varint-encoded. (Otherwise, negative values must be
     * sign-extended to 64 bits to be varint encoded, thus always taking 10 bytes on the wire.)
     *
     * @param n A signed 32-bit integer.
     * @return An unsigned 32-bit integer, stored in a signed int because
     *         Java has no explicit unsigned support.
     */
    public static int encodeZigZag32(final int n) {
      // Note: the right-shift must be arithmetic
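      // Example of the mapping: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, ...,
      // Integer.MIN_VALUE -> 0xFFFFFFFF.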
      return (n << 1) ^ (n >> 31);
    }

    /**
     * ZigZag-encode a signed 64-bit value. ZigZag encoding maps signed integers to unsigned
     * values that can be efficiently varint-encoded. (Otherwise, negative values must be
     * sign-extended to 64 bits to be varint encoded, thus always taking 10 bytes on the wire.)
     *
     * @param n A signed 64-bit integer.
     * @return An unsigned 64-bit integer, stored in a signed long because
     *         Java has no explicit unsigned support.
     */
    public static long encodeZigZag64(final long n) {
      // Note: the right-shift must be arithmetic
      return (n << 1) ^ (n >> 63);
    }

    /**
     * Encode and write a varint. {@code value} is treated as
     * unsigned, so it won't be sign-extended if negative.
     */
    public void writeRawVarint32(int value) throws IOException {
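      // Varint layout: 7 data bits per byte, least-significant group first, with the high bit
      // set on every byte except the last. For example, 300 (0b1_0010_1100) is written as the
      // two bytes 0xAC, 0x02.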
      while (true) {
        if ((value & ~0x7F) == 0) {
          buffer.put((byte) value);
          return;
        } else {
          buffer.put((byte) ((value & 0x7F) | 0x80));
          value >>>= 7;
        }
      }
    }

    /**
     * Compute the number of bytes that would be needed to encode a varint.
     * {@code value} is treated as unsigned, so it won't be sign-extended if
     * negative.
     */
    public static int computeRawVarint32Size(final int value) {
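      // e.g. 0..127 -> 1 byte, 128..16383 -> 2 bytes, 16384..2097151 -> 3 bytes,
      // 2097152..268435455 -> 4 bytes, anything with the top 4 bits set -> 5 bytes.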
      if ((value & (0xffffffff << 7)) == 0) return 1;
      if ((value & (0xffffffff << 14)) == 0) return 2;
      if ((value & (0xffffffff << 21)) == 0) return 3;
      if ((value & (0xffffffff << 28)) == 0) return 4;
      return 5;
    }

    /** Encode and write a varint. */
    public void writeRawVarint64(long value) throws IOException {
      while (true) {
        if ((value & ~0x7FL) == 0) {
          buffer.put((byte) value);
          return;
        } else {
          buffer.put((byte) ((value & 0x7F) | 0x80));
          value >>>= 7;
        }
      }
    }

    @Override
    public void addTuple(Tuple t) throws IOException {

      if (buffer.remaining() < headerSize) {
        flushBuffer();
      }

      // reserve space for the record header; it is back-patched after the column values are written
      int recordOffset = buffer.position();
      buffer.position(recordOffset + headerSize);
      // reset the null flags
      nullFlags.clear();
      for (int i = 0; i < schema.size(); i++) {
        if (enabledStats) {
          stats.analyzeField(i, t.get(i));
        }

        if (t.isNull(i)) {
          nullFlags.set(i);
          continue;
        }

        // 8 bytes is the maximum size of any fixed-length value
        if (flushBufferAndReplace(recordOffset, 8)) {
          recordOffset = 0;
        }

        switch (columnTypes[i].getType()) {
          case NULL_TYPE:
            nullFlags.set(i);
            continue;

          case BOOLEAN:
          case BIT:
            buffer.put(t.getByte(i));
            break;

          case INT2:
            buffer.putShort(t.getInt2(i));
            break;

          case INT4:
            writeRawVarint32(encodeZigZag32(t.getInt4(i)));
            break;

          case INT8:
            writeRawVarint64(encodeZigZag64(t.getInt8(i)));
            break;

          case FLOAT4:
            buffer.putFloat(t.getFloat4(i));
            break;

          case FLOAT8:
            buffer.putDouble(t.getFloat8(i));
            break;

          case CHAR:
          case TEXT: {
            byte[] strBytes = t.getBytes(i);
            if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) {
              recordOffset = 0;
            }
            writeRawVarint32(strBytes.length);
            buffer.put(strBytes);
            break;
          }

          case DATE:
            buffer.putInt(t.getInt4(i));
            break;

          case TIME:
          case TIMESTAMP:
            buffer.putLong(t.getInt8(i));
            break;

          case BLOB: {
            byte[] rawBytes = t.getBytes(i);
            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
              recordOffset = 0;
            }
            writeRawVarint32(rawBytes.length);
            buffer.put(rawBytes);
            break;
          }

          case PROTOBUF: {
            byte[] rawBytes = t.getBytes(i);
            if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) {
              recordOffset = 0;
            }
            writeRawVarint32(rawBytes.length);
            buffer.put(rawBytes);
            break;
          }

          case INET4:
            buffer.putInt(t.getInt4(i));
            break;

          default:
            throw new IOException("Cannot support data type: " + columnTypes[i].getType());
        }
      }

      // back-patch the record header: total record length, null-flag length, and null flags
      int bufferPos = buffer.position();
      buffer.position(recordOffset);
      buffer.putInt(bufferPos - recordOffset);
      byte[] flags = nullFlags.toArray();
      buffer.putShort((short) flags.length);
      buffer.put(flags);

      pos += bufferPos - recordOffset;
      buffer.position(bufferPos);

      if (enabledStats) {
        stats.incrementRow();
      }
    }

    @Override
    public void flush() throws IOException {
      if (buffer != null) {
        flushBuffer();
      }
    }

    @Override
    public void close() throws IOException {
      flush();
      if (enabledStats) {
        stats.setNumBytes(getOffset());
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
      }

      UnsafeUtil.free(buffer);
      IOUtils.cleanup(LOG, channel, randomAccessFile);
    }

    @Override
    public TableStats getStats() {
      if (enabledStats) {
        stats.setNumBytes(pos);
        return stats.getTableStat();
      } else {
        return null;
      }
    }
  }
}