| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.storage; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.tajo.catalog.Column; |
| import org.apache.tajo.catalog.Schema; |
| import org.apache.tajo.catalog.TableMeta; |
| import org.apache.tajo.catalog.proto.CatalogProtos; |
| import org.apache.tajo.util.FileUtil; |
| import org.apache.tajo.util.KeyValueSet; |
| import parquet.hadoop.ParquetOutputFormat; |
| |
| import java.io.DataInput; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| public class StorageUtil extends StorageConstants { |
| public static int getRowByteSize(Schema schema) { |
| int sum = 0; |
| for(Column col : schema.getColumns()) { |
| sum += StorageUtil.getColByteSize(col); |
| } |
| |
| return sum; |
| } |
| |
| public static int getColByteSize(Column col) { |
| switch (col.getDataType().getType()) { |
| case BOOLEAN: |
| return 1; |
| case CHAR: |
| return 1; |
| case BIT: |
| return 1; |
| case INT2: |
| return 2; |
| case INT4: |
| return 4; |
| case INT8: |
| return 8; |
| case FLOAT4: |
| return 4; |
| case FLOAT8: |
| return 8; |
| case INET4: |
| return 4; |
| case INET6: |
| return 32; |
| case TEXT: |
| return 256; |
| case BLOB: |
| return 256; |
| case DATE: |
| return 4; |
| case TIME: |
| return 8; |
| case TIMESTAMP: |
| return 8; |
| default: |
| return 0; |
| } |
| } |
| |
| public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException { |
| FileSystem fs = tableroot.getFileSystem(conf); |
| FSDataOutputStream out = fs.create(new Path(tableroot, ".meta")); |
| FileUtil.writeProto(out, meta.getProto()); |
| out.flush(); |
| out.close(); |
| } |
| |
| public static Path concatPath(String parent, String...childs) { |
| return concatPath(new Path(parent), childs); |
| } |
| |
| public static Path concatPath(Path parent, String...childs) { |
| StringBuilder sb = new StringBuilder(); |
| |
| for(int i=0; i < childs.length; i++) { |
| sb.append(childs[i]); |
| if(i < childs.length - 1) |
| sb.append("/"); |
| } |
| |
| return new Path(parent, sb.toString()); |
| } |
| |
| public static KeyValueSet newPhysicalProperties(CatalogProtos.StoreType type) { |
| KeyValueSet options = new KeyValueSet(); |
| if (CatalogProtos.StoreType.CSV == type) { |
| options.set(CSVFILE_DELIMITER, DEFAULT_FIELD_DELIMITER); |
| } else if (CatalogProtos.StoreType.RCFILE == type) { |
| options.set(RCFILE_SERDE, DEFAULT_BINARY_SERDE); |
| } else if (CatalogProtos.StoreType.SEQUENCEFILE == type) { |
| options.set(SEQUENCEFILE_SERDE, DEFAULT_TEXT_SERDE); |
| options.set(SEQUENCEFILE_DELIMITER, DEFAULT_FIELD_DELIMITER); |
| } else if (type == CatalogProtos.StoreType.PARQUET) { |
| options.set(ParquetOutputFormat.BLOCK_SIZE, PARQUET_DEFAULT_BLOCK_SIZE); |
| options.set(ParquetOutputFormat.PAGE_SIZE, PARQUET_DEFAULT_PAGE_SIZE); |
| options.set(ParquetOutputFormat.COMPRESSION, PARQUET_DEFAULT_COMPRESSION_CODEC_NAME); |
| options.set(ParquetOutputFormat.ENABLE_DICTIONARY, PARQUET_DEFAULT_IS_DICTIONARY_ENABLED); |
| options.set(ParquetOutputFormat.VALIDATION, PARQUET_DEFAULT_IS_VALIDATION_ENABLED); |
| } |
| |
| return options; |
| } |
| |
| static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*"; |
| static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*"; |
| |
| /** |
| * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or "part-[0-9]*-[0-9]*-[0-9]*". |
| * |
| * This method finds the maximum sequence number from existing data files through the above patterns. |
| * If it cannot find any matched file or the maximum number, it will return -1. |
| * |
| * @param fs |
| * @param path |
| * @param recursive |
| * @return The maximum sequence number |
| * @throws IOException |
| */ |
| public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException { |
| if (!fs.isDirectory(path)) { |
| return -1; |
| } |
| |
| FileStatus[] files = fs.listStatus(path); |
| |
| if (files == null || files.length == 0) { |
| return -1; |
| } |
| |
| int maxValue = -1; |
| List<Path> fileNamePatternMatchedList = new ArrayList<Path>(); |
| |
| for (FileStatus eachFile: files) { |
| // In the case of partition table, return largest value within all partition dirs. |
| if (eachFile.isDirectory() && recursive) { |
| int value = getMaxFileSequence(fs, eachFile.getPath(), recursive); |
| if (value > maxValue) { |
| maxValue = value; |
| } |
| } else { |
| if (eachFile.getPath().getName().matches(fileNamePatternV08) || |
| eachFile.getPath().getName().matches(fileNamePatternV09)) { |
| fileNamePatternMatchedList.add(eachFile.getPath()); |
| } |
| } |
| } |
| |
| if (fileNamePatternMatchedList.isEmpty()) { |
| return maxValue; |
| } |
| Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1); |
| String pathName = lastFile.getName(); |
| |
| // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq> |
| // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence> |
| String[] pathTokens = pathName.split("-"); |
| if (pathTokens.length == 3) { |
| return -1; |
| } else if(pathTokens.length == 4) { |
| return Integer.parseInt(pathTokens[3]); |
| } else { |
| return -1; |
| } |
| } |
| |
| public static int readFully(InputStream is, byte[] buffer, int offset, int length) |
| throws IOException { |
| int nread = 0; |
| while (nread < length) { |
| int nbytes = is.read(buffer, offset + nread, length - nread); |
| if (nbytes < 0) { |
| return nread > 0 ? nread : nbytes; |
| } |
| nread += nbytes; |
| } |
| return nread; |
| } |
| |
| /** |
| * Similar to readFully(). Skips bytes in a loop. |
| * @param in The DataInput to skip bytes from |
| * @param len number of bytes to skip. |
| * @throws java.io.IOException if it could not skip requested number of bytes |
| * for any reason (including EOF) |
| */ |
| public static void skipFully(DataInput in, int len) throws IOException { |
| int amt = len; |
| while (amt > 0) { |
| long ret = in.skipBytes(amt); |
| if (ret == 0) { |
| // skip may return 0 even if we're not at EOF. Luckily, we can |
| // use the read() method to figure out if we're at the end. |
| int b = in.readByte(); |
| if (b == -1) { |
| throw new EOFException( "Premature EOF from inputStream after " + |
| "skipping " + (len - amt) + " byte(s)."); |
| } |
| ret = 1; |
| } |
| amt -= ret; |
| } |
| } |
| |
| /** |
| * Decode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers |
| * into values that can be efficiently encoded with varint. (Otherwise, |
| * negative values must be sign-extended to 64 bits to be varint encoded, |
| * thus always taking 10 bytes on the wire.) |
| * |
| * @param n An unsigned 32-bit integer, stored in a signed int because |
| * Java has no explicit unsigned support. |
| * @return A signed 32-bit integer. |
| */ |
| public static int decodeZigZag32(final int n) { |
| return (n >>> 1) ^ -(n & 1); |
| } |
| |
| /** |
| * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers |
| * into values that can be efficiently encoded with varint. (Otherwise, |
| * negative values must be sign-extended to 64 bits to be varint encoded, |
| * thus always taking 10 bytes on the wire.) |
| * |
| * @param n An unsigned 64-bit integer, stored in a signed int because |
| * Java has no explicit unsigned support. |
| * @return A signed 64-bit integer. |
| */ |
| public static long decodeZigZag64(final long n) { |
| return (n >>> 1) ^ -(n & 1); |
| } |
| |
| |
| /** |
| * Read a raw Varint from the stream. If larger than 32 bits, discard the |
| * upper bits. |
| */ |
| public static int readRawVarint32(ByteBuffer buffer) throws IOException { |
| byte tmp = buffer.get(); |
| if (tmp >= 0) { |
| return tmp; |
| } |
| int result = tmp & 0x7f; |
| if ((tmp = buffer.get()) >= 0) { |
| result |= tmp << 7; |
| } else { |
| result |= (tmp & 0x7f) << 7; |
| if ((tmp = buffer.get()) >= 0) { |
| result |= tmp << 14; |
| } else { |
| result |= (tmp & 0x7f) << 14; |
| if ((tmp = buffer.get()) >= 0) { |
| result |= tmp << 21; |
| } else { |
| result |= (tmp & 0x7f) << 21; |
| result |= (tmp = buffer.get()) << 28; |
| if (tmp < 0) { |
| // Discard upper 32 bits. |
| for (int i = 0; i < 5; i++) { |
| if (buffer.get() >= 0) { |
| return result; |
| } |
| } |
| throw new IOException("Invalid Variable int32"); |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** Read a raw Varint from the stream. */ |
| public static long readRawVarint64(ByteBuffer buffer) throws IOException { |
| int shift = 0; |
| long result = 0; |
| while (shift < 64) { |
| final byte b = buffer.get(); |
| result |= (long)(b & 0x7F) << shift; |
| if ((b & 0x80) == 0) { |
| return result; |
| } |
| shift += 7; |
| } |
| throw new IOException("Invalid Variable int64"); |
| } |
| } |