blob: 63fd6652c1e1e97515ebb5167d4c391df5380291 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.receiver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.network.buffer.FileRegionBuffer;
import com.baidu.hugegraph.computer.core.network.buffer.NetworkBuffer;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.sort.sorting.SortManager;
import com.baidu.hugegraph.computer.core.store.SuperstepFileGenerator;
import com.baidu.hugegraph.computer.core.store.entry.KvEntry;
import com.baidu.hugegraph.computer.core.util.FileUtil;
import com.baidu.hugegraph.util.Log;
/**
* Manage the buffers received for a partition and the files generated by
* sorting the buffers to file. The type of data may be VERTEX, EDGE, and
* MESSAGE.
*/
public abstract class MessageRecvPartition {
public static final Logger LOG = Log.logger(MessageRecvPartition.class);
private MessageRecvBuffers recvBuffers;
/*
* Used to sort the buffers that reached threshold
* ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT.
*/
private MessageRecvBuffers sortBuffers;
private final SortManager sortManager;
private List<String> outputFiles;
private final SuperstepFileGenerator fileGenerator;
private final boolean withSubKv;
private final int mergeFileNum;
private long totalBytes;
private final boolean useFileRegion;
private final AtomicReference<Throwable> exception;
public MessageRecvPartition(Config config,
SuperstepFileGenerator fileGenerator,
SortManager sortManager,
boolean withSubKv) {
this.fileGenerator = fileGenerator;
this.sortManager = sortManager;
this.withSubKv = withSubKv;
long buffersLimit = config.get(
ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT);
long waitSortTimeout = config.get(
ComputerOptions.WORKER_WAIT_SORT_TIMEOUT);
this.mergeFileNum = config.get(ComputerOptions.HGKV_MERGE_FILES_NUM);
this.useFileRegion = config.get(
ComputerOptions.TRANSPORT_RECV_FILE_MODE);
if (!this.useFileRegion) {
this.recvBuffers = new MessageRecvBuffers(buffersLimit,
waitSortTimeout);
this.sortBuffers = new MessageRecvBuffers(buffersLimit,
waitSortTimeout);
}
this.outputFiles = new ArrayList<>();
this.totalBytes = 0L;
this.exception = new AtomicReference<>();
}
/**
* Only one thread can call this method.
*/
public synchronized void addBuffer(NetworkBuffer buffer) {
this.totalBytes += buffer.length();
if (buffer instanceof FileRegionBuffer) {
String path = ((FileRegionBuffer) buffer).path();
this.outputFiles.add(path);
return;
}
this.recvBuffers.addBuffer(buffer);
if (this.recvBuffers.full()) {
// Wait for the previous sorting
this.sortBuffers.waitSorted();
// Transfer recvBuffers to sortBuffers, then sort and flush
this.swapReceiveAndSortBuffers();
this.flushSortBuffersAsync();
}
}
public synchronized PeekableIterator<KvEntry> iterator() {
/*
* TODO: create iterator directly from buffers if there is no
* outputFiles.
*/
if (!this.useFileRegion) {
this.flushAllBuffersAndWaitSorted();
}
this.mergeOutputFilesIfNeeded();
if (this.outputFiles.size() == 0) {
return PeekableIterator.emptyIterator();
}
return this.sortManager.iterator(this.outputFiles, this.withSubKv);
}
public synchronized long totalBytes() {
return this.totalBytes;
}
public synchronized MessageStat messageStat() {
// TODO: count the message received
return new MessageStat(0L, this.totalBytes);
}
protected abstract OuterSortFlusher outerSortFlusher();
protected abstract String type();
/**
* Flush the receive buffers to file, and wait both recvBuffers and
* sortBuffers to finish sorting.
* After this method be called, can not call
* {@link #addBuffer(NetworkBuffer)} any more.
*/
private void flushAllBuffersAndWaitSorted() {
this.sortBuffers.waitSorted();
if (this.recvBuffers.totalBytes() > 0) {
// Transfer recvBuffers to sortBuffers, then sort and flush
this.swapReceiveAndSortBuffers();
this.flushSortBuffersAsync();
this.sortBuffers.waitSorted();
}
this.checkException();
}
private void flushSortBuffersAsync() {
String path = this.genOutputPath();
this.mergeBuffersAsync(this.sortBuffers, path);
this.outputFiles.add(path);
}
private void mergeBuffersAsync(MessageRecvBuffers buffers, String path) {
this.checkException();
this.sortManager.mergeBuffers(buffers.buffers(), path,
this.withSubKv, this.outerSortFlusher())
.whenComplete((r , e) -> {
if (e != null) {
LOG.error("Failed to merge buffers", e);
// Just record the first error
this.exception.compareAndSet(null, e);
}
// Signal the buffers to prevent other thread wait indefinitely.
buffers.signalSorted();
});
}
private void swapReceiveAndSortBuffers() {
assert this.recvBuffers.totalBytes() > 0;
MessageRecvBuffers oldRecvBuffers = this.recvBuffers;
this.recvBuffers = this.sortBuffers;
this.sortBuffers = oldRecvBuffers;
// Prepare for the next buffer-adding/sorting
this.recvBuffers.prepareSort();
}
/**
* Merge outputFiles if needed, like merge 10000 files into 100 files.
*/
private void mergeOutputFilesIfNeeded() {
if (this.outputFiles.size() <= 1) {
return;
}
/*
* TODO Restore genOutputFileNames(sqrt(outputFiles.size()))
* after add Sorter#iterator() of subkv
*/
int mergeFileNum = this.mergeFileNum;
mergeFileNum = 1;
List<String> newOutputs = this.genOutputFileNames(mergeFileNum);
this.sortManager.mergeInputs(this.outputFiles, newOutputs,
this.withSubKv, this.outerSortFlusher());
FileUtil.deleteFilesQuietly(this.outputFiles);
this.outputFiles = newOutputs;
}
public String genOutputPath() {
return this.fileGenerator.nextPath(this.type());
}
private List<String> genOutputFileNames(int targetSize) {
List<String> files = new ArrayList<>(targetSize);
for (int i = 0; i < targetSize; i++) {
files.add(this.genOutputPath());
}
return files;
}
private void checkException() {
Throwable t = this.exception.get();
if (t != null) {
throw new ComputerException(t.getMessage(), t);
}
}
}