// blob: 0391525aa10801480e5a9623c284fc636b7a6b62 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.streaming;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.carbondata.core.datastore.compression.Compressor;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonMetadataUtil;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.format.BlockletHeader;
import org.apache.carbondata.format.BlockletIndex;
import org.apache.carbondata.format.BlockletInfo;
import org.apache.carbondata.format.MutationType;
import org.apache.carbondata.streaming.segment.StreamSegment;
/**
 * Stream blocklet writer.
 *
 * <p>Accumulates the serialized rows of one blocklet in a growable in-memory byte buffer
 * (multi-byte values are stored most-significant byte first), tracks how many rows have been
 * written, holds the blocklet-level statistics collectors, and finally appends the blocklet
 * (sync marker, header, compressed payload) to an output stream.
 *
 * <p>The writer is reused across blocklets: call {@link #reset()} after a blocklet has been
 * appended to start the next one.
 */
public class StreamBlockletWriter {

  /** Backing buffer; only the first {@link #count} bytes are valid. */
  private byte[] buffer;

  /** Byte limit passed to the constructor; restored by {@link #reset()}. */
  private final int initialMaxSize;

  /** Row limit passed to the constructor; restored by {@link #reset()}. */
  private final int initialMaxRowNum;

  /** Effective byte limit for the current blocklet (lowered by {@link #skipRow()}). */
  private int maxSize;

  /** Effective row limit for the current blocklet (lowered by {@link #skipRow()}). */
  private int maxRowNum;

  /** Minimum amount by which the buffer grows when it has to be enlarged. */
  private final int rowSize;

  /** Number of valid bytes currently held in {@link #buffer}. */
  private int count = 0;

  /** Zero-based index of the current row; -1 while no row has been started. */
  private int rowIndex = -1;

  private final Compressor compressor;

  private final int dimCountWithoutComplex;

  private final int measureCount;

  private final DataType[] measureDataTypes;

  // blocklet level stats (package-private: kept as in the original so that collaborators in
  // this package can keep feeding values into the collectors)
  ColumnPageStatsCollector[] dimStatsCollectors;

  ColumnPageStatsCollector[] msrStatsCollectors;

  // blocklet level Min/Max, built lazily by generateBlockletMinMax()
  private BlockletMinMaxIndex blockletMinMaxIndex;

  /**
   * Creates a writer with an initial buffer of {@code maxSize} bytes.
   *
   * @param maxSize                byte threshold at which the blocklet is reported as full;
   *                               also the initial buffer capacity
   * @param maxRowNum              row index threshold at which the blocklet is reported as full
   * @param rowSize                minimum buffer growth step, in bytes
   * @param dimCountWithoutComplex number of dimension stats collectors to create
   * @param measureCount           number of measure stats collectors to create
   * @param measureDataTypes       data type of each measure, indexed like the measures
   * @param compressorName         name used to look up the compressor for the payload
   */
  StreamBlockletWriter(int maxSize, int maxRowNum, int rowSize, int dimCountWithoutComplex,
      int measureCount, DataType[] measureDataTypes, String compressorName) {
    buffer = new byte[maxSize];
    this.initialMaxSize = maxSize;
    this.initialMaxRowNum = maxRowNum;
    this.maxSize = maxSize;
    this.maxRowNum = maxRowNum;
    this.rowSize = rowSize;
    this.dimCountWithoutComplex = dimCountWithoutComplex;
    this.measureCount = measureCount;
    this.measureDataTypes = measureDataTypes;
    this.compressor = CompressorFactory.getInstance().getCompressor(compressorName);
    initializeStatsCollector();
  }

  /**
   * (Re)creates one fresh stats collector per non-complex dimension and per measure.
   */
  private void initializeStatsCollector() {
    // dimension stats collectors
    // stats are not collected for complex types,
    // so this only covers dictionary dimensions and no-dictionary dimensions
    dimStatsCollectors = new ColumnPageStatsCollector[dimCountWithoutComplex];
    for (int dimIndex = 0; dimIndex < dimCountWithoutComplex; dimIndex++) {
      dimStatsCollectors[dimIndex] = KeyPageStatsCollector.newInstance(DataTypes.BYTE_ARRAY);
    }
    // measure stats collectors
    msrStatsCollectors = new ColumnPageStatsCollector[measureCount];
    for (int msrIndex = 0; msrIndex < measureCount; msrIndex++) {
      msrStatsCollectors[msrIndex] =
          PrimitivePageStatsCollector.newInstance(measureDataTypes[msrIndex]);
    }
  }

