/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.indexstore;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.compression.SnappyCompressor;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.schema.table.Writable;
import org.apache.carbondata.core.stream.ExtendedByteArrayInputStream;
import org.apache.carbondata.core.stream.ExtendedByteArrayOutputStream;
import org.apache.carbondata.core.stream.ExtendedDataInputStream;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.log4j.Logger;
/**
* This class is used to send ExtendedBlocklet objects from the index executor to the
* index driver. If the serialized data size exceeds the configured threshold, the data
* is written to a temp folder provided by the user and only the file name is sent;
* otherwise the complete data is sent to the index driver.
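* <p>
* A typical round trip looks roughly like this (illustrative sketch; variable names
* are assumptions):
* <pre>{@code
* // on the index executor:
* ExtendedBlockletWrapper wrapper =
*     new ExtendedBlockletWrapper(prunedBlocklets, tablePath, queryId, true, false);
* // wrapper travels to the driver via write(DataOutput) / readFields(DataInput)
* // on the index driver:
* List<ExtendedBlocklet> blocklets = wrapper.readBlocklet(tablePath, queryId, false);
* }</pre>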
*/
public class ExtendedBlockletWrapper implements Writable, Serializable {
private static final Logger LOGGER =
LogServiceFactory.getLogService(ExtendedBlockletWrapper.class.getName());
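// buffer size and block size used when spilling the serialized data to a temp file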
private static final int BUFFER_SIZE = 8 * 1024 * 1024;
private static final int BLOCK_SIZE = 256 * 1024 * 1024;
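// true if the serialized data was written to a temp file instead of being kept in memory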
private boolean isWrittenToFile;
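// total size in bytes of the serialized stream (header plus compressed data)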
private int dataSize;
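// the serialized data itself, or the UTF-8 encoded temp file name when isWrittenToFile is true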
private byte[] bytes;
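// no-arg constructor required so the object can be recreated via readFields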
public ExtendedBlockletWrapper() {
}
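/**
* Serializes the given blocklets and, depending on the configured serialization
* threshold, either keeps the serialized bytes in memory or spills them to a
* temp file under the index server temp path.
*/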
public ExtendedBlockletWrapper(List<ExtendedBlocklet> extendedBlockletList, String tablePath,
String queryId, boolean isWriteToFile, boolean isCountJob) {
Map<String, Short> uniqueLocations = new HashMap<>();
byte[] bytes = convertToBytes(tablePath, uniqueLocations, extendedBlockletList, isCountJob);
int serializeAllowedSize = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD,
CarbonCommonConstants.CARBON_INDEX_SERVER_SERIALIZATION_THRESHOLD_DEFAULT)) * 1024;
DataOutputStream stream = null;
// if the data size exceeds the threshold, it is written to a file and only the file name
// is sent from executor to driver; on any failure the data is sent through the network
if (bytes.length > serializeAllowedSize && isWriteToFile) {
final String fileName = UUID.randomUUID().toString();
String folderPath = CarbonUtil.getIndexServerTempPath()
+ CarbonCommonConstants.FILE_SEPARATOR + queryId;
try {
final CarbonFile carbonFile = FileFactory.getCarbonFile(folderPath);
boolean isFolderExists = true;
if (!carbonFile.isFileExist()) {
LOGGER.warn("Folder:" + folderPath + "doesn't exists, data will be send through network");
isFolderExists = false;
}
if (isFolderExists) {
stream = FileFactory.getDataOutputStream(folderPath + "/" + fileName,
BUFFER_SIZE, BLOCK_SIZE, (short) 1);
writeBlockletToStream(stream, bytes, uniqueLocations, extendedBlockletList);
this.dataSize = stream.size();
this.bytes = fileName.getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
isWrittenToFile = true;
}
} catch (IOException e) {
LOGGER.error("Problem while writing to file, data will be sent through network", e);
} finally {
CarbonUtil.closeStreams(stream);
}
}
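// keep the data in memory when it is small, spilling is disabled or the file write failed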
if (!isWrittenToFile) {
try {
ExtendedByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
stream = new DataOutputStream(bos);
writeBlockletToStream(stream, bytes, uniqueLocations, extendedBlockletList);
this.dataSize = bos.size();
this.bytes = bos.getBuffer();
} catch (IOException e) {
LOGGER.error("Problem while writing data to memory stream", e);
} finally {
CarbonUtil.closeStreams(stream);
}
}
}
private byte[] convertToBytes(String tablePath, Map<String, Short> uniqueLocations,
List<ExtendedBlocklet> extendedBlockletList, boolean isCountJob) {
ByteArrayOutputStream bos = new ExtendedByteArrayOutputStream();
DataOutputStream stream = new DataOutputStream(bos);
try {
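// serialize each blocklet with its file path made relative to the table path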
for (ExtendedBlocklet extendedBlocklet : extendedBlockletList) {
extendedBlocklet.setFilePath(extendedBlocklet.getFilePath().replace(tablePath, ""));
extendedBlocklet.serializeData(stream, uniqueLocations, isCountJob);
}
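// snappy-compress the serialized bytes to reduce the amount of data shipped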
byte[] input = bos.toByteArray();
return new SnappyCompressor().compressByte(input, input.length);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
CarbonUtil.closeStreams(stream);
}
}
/**
* Writes the serialized blocklet data to the given stream (file or memory).
* Data format:
* <number of blocklets><number of unique locations (short)><locations><serialized data length><data>
*
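* For example, two blocklets on a single unique location "host1" with a 100 byte
* compressed payload would be written as (illustrative):
* [int 2][short 1][UTF "host1"][int 100][100 bytes of payload]
*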
* @param stream output stream to write to (file or memory)
* @param data compressed serialized blocklet data
* @param uniqueLocation map from unique location to its index in the written array
* @param extendedBlockletList blocklets whose count is written as the header
* @throws IOException if writing to the stream fails
*/
private void writeBlockletToStream(DataOutputStream stream, byte[] data,
Map<String, Short> uniqueLocation, List<ExtendedBlocklet> extendedBlockletList)
throws IOException {
stream.writeInt(extendedBlockletList.size());
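// invert the location -> index map into an array ordered by the index values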
String[] uniqueLoc = new String[uniqueLocation.size()];
Iterator<Map.Entry<String, Short>> iterator = uniqueLocation.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<String, Short> next = iterator.next();
uniqueLoc[next.getValue()] = next.getKey();
}
stream.writeShort((short) uniqueLoc.length);
for (String loc : uniqueLoc) {
stream.writeUTF(loc);
}
stream.writeInt(data.length);
stream.write(data);
}
/**
* Deserializes the blocklet data from the temp file or from the in-memory bytes.
* Data format:
* <number of blocklets><number of unique locations (short)><locations><serialized data length><data>
*
* @param tablePath table path used to restore the blocklet file paths
* @param queryId query id used to locate the temp folder
* @param isCountJob true if this is a count job
* @return list of deserialized ExtendedBlocklet objects
* @throws IOException if reading or uncompressing the data fails
*/
public List<ExtendedBlocklet> readBlocklet(String tablePath, String queryId, boolean isCountJob)
throws IOException {
byte[] data;
if (bytes != null) {
if (isWrittenToFile) {
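// 'bytes' holds the temp file name; read the serialized stream back from the query's temp folder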
DataInputStream stream = null;
try {
final String folderPath = CarbonUtil.getIndexServerTempPath()
+ CarbonCommonConstants.FILE_SEPARATOR + queryId;
String fileName = new String(bytes, CarbonCommonConstants.DEFAULT_CHARSET);
stream = FileFactory
.getDataInputStream(folderPath + "/" + fileName);
data = new byte[dataSize];
stream.readFully(data);
} finally {
CarbonUtil.closeStreams(stream);
}
} else {
data = bytes;
}
DataInputStream stream = null;
int numberOfBlocklet;
String[] locations;
int actualDataLen;
try {
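// read the header: blocklet count, unique locations and the compressed payload length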
stream = new DataInputStream(new ByteArrayInputStream(data));
numberOfBlocklet = stream.readInt();
short numberOfLocations = stream.readShort();
locations = new String[numberOfLocations];
for (int i = 0; i < numberOfLocations; i++) {
locations[i] = stream.readUTF();
}
actualDataLen = stream.readInt();
} finally {
CarbonUtil.closeStreams(stream);
}
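// the compressed payload sits at the end of the buffer, so its offset is the
// total serialized size minus the payload length read from the header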
final byte[] unCompressByte =
new SnappyCompressor().unCompressByte(data, this.dataSize - actualDataLen, actualDataLen);
ExtendedByteArrayInputStream ebis = new ExtendedByteArrayInputStream(unCompressByte);
ExtendedDataInputStream eDIS = new ExtendedDataInputStream(ebis);
List<ExtendedBlocklet> extendedBlockletList = new ArrayList<>();
try {
for (int i = 0; i < numberOfBlocklet; i++) {
ExtendedBlocklet extendedBlocklet = new ExtendedBlocklet();
extendedBlocklet.deserializeFields(eDIS, locations, tablePath, isCountJob);
extendedBlockletList.add(extendedBlocklet);
}
} finally {
CarbonUtil.closeStreams(eDIS);
}
return extendedBlockletList;
} else {
return new ArrayList<>();
}
}
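/**
* Writable serialization format: [isWrittenToFile][bytes present][bytes length][bytes][dataSize].
*/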
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(isWrittenToFile);
out.writeBoolean(bytes != null);
if (bytes != null) {
out.writeInt(bytes.length);
out.write(bytes);
}
out.writeInt(dataSize);
}
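/**
* Mirror of {@link #write(DataOutput)}: restores the flag, the bytes and the data size.
*/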
@Override
public void readFields(DataInput in) throws IOException {
this.isWrittenToFile = in.readBoolean();
if (in.readBoolean()) {
this.bytes = new byte[in.readInt()];
in.readFully(bytes);
}
this.dataSize = in.readInt();
}
}