| /* |
| * Copyright 2017 HugeGraph Authors |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package com.baidu.hugegraph.computer.core.sort.sorting; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.slf4j.Logger; |
| |
| import com.baidu.hugegraph.computer.core.combiner.Combiner; |
| import com.baidu.hugegraph.computer.core.combiner.EdgeValueCombiner; |
| import com.baidu.hugegraph.computer.core.combiner.MessageValueCombiner; |
| import com.baidu.hugegraph.computer.core.combiner.PointerCombiner; |
| import com.baidu.hugegraph.computer.core.combiner.VertexValueCombiner; |
| import com.baidu.hugegraph.computer.core.common.ComputerContext; |
| import com.baidu.hugegraph.computer.core.common.Constants; |
| import com.baidu.hugegraph.computer.core.common.exception.ComputerException; |
| import com.baidu.hugegraph.computer.core.config.ComputerOptions; |
| import com.baidu.hugegraph.computer.core.config.Config; |
| import com.baidu.hugegraph.computer.core.graph.value.Value; |
| import com.baidu.hugegraph.computer.core.io.BytesOutput; |
| import com.baidu.hugegraph.computer.core.io.IOFactory; |
| import com.baidu.hugegraph.computer.core.io.RandomAccessInput; |
| import com.baidu.hugegraph.computer.core.io.RandomAccessOutput; |
| import com.baidu.hugegraph.computer.core.manager.Manager; |
| import com.baidu.hugegraph.computer.core.network.message.MessageType; |
| import com.baidu.hugegraph.computer.core.sender.WriteBuffers; |
| import com.baidu.hugegraph.computer.core.sort.BufferFileSorter; |
| import com.baidu.hugegraph.computer.core.sort.HgkvFileSorter; |
| import com.baidu.hugegraph.computer.core.sort.Sorter; |
| import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher; |
| import com.baidu.hugegraph.computer.core.sort.flusher.CombineSubKvInnerSortFlusher; |
| import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher; |
| import com.baidu.hugegraph.computer.core.sort.flusher.KvInnerSortFlusher; |
| import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher; |
| import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator; |
| import com.baidu.hugegraph.computer.core.store.entry.KvEntry; |
| import com.baidu.hugegraph.util.ExecutorUtil; |
| import com.baidu.hugegraph.util.Log; |
| |
| public abstract class SortManager implements Manager { |
| |
| public static final Logger LOG = Log.logger(SortManager.class); |
| |
| private final ComputerContext context; |
| private final ExecutorService sortExecutor; |
| private final Sorter sorter; |
| private final int capacity; |
| private final int flushThreshold; |
| |
| public SortManager(ComputerContext context) { |
| this.context = context; |
| Config config = context.config(); |
| if (this.threadNum(config) != 0) { |
| this.sortExecutor = ExecutorUtil.newFixedThreadPool( |
| this.threadNum(config), this.threadPrefix()); |
| } else { |
| this.sortExecutor = null; |
| } |
| if (config.get(ComputerOptions.TRANSPORT_RECV_FILE_MODE)) { |
| this.sorter = new BufferFileSorter(config); |
| } else { |
| this.sorter = new HgkvFileSorter(config); |
| } |
| this.capacity = config.get( |
| ComputerOptions.WORKER_WRITE_BUFFER_INIT_CAPACITY); |
| this.flushThreshold = config.get( |
| ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX); |
| } |
| |
| @Override |
| public abstract String name(); |
| |
| protected abstract String threadPrefix(); |
| |
| protected Integer threadNum(Config config) { |
| return config.get(ComputerOptions.SORT_THREAD_NUMS); |
| } |
| |
| @Override |
| public void init(Config config) { |
| // pass |
| } |
| |
| @Override |
| public void close(Config config) { |
| if (this.sortExecutor == null) { |
| return; |
| } |
| this.sortExecutor.shutdown(); |
| try { |
| this.sortExecutor.awaitTermination(Constants.SHUTDOWN_TIMEOUT, |
| TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted when waiting sort executor terminated"); |
| } |
| } |
| |
| public CompletableFuture<ByteBuffer> sort(MessageType type, |
| WriteBuffers buffer) { |
| return CompletableFuture.supplyAsync(() -> { |
| RandomAccessInput bufferForRead = buffer.wrapForRead(); |
| // TODOļ¼This ByteBuffer should be allocated from the off-heap |
| BytesOutput output = IOFactory.createBytesOutput(this.capacity); |
| InnerSortFlusher flusher = this.createSortFlusher( |
| type, output, |
| this.flushThreshold); |
| try { |
| this.sorter.sortBuffer(bufferForRead, flusher, |
| type == MessageType.EDGE); |
| } catch (Exception e) { |
| throw new ComputerException("Failed to sort buffers of %s " + |
| "message", e, type.name()); |
| } |
| |
| return ByteBuffer.wrap(output.buffer(), 0, (int) output.position()); |
| }, this.sortExecutor); |
| } |
| |
| public CompletableFuture<Void> mergeBuffers(List<RandomAccessInput> inputs, |
| String path, |
| boolean withSubKv, |
| OuterSortFlusher flusher) { |
| return CompletableFuture.runAsync(() -> { |
| if (withSubKv) { |
| flusher.sources(inputs.size()); |
| } |
| try { |
| this.sorter.mergeBuffers(inputs, flusher, path, withSubKv); |
| } catch (Exception e) { |
| throw new ComputerException( |
| "Failed to merge %s buffers to file '%s'", |
| e, inputs.size(), path); |
| } |
| }, this.sortExecutor); |
| } |
| |
| public void mergeInputs(List<String> inputs, List<String> outputs, |
| boolean withSubKv, OuterSortFlusher flusher) { |
| if (withSubKv) { |
| flusher.sources(inputs.size()); |
| } |
| try { |
| this.sorter.mergeInputs(inputs, flusher, outputs, withSubKv); |
| } catch (Exception e) { |
| throw new ComputerException( |
| "Failed to merge %s files into %s files", |
| e, inputs.size(), outputs.size()); |
| } |
| } |
| |
| public PeekableIterator<KvEntry> iterator(List<String> outputs, |
| boolean withSubKv) { |
| try { |
| return this.sorter.iterator(outputs, withSubKv); |
| } catch (IOException e) { |
| throw new ComputerException("Failed to iterate files: '%s'", |
| outputs); |
| } |
| } |
| |
| private InnerSortFlusher createSortFlusher(MessageType type, |
| RandomAccessOutput output, |
| int flushThreshold) { |
| PointerCombiner combiner; |
| boolean needSortSubKv; |
| |
| switch (type) { |
| case VERTEX: |
| combiner = new VertexValueCombiner(this.context); |
| needSortSubKv = false; |
| break; |
| case EDGE: |
| combiner = new EdgeValueCombiner(this.context); |
| needSortSubKv = true; |
| break; |
| case MSG: |
| combiner = this.createMessageCombiner(); |
| needSortSubKv = false; |
| break; |
| default: |
| throw new ComputerException("Unsupported combine message " + |
| "type for %s", type); |
| } |
| |
| InnerSortFlusher flusher; |
| if (combiner == null) { |
| flusher = new KvInnerSortFlusher(output); |
| } else { |
| if (needSortSubKv) { |
| flusher = new CombineSubKvInnerSortFlusher(output, combiner, |
| flushThreshold); |
| } else { |
| flusher = new CombineKvInnerSortFlusher(output, combiner); |
| } |
| } |
| |
| return flusher; |
| } |
| |
| private PointerCombiner createMessageCombiner() { |
| Config config = this.context.config(); |
| Combiner<Value> valueCombiner = config.createObject( |
| ComputerOptions.WORKER_COMBINER_CLASS, |
| false); |
| if (valueCombiner == null) { |
| return null; |
| } |
| return new MessageValueCombiner(this.context); |
| } |
| } |