  /**
   * Makes sure at least {@code space} more bytes fit behind the current write position,
   * enlarging the buffer if necessary while preserving the bytes already written.
   *
   * @param space number of additional bytes about to be written
   */
  private void ensureCapacity(int space) {
    int required = space + count;
    if (required <= buffer.length) {
      return;
    }
    // grow to exactly what is needed, but by no less than one rowSize step
    // NOTE(review): growth is linear in rowSize, not geometric - confirm this is intended
    byte[] enlarged = new byte[Math.max(required, buffer.length + rowSize)];
    System.arraycopy(buffer, 0, enlarged, 0, count);
    buffer = enlarged;
  }

  /**
   * Prepares the writer for the next blocklet: discards buffered bytes (the buffer itself is
   * kept), rewinds the row index, recreates the stats collectors, drops the cached min/max
   * index, and restores the size and row limits that {@link #skipRow()} may have lowered
   * while the previous blocklet was being filled.
   */
  void reset() {
    count = 0;
    rowIndex = -1;
    // skipped rows must only count against the blocklet they occurred in; without this the
    // limits would shrink monotonically for the whole lifetime of the writer
    maxSize = initialMaxSize;
    maxRowNum = initialMaxRowNum;
    initializeStatsCollector();
    blockletMinMaxIndex = null;
  }

  /**
   * @return the internal buffer itself (not a copy); only the first {@link #getCount()} bytes
   *         are valid
   */
  byte[] getBytes() {
    return buffer;
  }

  /**
   * @return number of bytes written since construction or the last {@link #reset()}
   */
  int getCount() {
    return count;
  }

  /**
   * @return zero-based index of the current row, or -1 if no row has been started
   */
  int getRowIndex() {
    return rowIndex;
  }

  /**
   * Advances to the next row.
   */
  void nextRow() {
    rowIndex++;
  }

  /**
   * Accounts for a row that was skipped instead of written by lowering both the byte limit
   * and the row limit of the current blocklet by one.
   */
  void skipRow() {
    maxSize--;
    maxRowNum--;
  }

  /**
   * @return {@code true} once the row index has reached the row limit or the buffered byte
   *         count has reached the byte limit
   */
  boolean isFull() {
    // ">=" rather than "==": skipRow() can move maxRowNum to or below the current rowIndex,
    // after which an equality test would never fire again for this blocklet
    return rowIndex >= maxRowNum || count >= maxSize;
  }

  /**
   * Appends one byte: 1 for {@code true}, 0 for {@code false}.
   */
  void writeBoolean(boolean val) {
    ensureCapacity(1);
    buffer[count] = (byte) (val ? 1 : 0);
    count += 1;
  }

  /**
   * Appends the low 16 bits of {@code val}, high byte first.
   */
  void writeShort(int val) {
    ensureCapacity(2);
    buffer[count] = (byte) (val >>> 8);
    buffer[count + 1] = (byte) (val);
    count += 2;
  }

  /**
   * Appends {@code val} as 4 bytes, high byte first.
   */
  void writeInt(int val) {
    ensureCapacity(4);
    buffer[count] = (byte) (val >>> 24);
    buffer[count + 1] = (byte) (val >>> 16);
    buffer[count + 2] = (byte) (val >>> 8);
    buffer[count + 3] = (byte) (val);
    count += 4;
  }

  /**
   * Appends {@code val} as 8 bytes, high byte first.
   */
  void writeLong(long val) {
    ensureCapacity(8);
    buffer[count] = (byte) (val >>> 56);
    buffer[count + 1] = (byte) (val >>> 48);
    buffer[count + 2] = (byte) (val >>> 40);
    buffer[count + 3] = (byte) (val >>> 32);
    buffer[count + 4] = (byte) (val >>> 24);
    buffer[count + 5] = (byte) (val >>> 16);
    buffer[count + 6] = (byte) (val >>> 8);
    buffer[count + 7] = (byte) (val);
    count += 8;
  }

