blob: 600c2347a058b5acfe730693c6ce8e4c3b775d5a [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.sort.sorting;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import com.baidu.hugegraph.computer.core.combiner.Combiner;
import com.baidu.hugegraph.computer.core.combiner.EdgeValueCombiner;
import com.baidu.hugegraph.computer.core.combiner.MessageValueCombiner;
import com.baidu.hugegraph.computer.core.combiner.PointerCombiner;
import com.baidu.hugegraph.computer.core.combiner.VertexValueCombiner;
import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessOutput;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.sender.WriteBuffers;
import com.baidu.hugegraph.computer.core.sort.BufferFileSorter;
import com.baidu.hugegraph.computer.core.sort.HgkvFileSorter;
import com.baidu.hugegraph.computer.core.sort.Sorter;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineSubKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.KvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.store.entry.KvEntry;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.Log;
public abstract class SortManager implements Manager {
public static final Logger LOG = Log.logger(SortManager.class);
private final ComputerContext context;
private final ExecutorService sortExecutor;
private final Sorter sorter;
private final int capacity;
private final int flushThreshold;
public SortManager(ComputerContext context) {
this.context = context;
Config config = context.config();
if (this.threadNum(config) != 0) {
this.sortExecutor = ExecutorUtil.newFixedThreadPool(
this.threadNum(config), this.threadPrefix());
} else {
this.sortExecutor = null;
}
if (config.get(ComputerOptions.TRANSPORT_RECV_FILE_MODE)) {
this.sorter = new BufferFileSorter(config);
} else {
this.sorter = new HgkvFileSorter(config);
}
this.capacity = config.get(
ComputerOptions.WORKER_WRITE_BUFFER_INIT_CAPACITY);
this.flushThreshold = config.get(
ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);
}
@Override
public abstract String name();
protected abstract String threadPrefix();
protected Integer threadNum(Config config) {
return config.get(ComputerOptions.SORT_THREAD_NUMS);
}
@Override
public void init(Config config) {
// pass
}
@Override
public void close(Config config) {
if (this.sortExecutor == null) {
return;
}
this.sortExecutor.shutdown();
try {
this.sortExecutor.awaitTermination(Constants.SHUTDOWN_TIMEOUT,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Interrupted when waiting sort executor terminated");
}
}
public CompletableFuture<ByteBuffer> sort(MessageType type,
WriteBuffers buffer) {
return CompletableFuture.supplyAsync(() -> {
RandomAccessInput bufferForRead = buffer.wrapForRead();
// TODOļ¼šThis ByteBuffer should be allocated from the off-heap
BytesOutput output = IOFactory.createBytesOutput(this.capacity);
InnerSortFlusher flusher = this.createSortFlusher(
type, output,
this.flushThreshold);
try {
this.sorter.sortBuffer(bufferForRead, flusher,
type == MessageType.EDGE);
} catch (Exception e) {
throw new ComputerException("Failed to sort buffers of %s " +
"message", e, type.name());
}
return ByteBuffer.wrap(output.buffer(), 0, (int) output.position());
}, this.sortExecutor);
}
public CompletableFuture<Void> mergeBuffers(List<RandomAccessInput> inputs,
String path,
boolean withSubKv,
OuterSortFlusher flusher) {
return CompletableFuture.runAsync(() -> {
if (withSubKv) {
flusher.sources(inputs.size());
}
try {
this.sorter.mergeBuffers(inputs, flusher, path, withSubKv);
} catch (Exception e) {
throw new ComputerException(
"Failed to merge %s buffers to file '%s'",
e, inputs.size(), path);
}
}, this.sortExecutor);
}
public void mergeInputs(List<String> inputs, List<String> outputs,
boolean withSubKv, OuterSortFlusher flusher) {
if (withSubKv) {
flusher.sources(inputs.size());
}
try {
this.sorter.mergeInputs(inputs, flusher, outputs, withSubKv);
} catch (Exception e) {
throw new ComputerException(
"Failed to merge %s files into %s files",
e, inputs.size(), outputs.size());
}
}
public PeekableIterator<KvEntry> iterator(List<String> outputs,
boolean withSubKv) {
try {
return this.sorter.iterator(outputs, withSubKv);
} catch (IOException e) {
throw new ComputerException("Failed to iterate files: '%s'",
outputs);
}
}
private InnerSortFlusher createSortFlusher(MessageType type,
RandomAccessOutput output,
int flushThreshold) {
PointerCombiner combiner;
boolean needSortSubKv;
switch (type) {
case VERTEX:
combiner = new VertexValueCombiner(this.context);
needSortSubKv = false;
break;
case EDGE:
combiner = new EdgeValueCombiner(this.context);
needSortSubKv = true;
break;
case MSG:
combiner = this.createMessageCombiner();
needSortSubKv = false;
break;
default:
throw new ComputerException("Unsupported combine message " +
"type for %s", type);
}
InnerSortFlusher flusher;
if (combiner == null) {
flusher = new KvInnerSortFlusher(output);
} else {
if (needSortSubKv) {
flusher = new CombineSubKvInnerSortFlusher(output, combiner,
flushThreshold);
} else {
flusher = new CombineKvInnerSortFlusher(output, combiner);
}
}
return flusher;
}
private PointerCombiner createMessageCombiner() {
Config config = this.context.config();
Combiner<Value> valueCombiner = config.createObject(
ComputerOptions.WORKER_COMBINER_CLASS,
false);
if (valueCombiner == null) {
return null;
}
return new MessageValueCombiner(this.context);
}
}