blob: 06b6ecd729a2094505812dfe2a686d2b19c48eb9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.utframe;
import org.apache.doris.common.ClientPool;
import org.apache.doris.proto.PCancelPlanFragmentRequest;
import org.apache.doris.proto.PCancelPlanFragmentResult;
import org.apache.doris.proto.PExecPlanFragmentResult;
import org.apache.doris.proto.PFetchDataResult;
import org.apache.doris.proto.PProxyRequest;
import org.apache.doris.proto.PProxyResult;
import org.apache.doris.proto.PQueryStatistics;
import org.apache.doris.proto.PStatus;
import org.apache.doris.proto.PTriggerProfileReportResult;
import org.apache.doris.rpc.PExecPlanFragmentRequest;
import org.apache.doris.rpc.PFetchDataRequest;
import org.apache.doris.rpc.PTriggerProfileReportRequest;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.HeartbeatService;
import org.apache.doris.thrift.TAgentPublishRequest;
import org.apache.doris.thrift.TAgentResult;
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TBackendInfo;
import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TDeleteEtlFilesRequest;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
import org.apache.doris.thrift.TExportState;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TFetchDataParams;
import org.apache.doris.thrift.TFetchDataResult;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
import org.apache.doris.thrift.TScanCloseResult;
import org.apache.doris.thrift.TScanNextBatchParams;
import org.apache.doris.thrift.TScanOpenParams;
import org.apache.doris.thrift.TScanOpenResult;
import org.apache.doris.thrift.TSnapshotRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTabletStatResult;
import org.apache.doris.thrift.TTransmitDataParams;
import org.apache.doris.thrift.TTransmitDataResult;
import org.apache.doris.thrift.TUniqueId;
import com.baidu.jprotobuf.pbrpc.ProtobufRPCService;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.thrift.TException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
/*
* This class is used to create mock backends.
* Usage can be found in Demon.java's beforeClass()
*
*
*/
public class MockedBackendFactory {
public static final String BE_DEFAULT_IP = "127.0.0.1";
public static final int BE_DEFAULT_HEARTBEAT_PORT = 9050;
public static final int BE_DEFAULT_THRIFT_PORT = 9060;
public static final int BE_DEFAULT_BRPC_PORT = 8060;
public static final int BE_DEFAULT_HTTP_PORT = 8040;
// create a default mocked backend with 3 default rpc services
public static MockedBackend createDefaultBackend() throws IOException {
return createBackend(BE_DEFAULT_IP, BE_DEFAULT_HEARTBEAT_PORT, BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_BRPC_PORT, BE_DEFAULT_HTTP_PORT,
new DefaultHeartbeatServiceImpl(BE_DEFAULT_THRIFT_PORT, BE_DEFAULT_HTTP_PORT, BE_DEFAULT_BRPC_PORT),
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
}
// create a mocked backend with customize parameters
public static MockedBackend createBackend(String host, int heartbeatPort, int thriftPort, int brpcPort, int httpPort,
HeartbeatService.Iface hbService, BeThriftService beThriftService, Object pBackendService)
throws IOException {
MockedBackend backend = new MockedBackend(host, heartbeatPort, thriftPort, brpcPort, httpPort, hbService,
beThriftService, pBackendService);
return backend;
}
// the default hearbeat service.
// User can implement HeartbeatService.Iface to create other custom heartbeat service.
public static class DefaultHeartbeatServiceImpl implements HeartbeatService.Iface {
private int beThriftPort;
private int beHttpPort;
private int beBrpcPort;
public DefaultHeartbeatServiceImpl(int beThriftPort, int beHttpPort, int beBrpcPort) {
this.beThriftPort = beThriftPort;
this.beHttpPort = beHttpPort;
this.beBrpcPort = beBrpcPort;
}
@Override
public THeartbeatResult heartbeat(TMasterInfo master_info) throws TException {
TBackendInfo backendInfo = new TBackendInfo(beThriftPort, beHttpPort);
backendInfo.setBrpcPort(beBrpcPort);
THeartbeatResult result = new THeartbeatResult(new TStatus(TStatusCode.OK), backendInfo);
return result;
}
}
// abstract BeThriftService.
// User can extends this abstract class to create other custom be thrift service
public static abstract class BeThriftService implements BackendService.Iface {
protected MockedBackend backend;
public void setBackend(MockedBackend backend) {
this.backend = backend;
}
public abstract void init();
}
// the default be thrift service extends from BeThriftService
public static class DefaultBeThriftServiceImpl extends BeThriftService {
// task queue to save all agent tasks coming from Frontend
private BlockingQueue<TAgentTaskRequest> taskQueue = Queues.newLinkedBlockingQueue();
private TBackend tBackend;
private long reportVersion = 0;
public DefaultBeThriftServiceImpl() {
}
@Override
public void init() {
tBackend = new TBackend(backend.getHost(), backend.getBeThriftPort(), backend.getHttpPort());
// start a thread to handle all agent tasks in taskQueue.
// Only return information that the task was successfully executed.
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
TAgentTaskRequest request = taskQueue.take();
System.out.println("get agent task request. type: " + request.getTaskType()
+ ", signature: " + request.getSignature() + ", fe addr: " + backend.getFeAddress());
TFinishTaskRequest finishTaskRequest = new TFinishTaskRequest(tBackend,
request.getTaskType(), request.getSignature(), new TStatus(TStatusCode.OK));
finishTaskRequest.setReportVersion(++reportVersion);
FrontendService.Client client = ClientPool.frontendPool.borrowObject(backend.getFeAddress(), 2000);
System.out.println("get fe " + backend.getFeAddress() + " client: " + client);
client.finishTask(finishTaskRequest);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
@Override
public TExecPlanFragmentResult execPlanFragment(TExecPlanFragmentParams params) throws TException {
return null;
}
@Override
public TCancelPlanFragmentResult cancelPlanFragment(TCancelPlanFragmentParams params) throws TException {
return null;
}
@Override
public TTransmitDataResult transmitData(TTransmitDataParams params) throws TException {
return null;
}
@Override
public TFetchDataResult fetchData(TFetchDataParams params) throws TException {
return null;
}
@Override
public TAgentResult submitTasks(List<TAgentTaskRequest> tasks) throws TException {
for (TAgentTaskRequest request : tasks) {
taskQueue.add(request);
System.out.println("receive agent task request. type: " + request.getTaskType() + ", signature: "
+ request.getSignature());
}
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TAgentResult makeSnapshot(TSnapshotRequest snapshot_request) throws TException {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TAgentResult releaseSnapshot(String snapshot_path) throws TException {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TAgentResult publishClusterState(TAgentPublishRequest request) throws TException {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException {
return new TMiniLoadEtlStatusResult(new TStatus(TStatusCode.OK), TEtlState.FINISHED);
}
@Override
public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TStatus submitExportTask(TExportTaskRequest request) throws TException {
return new TStatus(TStatusCode.OK);
}
@Override
public TExportStatusResult getExportStatus(TUniqueId task_id) throws TException {
return new TExportStatusResult(new TStatus(TStatusCode.OK), TExportState.FINISHED);
}
@Override
public TStatus eraseExportTask(TUniqueId task_id) throws TException {
return new TStatus(TStatusCode.OK);
}
@Override
public TTabletStatResult getTabletStat() throws TException {
return new TTabletStatResult(Maps.newHashMap());
}
@Override
public TStatus submitRoutineLoadTask(List<TRoutineLoadTask> tasks) throws TException {
return new TStatus(TStatusCode.OK);
}
@Override
public TScanOpenResult openScanner(TScanOpenParams params) throws TException {
return null;
}
@Override
public TScanBatchResult getNext(TScanNextBatchParams params) throws TException {
return null;
}
@Override
public TScanCloseResult closeScanner(TScanCloseParams params) throws TException {
return null;
}
}
// The default Brpc service.
// TODO(cmy): Currently this service cannot correctly simulate the processing of query requests.
public static class DefaultPBackendServiceImpl {
@ProtobufRPCService(serviceName = "PBackendService", methodName = "exec_plan_fragment")
public PExecPlanFragmentResult exec_plan_fragment(PExecPlanFragmentRequest request) {
System.out.println("get exec_plan_fragment request");
PExecPlanFragmentResult result = new PExecPlanFragmentResult();
PStatus pStatus = new PStatus();
pStatus.status_code = 0;
result.status = pStatus;
return result;
}
@ProtobufRPCService(serviceName = "PBackendService", methodName = "cancel_plan_fragment")
public PCancelPlanFragmentResult cancel_plan_fragment(PCancelPlanFragmentRequest request) {
System.out.println("get cancel_plan_fragment request");
PCancelPlanFragmentResult result = new PCancelPlanFragmentResult();
PStatus pStatus = new PStatus();
pStatus.status_code = 0;
result.status = pStatus;
return result;
}
@ProtobufRPCService(serviceName = "PBackendService", methodName = "fetch_data")
public PFetchDataResult fetchDataAsync(PFetchDataRequest request) {
System.out.println("get fetch_data");
PFetchDataResult result = new PFetchDataResult();
PStatus pStatus = new PStatus();
pStatus.status_code = 0;
PQueryStatistics pQueryStatistics = new PQueryStatistics();
pQueryStatistics.scan_rows = 0L;
pQueryStatistics.scan_bytes = 0L;
result.status = pStatus;
result.packet_seq = 0L;
result.query_statistics = pQueryStatistics;
result.eos = true;
return result;
}
@ProtobufRPCService(serviceName = "PBackendService", methodName = "trigger_profile_report")
public PTriggerProfileReportResult triggerProfileReport(PTriggerProfileReportRequest request) {
return null;
}
@ProtobufRPCService(serviceName = "PBackendService", methodName = "get_info")
public PProxyResult getInfo(PProxyRequest request) {
return null;
}
}
}