blob: f1f7509a118257b2faedc52bfc69178da5129787 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.suite.integrate;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import com.baidu.hugegraph.computer.algorithm.centrality.pagerank.PageRankParams;
import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.DoubleValue;
import com.baidu.hugegraph.computer.core.manager.Managers;
import com.baidu.hugegraph.computer.core.master.MasterService;
import com.baidu.hugegraph.computer.core.network.DataClientManager;
import com.baidu.hugegraph.computer.core.network.connection.ConnectionManager;
import com.baidu.hugegraph.computer.core.network.message.Message;
import com.baidu.hugegraph.computer.core.network.netty.NettyTransportClient;
import com.baidu.hugegraph.computer.core.network.session.ClientSession;
import com.baidu.hugegraph.computer.core.util.ComputerContextUtil;
import com.baidu.hugegraph.computer.core.worker.WorkerService;
import com.baidu.hugegraph.config.RpcOptions;
import com.baidu.hugegraph.testutil.Assert;
import com.baidu.hugegraph.testutil.Whitebox;
public class SenderIntegrateTest {
private static final Class<?> COMPUTATION = MockComputation.class;
@BeforeClass
public static void init() {
// pass
}
@AfterClass
public static void clear() {
// pass
}
@Test
public void testOneWorker() throws InterruptedException {
Thread masterThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
.withJobId("local_002")
.withAlgorithm(PageRankParams.class)
.withResultName("rank")
.withResultClass(DoubleValue.class)
.withMessageClass(DoubleValue.class)
.withMaxSuperStep(3)
.withComputationClass(COMPUTATION)
.withWorkerCount(1)
.withBufferThreshold(50)
.withBufferCapacity(60)
.withRpcServerHost("127.0.0.1")
.withRpcServerPort(8090)
.build();
try (MasterService service = initMaster(args)) {
service.execute();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});
Thread workerThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
.withJobId("local_002")
.withAlgorithm(PageRankParams.class)
.withResultName("rank")
.withResultClass(DoubleValue.class)
.withMessageClass(DoubleValue.class)
.withMaxSuperStep(3)
.withComputationClass(COMPUTATION)
.withWorkerCount(1)
.withBufferThreshold(50)
.withBufferCapacity(60)
.withTransoprtServerPort(8091)
.withRpcServerRemote("127.0.0.1:8090")
.build();
try (WorkerService service = initWorker(args)) {
service.execute();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});
masterThread.start();
workerThread.start();
workerThread.join();
masterThread.join();
}
@Test
public void testMultiWorkers() throws InterruptedException {
int workerCount = 3;
int partitionCount = 5;
Thread masterThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
.withJobId("local_003")
.withAlgorithm(PageRankParams.class)
.withResultName("rank")
.withResultClass(DoubleValue.class)
.withMessageClass(DoubleValue.class)
.withMaxSuperStep(3)
.withComputationClass(COMPUTATION)
.withWorkerCount(workerCount)
.withPartitionCount(partitionCount)
.withRpcServerHost("127.0.0.1")
.withRpcServerPort(8090)
.build();
try {
MasterService service = initMaster(args);
service.execute();
service.close();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});
List<Thread> workers = new ArrayList<>(workerCount);
for (int i = 1; i <= workerCount; i++) {
int port = 8090 + i;
String dir = "[jobs-" + i + "]";
workers.add(new Thread(() -> {
String[] args;
args = OptionsBuilder.newInstance()
.withJobId("local_003")
.withAlgorithm(PageRankParams.class)
.withResultName("rank")
.withResultClass(DoubleValue.class)
.withMessageClass(DoubleValue.class)
.withMaxSuperStep(3)
.withComputationClass(COMPUTATION)
.withWorkerCount(workerCount)
.withPartitionCount(partitionCount)
.withTransoprtServerPort(port)
.withRpcServerRemote("127.0.0.1:8090")
.withDataDirs(dir)
.build();
try {
WorkerService service = initWorker(args);
service.execute();
service.close();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}));
}
masterThread.start();
for (Thread worker : workers) {
worker.start();
}
for (Thread worker : workers) {
worker.join();
}
masterThread.join();
}
@Test
public void testOneWorkerWithBusyClient() throws InterruptedException {
Thread masterThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
.withJobId("local_002")
.withAlgorithm(PageRankParams.class)
.withResultName("rank")
.withResultClass(DoubleValue.class)
.withMessageClass(DoubleValue.class)
.withMaxSuperStep(3)
.withComputationClass(COMPUTATION)
.withWorkerCount(1)
.withWriteBufferHighMark(10)
.withWriteBufferLowMark(5)
.withRpcServerHost("127.0.0.1")
.withRpcServerPort(8090)
.build();
try (MasterService service = initMaster(args)) {
service.execute();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});
Thread workerThread = new Thread(() -> {
String[] args = OptionsBuilder.newInstance()
.withJobId("local_002")
.withAlgorithm(PageRankParams.class)
.withResultName("rank")
.withResultClass(DoubleValue.class)
.withMessageClass(DoubleValue.class)
.withMaxSuperStep(3)
.withComputationClass(COMPUTATION)
.withWorkerCount(1)
.withTransoprtServerPort(8091)
.withWriteBufferHighMark(20)
.withWriteBufferLowMark(10)
.withRpcServerRemote("127.0.0.1:8090")
.build();
try (WorkerService service = initWorker(args)) {
// Let send rate slowly
this.slowSendFunc(service);
service.execute();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
});
masterThread.start();
workerThread.start();
workerThread.join();
masterThread.join();
}
private void slowSendFunc(WorkerService service) throws TransportException {
Managers managers = Whitebox.getInternalState(service, "managers");
DataClientManager clientManager = managers.get(
DataClientManager.NAME);
ConnectionManager connManager = Whitebox.getInternalState(
clientManager, "connManager");
NettyTransportClient client = (NettyTransportClient)
connManager.getOrCreateClient(
"127.0.0.1", 8091);
ClientSession clientSession = Whitebox.invoke(client.getClass(),
"clientSession", client);
Function<Message, Future<Void>> sendFuncBak = Whitebox.getInternalState(
clientSession,
"sendFunction");
Function<Message, Future<Void>> sendFunc = message -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return sendFuncBak.apply(message);
};
Whitebox.setInternalState(clientSession, "sendFunction", sendFunc);
}
private MasterService initMaster(String[] args) {
Config config = ComputerContextUtil.initContext(
ComputerContextUtil.convertToMap(args));
MasterService service = new MasterService();
service.init(config);
return service;
}
private WorkerService initWorker(String[] args) {
Config config = ComputerContextUtil.initContext(
ComputerContextUtil.convertToMap(args));
WorkerService service = new WorkerService();
service.init(config);
return service;
}
private static class OptionsBuilder {
private final List<String> options;
public static OptionsBuilder newInstance() {
return new OptionsBuilder();
}
public OptionsBuilder() {
this.options = new ArrayList<>();
}
public String[] build() {
return this.options.toArray(new String[0]);
}
public OptionsBuilder withJobId(String jobId) {
this.options.add(ComputerOptions.JOB_ID.name());
this.options.add(jobId);
return this;
}
public OptionsBuilder withAlgorithm(Class<?> clazz) {
this.options.add(ComputerOptions.ALGORITHM_PARAMS_CLASS.name());
this.options.add(clazz.getName());
return this;
}
public OptionsBuilder withResultClass(Class<?> clazz) {
this.options.add(ComputerOptions.ALGORITHM_RESULT_CLASS.name());
this.options.add(clazz.getName());
return this;
}
public OptionsBuilder withMessageClass(Class<?> clazz) {
this.options.add(ComputerOptions.ALGORITHM_MESSAGE_CLASS.name());
this.options.add(clazz.getName());
return this;
}
public OptionsBuilder withResultName(String name) {
this.options.add(ComputerOptions.OUTPUT_RESULT_NAME.name());
this.options.add(name);
return this;
}
public OptionsBuilder withMaxSuperStep(int maxSuperStep) {
this.options.add(ComputerOptions.BSP_MAX_SUPER_STEP.name());
this.options.add(String.valueOf(maxSuperStep));
return this;
}
public OptionsBuilder withComputationClass(Class<?> clazz) {
this.options.add(ComputerOptions.WORKER_COMPUTATION_CLASS.name());
this.options.add(clazz.getName());
return this;
}
public OptionsBuilder withWorkerCount(int count) {
this.options.add(ComputerOptions.JOB_WORKERS_COUNT.name());
this.options.add(String.valueOf(count));
return this;
}
public OptionsBuilder withPartitionCount(int count) {
this.options.add(ComputerOptions.JOB_PARTITIONS_COUNT.name());
this.options.add(String.valueOf(count));
return this;
}
public OptionsBuilder withBufferThreshold(int sizeInByte) {
this.options.add(ComputerOptions.WORKER_WRITE_BUFFER_THRESHOLD
.name());
this.options.add(String.valueOf(sizeInByte));
return this;
}
public OptionsBuilder withBufferCapacity(int sizeInByte) {
this.options.add(ComputerOptions.WORKER_WRITE_BUFFER_INIT_CAPACITY
.name());
this.options.add(String.valueOf(sizeInByte));
return this;
}
public OptionsBuilder withTransoprtServerPort(int dataPort) {
this.options.add(ComputerOptions.TRANSPORT_SERVER_PORT.name());
this.options.add(String.valueOf(dataPort));
return this;
}
public OptionsBuilder withWriteBufferHighMark(int mark) {
this.options.add(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK
.name());
this.options.add(String.valueOf(mark));
return this;
}
public OptionsBuilder withWriteBufferLowMark(int mark) {
this.options.add(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK
.name());
this.options.add(String.valueOf(mark));
return this;
}
public OptionsBuilder withRpcServerHost(String host) {
this.options.add(RpcOptions.RPC_SERVER_HOST.name());
this.options.add(host);
return this;
}
public OptionsBuilder withRpcServerPort(int port) {
this.options.add(RpcOptions.RPC_SERVER_PORT.name());
this.options.add(String.valueOf(port));
return this;
}
public OptionsBuilder withRpcServerRemote(String remoteUrl) {
this.options.add(RpcOptions.RPC_REMOTE_URL.name());
this.options.add(remoteUrl);
return this;
}
public OptionsBuilder withDataDirs(String dataDirs) {
this.options.add(ComputerOptions.WORKER_DATA_DIRS.name());
this.options.add(String.valueOf(dataDirs));
return this;
}
}
}