blob: e8b8b450c0e05742bf006eaf9a9e2c5a3fe59cdb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.master.exec;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos.CodecType;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.physical.PartitionMergeScanExec;
import org.apache.tajo.engine.planner.physical.ScanExec;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.io.AsyncTaskService;
import org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TablespaceManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.tuple.memory.MemoryBlock;
import org.apache.tajo.tuple.memory.MemoryRowBlock;
import org.apache.tajo.util.CompressionUtil;
import org.apache.tajo.util.SplitUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner {
private final static Log LOG = LogFactory.getLog(NonForwardQueryResultFileScanner.class);
final private AsyncTaskService asyncTaskService;
final private QueryId queryId;
final private String sessionId;
private ScanExec scanExec;
final private TableDesc tableDesc;
final private RowStoreEncoder rowEncoder;
final private int maxRow;
private boolean eof;
private volatile long totalRows;
private volatile int currentNumRows;
private volatile boolean isStopped;
private TaskAttemptContext taskContext;
final private TajoConf tajoConf;
final private ScanNode scanNode;
final private Optional<CodecType> codecType;
private MemoryRowBlock rowBlock;
private Future<MemoryRowBlock> nextFetch;
public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService,
TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode,
int maxRow, Optional<CodecType> codecType) throws IOException {
this.asyncTaskService = asyncTaskService;
this.tajoConf = tajoConf;
this.sessionId = sessionId;
this.queryId = queryId;
this.scanNode = scanNode;
this.tableDesc = scanNode.getTableDesc();
this.maxRow = maxRow;
this.rowEncoder = RowStoreUtil.createEncoder(scanNode.getOutSchema());
this.codecType = codecType;
}
public void init() throws IOException, TajoException {
initSeqScanExec();
}
private void initSeqScanExec() throws IOException, TajoException {
Tablespace tablespace = TablespaceManager.get(tableDesc.getUri());
List<Fragment> fragments = Lists.newArrayList(
SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true));
if (!fragments.isEmpty()) {
FragmentProto[] fragmentProtos =
FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()]));
this.taskContext = new TaskAttemptContext(
new QueryContext(tajoConf), null,
new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
fragmentProtos, null);
this.scanExec = new PartitionMergeScanExec(taskContext, scanNode, fragmentProtos);
this.scanExec.init();
} else {
close();
}
}
public QueryId getQueryId() {
return queryId;
}
public String getSessionId() {
return sessionId;
}
public TableDesc getTableDesc() {
return tableDesc;
}
@Override
public int getCurrentRowNumber() {
return currentNumRows;
}
public void close() throws IOException {
if(isStopped) {
return;
}
isStopped = true;
if (scanExec != null) {
scanExec.close();
scanExec = null;
}
if(rowBlock != null) {
rowBlock.release();
rowBlock = null;
}
//remove temporal final output
if (!tajoConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
Path temporalResultDir = TajoConf.getTemporalResultDir(tajoConf, queryId);
if (tableDesc.getUri().equals(temporalResultDir.toUri())) {
asyncTaskService.run(() -> {
try {
temporalResultDir.getFileSystem(tajoConf).delete(temporalResultDir.getParent(), true);
} catch (IOException e) {
LOG.error(e);
}
}
);
}
}
LOG.info(String.format("\"Sent result to client for %s, queryId: %s %s rows: %d",
sessionId, queryId,
codecType != null ? ", compression: " + codecType : "",
totalRows
));
}
public List<Tuple> getNextTupleRows(int fetchRowNum) throws IOException {
List<Tuple> rows = new ArrayList<>();
if (scanExec == null) {
return rows;
}
int rowCount = 0;
while (!eof) {
Tuple tuple = scanExec.next();
if (tuple == null) {
eof = true;
break;
}
rows.add(tuple);
rowCount++;
currentNumRows++;
if (rowCount >= fetchRowNum) {
break;
}
if (currentNumRows >= maxRow) {
eof = true;
break;
}
}
if(eof) {
close();
}
return rows;
}
public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
List<ByteString> rows = new ArrayList<>();
if (scanExec == null) {
return rows;
}
int rowCount = 0;
while (!eof) {
Tuple tuple = scanExec.next();
if (tuple == null) {
eof = true;
break;
}
rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
rowCount++;
currentNumRows++;
if (rowCount >= fetchRowNum) {
break;
}
if (currentNumRows >= maxRow) {
eof = true;
break;
}
}
if(eof) {
close();
}
return rows;
}
@Override
public SerializedResultSet nextRowBlock(int fetchRowNum) throws IOException {
try {
final SerializedResultSet.Builder resultSetBuilder = SerializedResultSet.newBuilder();
resultSetBuilder.setSchema(scanNode.getOutSchema().getProto());
resultSetBuilder.setRows(0);
if (isStopped) return resultSetBuilder.build();
if (nextFetch == null) {
nextFetch = fetchNextRowBlock(fetchRowNum);
}
MemoryRowBlock rowBlock = nextFetch.get();
if (rowBlock.rows() > 0) {
resultSetBuilder.setRows(rowBlock.rows());
MemoryBlock memoryBlock = rowBlock.getMemory();
if (codecType.isPresent()) {
byte[] uncompressedBytes = new byte[memoryBlock.readableBytes()];
memoryBlock.getBuffer().getBytes(0, uncompressedBytes);
byte[] compressedBytes = CompressionUtil.compress(codecType.get(), uncompressedBytes);
resultSetBuilder.setDecompressedLength(uncompressedBytes.length);
resultSetBuilder.setDecompressCodec(codecType.get());
resultSetBuilder.setSerializedTuples(ByteString.copyFrom(compressedBytes));
} else {
ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes());
resultSetBuilder.setDecompressedLength(uncompressed.remaining());
resultSetBuilder.setSerializedTuples(ByteString.copyFrom(uncompressed));
}
}
// pre-fetch
if (!eof) {
nextFetch = fetchNextRowBlock(fetchRowNum);
} else {
close();
}
return resultSetBuilder.build();
} catch (Throwable t) {
close();
throw new TajoInternalError(t.getCause());
}
}
/**
* Asynchronously fetch final output data
*/
private Future<MemoryRowBlock> fetchNextRowBlock(final int fetchRowNum) throws IOException {
final SettableFuture<MemoryRowBlock> future = SettableFuture.create();
if (rowBlock == null) {
rowBlock = new MemoryRowBlock(SchemaUtil.toDataTypes(scanNode.getOutSchema()));
}
if (scanExec == null) {
rowBlock.clear();
future.set(rowBlock);
return future;
}
return asyncTaskService.supply(() -> {
try {
rowBlock.clear();
int endRow = currentNumRows + fetchRowNum;
while (currentNumRows < endRow) {
Tuple tuple = scanExec.next();
if (tuple == null) {
eof = true;
break;
} else {
rowBlock.getWriter().addTuple(tuple);
currentNumRows++;
if (currentNumRows >= maxRow) {
eof = true;
break;
}
}
}
if (rowBlock.rows() > 0) {
totalRows += rowBlock.rows();
}
return rowBlock;
} catch (Throwable t) {
throw new TajoInternalError(t);
}
});
}
@Override
public Schema getLogicalSchema() {
return scanNode.getOutSchema();
}
}