blob: d69ad74fdd6aca699abc165fe688e6f87b855420 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.runtime.util;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Header less {@link ChannelWriterOutputView}. The caller saves the writePosition information.
*/
public final class HeaderlessChannelWriterOutputView extends AbstractChannelWriterOutputView {
private final BlockChannelWriter<MemorySegment> writer; // the writer to the channel
private int blockCount; // the number of blocks used
public HeaderlessChannelWriterOutputView(BlockChannelWriter<MemorySegment> writer,
List<MemorySegment> memory, int segmentSize) {
super(writer, segmentSize, 0);
if (writer == null) {
throw new NullPointerException();
}
this.writer = writer;
Preconditions.checkNotNull(memory);
// load the segments into the queue
final LinkedBlockingQueue<MemorySegment> queue = writer.getReturnQueue();
for (int i = memory.size() - 1; i >= 0; --i) {
final MemorySegment seg = memory.get(i);
if (seg.size() != segmentSize) {
throw new IllegalArgumentException("This segment are not of the specified size.");
}
queue.add(seg);
}
// get the first segment
try {
advance();
} catch (IOException ioex) {
throw new RuntimeException(ioex);
}
}
/**
* Closes this OutputView, closing the underlying writer. And return number bytes in last
* memory segment.
*/
@Override
public int close() throws IOException {
if (!writer.isClosed()) {
int currentPositionInSegment = getCurrentPositionInSegment();
// write last segment
writer.writeBlock(getCurrentSegment());
clear();
writer.getReturnQueue().clear();
this.writer.close();
return currentPositionInSegment;
}
return -1;
}
@Override
public int getBlockCount() {
return this.blockCount;
}
@Override
public long getNumBytes() throws IOException {
return writer.getSize();
}
@Override
public long getNumCompressedBytes() throws IOException {
return writer.getSize();
}
@Override
public MemorySegment nextSegment(MemorySegment current, int posInSegment) throws IOException {
if (current != null) {
writer.writeBlock(current);
}
final MemorySegment next = this.writer.getNextReturnedBlock();
this.blockCount++;
return next;
}
}