  /**
   * Appends the 8-byte bit pattern returned by {@link Double#doubleToLongBits(double)}.
   */
  void writeDouble(double val) {
    writeLong(Double.doubleToLongBits(val));
  }

  /**
   * Appends the whole array {@code b}.
   */
  void writeBytes(byte[] b) {
    writeBytes(b, 0, b.length);
  }

  /**
   * Appends {@code len} bytes of {@code b}, starting at offset {@code off}.
   *
   * @param b   source array
   * @param off index of the first byte to copy
   * @param len number of bytes to copy
   */
  void writeBytes(byte[] b, int off, int len) {
    ensureCapacity(len);
    System.arraycopy(b, off, buffer, count, len);
    count += len;
  }

  /**
   * @return the page stats of every dimension collector, or an empty array if there are no
   *         dimension collectors
   */
  private SimpleStatsResult[] getDimStats() {
    if (dimStatsCollectors == null) {
      return new SimpleStatsResult[0];
    }
    SimpleStatsResult[] stats = new SimpleStatsResult[dimStatsCollectors.length];
    for (int dimIndex = 0; dimIndex < stats.length; dimIndex++) {
      stats[dimIndex] = dimStatsCollectors[dimIndex].getPageStats();
    }
    return stats;
  }

  /**
   * @return the page stats of every measure collector, or an empty array if there are no
   *         measure collectors
   */
  private SimpleStatsResult[] getMsrStats() {
    if (msrStatsCollectors == null) {
      return new SimpleStatsResult[0];
    }
    SimpleStatsResult[] stats = new SimpleStatsResult[msrStatsCollectors.length];
    for (int msrIndex = 0; msrIndex < stats.length; msrIndex++) {
      stats[msrIndex] = msrStatsCollectors[msrIndex].getPageStats();
    }
    return stats;
  }

  /**
   * Builds the blocklet-level min/max index from the current collector stats on first use
   * and returns the cached instance afterwards (the cache is cleared by {@link #reset()}).
   *
   * @return the min/max index of the current blocklet
   */
  BlockletMinMaxIndex generateBlockletMinMax() {
    if (blockletMinMaxIndex == null) {
      blockletMinMaxIndex = StreamSegment.collectMinMaxIndex(getDimStats(), getMsrStats());
    }
    return blockletMinMaxIndex;
  }

  /**
   * Writes the current blocklet to {@code outputStream} in this order: the sync marker, the
   * serialized header length and header bytes, then the compressed payload length and the
   * compressed payload bytes.
   *
   * @param outputStream destination stream; it is neither flushed nor closed here
   * @throws IOException if writing to the stream fails
   */
  void appendBlocklet(DataOutputStream outputStream) throws IOException {
    outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);

    // rowIndex is zero-based, so the row count is one more
    BlockletInfo blockletInfo = new BlockletInfo();
    blockletInfo.setNum_rows(getRowIndex() + 1);

    BlockletHeader blockletHeader = new BlockletHeader();
    blockletHeader.setBlocklet_length(getCount());
    blockletHeader.setMutation(MutationType.INSERT);
    blockletHeader.setBlocklet_info(blockletInfo);

    // add blocklet level min/max
    blockletMinMaxIndex = generateBlockletMinMax();
    // NOTE(review): the index is attached only for blocklets with more than one row, so a
    // single-row blocklet carries no min/max - confirm whether "> 0" was intended
    if (blockletInfo.getNum_rows() > 1) {
      BlockletIndex blockletIndex = new BlockletIndex();
      blockletIndex.setMin_max_index(CarbonMetadataUtil.convertMinMaxIndex(blockletMinMaxIndex));
      blockletHeader.setBlocklet_index(blockletIndex);
    }

    // length-prefixed header
    byte[] headerBytes = CarbonUtil.getByteArray(blockletHeader);
    outputStream.writeInt(headerBytes.length);
    outputStream.write(headerBytes);

    // length-prefixed payload: only the valid prefix of the buffer is compressed
    byte[] compressed = compressor.compressByte(getBytes(), getCount());
    outputStream.writeInt(compressed.length);
    outputStream.write(compressed);
  }

  /**
   * Currently a no-op.
   */
  void close() {
  }
}