/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.computer.core.compute;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.partition.PartitionStat;
import com.baidu.hugegraph.computer.core.manager.Managers;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.receiver.MessageRecvManager;
import com.baidu.hugegraph.computer.core.receiver.MessageStat;
import com.baidu.hugegraph.computer.core.sender.MessageSendManager;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.store.entry.KvEntry;
import com.baidu.hugegraph.computer.core.util.Consumers;
import com.baidu.hugegraph.computer.core.worker.WorkerContext;
import com.baidu.hugegraph.computer.core.worker.WorkerStat;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.Log;

public class ComputeManager {

private static final Logger LOG = Log.logger(ComputeManager.class);
private static final String PREFIX = "partition-compute-executor-%s";
private final ComputerContext context;
private final Managers managers;
private final Map<Integer, FileGraphPartition> partitions;
private final MessageRecvManager recvManager;
private final MessageSendManager sendManager;
private final ExecutorService computeExecutor;
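
/**
 * Create a ComputeManager that looks up the receive and send managers from
 * the given Managers, and builds a fixed-size thread pool (sized by
 * ComputerOptions.PARTITIONS_COMPUTE_THREAD_NUMS) for partition compute.
 */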
public ComputeManager(ComputerContext context, Managers managers) {
this.context = context;
this.managers = managers;
this.partitions = new HashMap<>();
this.recvManager = this.managers.get(MessageRecvManager.NAME);
this.sendManager = this.managers.get(MessageSendManager.NAME);
int computeThreadNum = this.partitionComputeThreadNum(context.config());
this.computeExecutor = ExecutorUtil.newFixedThreadPool(
computeThreadNum, PREFIX);
LOG.info("Created partition compute thread poll, thread num: {}",
computeThreadNum);
}
private Integer partitionComputeThreadNum(Config config) {
return config.get(ComputerOptions.PARTITIONS_COMPUTE_THREAD_NUMS);
}
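
/**
 * Wait until all vertex and edge messages have been received, then build a
 * FileGraphPartition for each vertex partition from the received vertex and
 * edge iterators. Returns a WorkerStat aggregating the per-partition input
 * statistics.
 */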
public WorkerStat input() {
WorkerStat workerStat = new WorkerStat();
this.recvManager.waitReceivedAllMessages();
Map<Integer, PeekableIterator<KvEntry>> vertices =
this.recvManager.vertexPartitions();
Map<Integer, PeekableIterator<KvEntry>> edges =
this.recvManager.edgePartitions();
// TODO: parallelize the input process
for (Map.Entry<Integer, PeekableIterator<KvEntry>> entry :
vertices.entrySet()) {
int partition = entry.getKey();
PeekableIterator<KvEntry> vertexIter = entry.getValue();
PeekableIterator<KvEntry> edgesIter =
edges.getOrDefault(
partition,
PeekableIterator.emptyIterator());
FileGraphPartition part = new FileGraphPartition(this.context,
this.managers,
partition);
PartitionStat partitionStat = null;
ComputerException inputException = null;
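// Always close both iterators; if closing fails, suppress the close
// error into the input exception, otherwise rethrow it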
try {
partitionStat = part.input(vertexIter, edgesIter);
} catch (ComputerException e) {
inputException = e;
} finally {
try {
vertexIter.close();
edgesIter.close();
} catch (Exception e) {
String message = "Failed to close vertex or edge file " +
"iterator";
ComputerException closeException = new ComputerException(
message, e);
if (inputException != null) {
inputException.addSuppressed(closeException);
} else {
throw closeException;
}
}
if (inputException != null) {
throw inputException;
}
}
workerStat.add(partitionStat);
this.partitions.put(partition, part);
}
return workerStat;
}

/**
 * Get the compute messages from MessageRecvManager, then hand each
 * partition's messages to the corresponding partition. This must be called
 * before {@link MessageRecvManager#beforeSuperstep} is called.
 */
public void takeRecvedMessages() {
Map<Integer, PeekableIterator<KvEntry>> messages =
this.recvManager.messagePartitions();
for (FileGraphPartition partition : this.partitions.values()) {
partition.messages(messages.get(partition.partition()));
}
}
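
/**
 * Compute all partitions for the given superstep in parallel on the
 * partition compute thread pool, then merge the sent and received message
 * stats into each partition's stat and return them as a WorkerStat.
 */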
public WorkerStat compute(WorkerContext context, int superstep) {
this.sendManager.startSend(MessageType.MSG);
WorkerStat workerStat = new WorkerStat();
Map<Integer, PartitionStat> stats = new ConcurrentHashMap<>();
/*
 * Remark: the main thread only perceives a partition-compute exception
 * after all partition computes have completed, and only the last
 * exception is recorded.
 */
Consumers<FileGraphPartition> consumers =
new Consumers<>(this.computeExecutor, partition -> {
PartitionStat stat = partition.compute(context,
superstep);
stats.put(stat.partitionId(), stat);
});
consumers.start("partition-compute");
try {
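// Feed every partition to the consumer pool and wait for all to finish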
for (FileGraphPartition partition : this.partitions.values()) {
consumers.provide(partition);
}
consumers.await();
} catch (Throwable t) {
throw new ComputerException("An exception occurred during " +
"parallel partition compute", t);
}
this.sendManager.finishSend(MessageType.MSG);
// Collect receive message stats after compute has finished and the send finish signal has been issued
Map<Integer, MessageStat> recvStats = this.recvManager.messageStats();
for (Map.Entry<Integer, PartitionStat> entry : stats.entrySet()) {
PartitionStat partStat = entry.getValue();
int partitionId = partStat.partitionId();
MessageStat sendStat = this.sendManager.messageStat(partitionId);
partStat.mergeSendMessageStat(sendStat);
MessageStat recvStat = recvStats.get(partitionId);
if (recvStat != null) {
partStat.mergeRecvMessageStat(recvStat);
}
workerStat.add(partStat);
}
return workerStat;
}
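
/**
 * Write each partition's compute results back (serially for now) and log
 * the per-partition output stat.
 */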
public void output() {
// TODO: write results back in parallel
for (FileGraphPartition partition : this.partitions.values()) {
PartitionStat stat = partition.output();
LOG.info("Output partition {} complete, stat='{}'",
partition.partition(), stat);
}
}
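
/**
 * Shut down the partition compute thread pool.
 */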
public void close() {
this.computeExecutor.shutdown();
}
}