// blob: b8bb7d93001afa9f3d7d948e3a04de4da01b5dcf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TAgentPublishRequest;
import org.apache.doris.thrift.TAgentResult;
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TDeleteEtlFilesRequest;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TFetchDataParams;
import org.apache.doris.thrift.TFetchDataResult;
import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
import org.apache.doris.thrift.TScanCloseResult;
import org.apache.doris.thrift.TScanNextBatchParams;
import org.apache.doris.thrift.TScanOpenParams;
import org.apache.doris.thrift.TScanOpenResult;
import org.apache.doris.thrift.TSnapshotRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TTabletStatResult;
import org.apache.doris.thrift.TTransmitDataParams;
import org.apache.doris.thrift.TTransmitDataResult;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.utframe.UtFrameUtils;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
/**
 * Exercises {@link GenericPool} against an in-process {@link ThriftServer} that serves a stub
 * {@link BackendService.Iface} implementation.
 *
 * <p>The pool is configured in {@link #beforeClass()} with at most two objects per key, and all
 * tests share that single pool. Every test therefore hands back whatever it borrowed in a
 * {@code finally} block, so that a failing assertion in one test does not leave the pool drained
 * for the next one.
 */
public class GenericPoolTest {
    /** Pool under test; created once in {@link #beforeClass()}. */
    static GenericPool<BackendService.Client> backendService;
    /** Thrift server the pooled clients connect to; {@code null} when not running. */
    static ThriftServer service;
    static String ip = "127.0.0.1";
    static int port;

    static {
        port = UtFrameUtils.findValidPort();
    }

    /**
     * Stops the Thrift server if one is currently held.
     *
     * <p>The reference is cleared before stopping, so calling this again (for example from both a
     * failed {@link #beforeClass()} and {@link #afterClass()}) does not stop the same server twice.
     */
    static void close() {
        ThriftServer running = service;
        if (running != null) {
            service = null;
            running.stop();
        }
    }

    /**
     * Builds the shared pool and starts the stub backend server.
     *
     * @throws IOException if any part of the fixture cannot be set up; the original failure is
     *         attached as the cause so it is reported instead of being swallowed
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        try {
            GenericKeyedObjectPoolConfig config = new GenericKeyedObjectPoolConfig();
            config.setLifo(true); // set Last In First Out strategy
            config.setMaxIdlePerKey(2); // (default 8)
            config.setMinIdlePerKey(0); // (default 0)
            config.setMaxTotalPerKey(2); // (default 8)
            config.setMaxTotal(3); // (default -1)
            config.setMaxWaitMillis(500);

            // new ClientPool
            backendService = new GenericPool<BackendService.Client>("BackendService", config, 0);

            // new ThriftService
            TProcessor tprocessor = new BackendService.Processor<BackendService.Iface>(
                    new InternalProcessor());
            service = new ThriftServer(port, tprocessor);
            service.start();

            // NOTE(review): presumably gives the server time to begin accepting connections
            // before the first test borrows a client - confirm whether a shorter wait suffices.
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever is running the tests.
            Thread.currentThread().interrupt();
            throw setUpFailure("Interrupted while waiting for the Thrift server to start", e);
        } catch (Exception e) {
            throw setUpFailure("Failed to set up the GenericPool test fixture", e);
        }
    }

    /**
     * Releases the server after a failed setup and wraps the failure for rethrowing.
     *
     * <p>A secondary failure while stopping the server is recorded as a suppressed exception on
     * {@code cause} rather than replacing it.
     */
    private static IOException setUpFailure(String message, Exception cause) {
        try {
            close();
        } catch (RuntimeException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
        return new IOException(message, cause);
    }

    @AfterClass
    public static void afterClass() throws IOException {
        close();
    }

    /**
     * Stub backend used by the tests.
     *
     * <p>Only {@link #fetchData} returns a populated result. The plan-fragment and transmit calls
     * return empty result objects, and every remaining call returns {@code null}.
     */
    private static class InternalProcessor implements BackendService.Iface {
        @Override
        public TExecPlanFragmentResult execPlanFragment(TExecPlanFragmentParams params) {
            return new TExecPlanFragmentResult();
        }

        @Override
        public TCancelPlanFragmentResult cancelPlanFragment(TCancelPlanFragmentParams params) {
            return new TCancelPlanFragmentResult();
        }

        @Override
        public TTransmitDataResult transmitData(TTransmitDataParams params) {
            return new TTransmitDataResult();
        }

        /** Returns a fixed answer: packet number 123, an empty result batch, and eos set. */
        @Override
        public TFetchDataResult fetchData(TFetchDataParams params) {
            TFetchDataResult result = new TFetchDataResult();
            result.setPacketNum(123);
            result.setResultBatch(new TResultBatch(new ArrayList<ByteBuffer>(), false, 0));
            result.setEos(true);
            return result;
        }

        @Override
        public TAgentResult submitTasks(List<TAgentTaskRequest> tasks) throws TException {
            return null;
        }

        @Override
        public TAgentResult releaseSnapshot(String snapshotPath) throws TException {
            return null;
        }

        @Override
        public TAgentResult publishClusterState(TAgentPublishRequest request) throws TException {
            return null;
        }

        @Override
        public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException {
            return null;
        }

        @Override
        public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException {
            return null;
        }

        @Override
        public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException {
            return null;
        }

        @Override
        public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException {
            return null;
        }

        @Override
        public TStatus submitExportTask(TExportTaskRequest request) throws TException {
            return null;
        }

        @Override
        public TExportStatusResult getExportStatus(TUniqueId taskId) throws TException {
            return null;
        }

        @Override
        public TStatus eraseExportTask(TUniqueId taskId) throws TException {
            return null;
        }

        @Override
        public TTabletStatResult getTabletStat() throws TException {
            return null;
        }

        @Override
        public TStatus submitRoutineLoadTask(List<TRoutineLoadTask> tasks) throws TException {
            return null;
        }

        @Override
        public TScanOpenResult openScanner(TScanOpenParams params) throws TException {
            return null;
        }

        @Override
        public TScanBatchResult getNext(TScanNextBatchParams params) throws TException {
            return null;
        }

        @Override
        public TScanCloseResult closeScanner(TScanCloseParams params) throws TException {
            return null;
        }
    }

    /** Borrows one client, performs a round trip through the stub server, and returns the client. */
    @Test
    public void testNormal() throws Exception {
        TNetworkAddress address = new TNetworkAddress(ip, port);
        BackendService.Client client = backendService.borrowObject(address);
        try {
            TFetchDataResult result = client.fetchData(new TFetchDataParams(
                    PaloInternalServiceVersion.V1, new TUniqueId()));
            // expected value first, so a failure message reads the right way round
            Assert.assertEquals(123, result.getPacketNum());
        } finally {
            // Return the client even if the call or the assertion failed.
            backendService.returnObject(address, client);
        }
    }

    /**
     * Checks the per-key limit of two: a third borrow must be rejected, and a slot freed by
     * returning one client must be borrowable again.
     */
    @Test
    public void testSetMaxPerKey() throws Exception {
        TNetworkAddress address = new TNetworkAddress(ip, port);
        // Everything currently borrowed by this test; drained in the finally block.
        List<BackendService.Client> borrowed = new ArrayList<BackendService.Client>();
        try {
            // first success
            BackendService.Client object1 = backendService.borrowObject(address);
            borrowed.add(object1);
            // second success
            BackendService.Client object2 = backendService.borrowObject(address);
            borrowed.add(object2);

            // third fail, because the max connection is 2
            try {
                borrowed.add(backendService.borrowObject(address));
                Assert.fail("third borrow should be rejected: at most 2 objects per key");
            } catch (java.util.NoSuchElementException expected) {
                // pass
            }

            // fourth success, because we drop the object1
            borrowed.remove(object1);
            backendService.returnObject(address, object1);
            BackendService.Client object3 = backendService.borrowObject(address);
            borrowed.add(object3);
            Assert.assertNotNull(object3);
        } finally {
            for (BackendService.Client client : borrowed) {
                backendService.returnObject(address, client);
            }
        }
    }

    /**
     * Checks the error paths: borrowing with a {@code null} address and returning the same
     * client twice.
     */
    @Test
    public void testException() throws Exception {
        TNetworkAddress address = new TNetworkAddress(ip, port);

        // borrow null
        try {
            backendService.borrowObject(null);
            Assert.fail("borrowing with a null address should throw NullPointerException");
        } catch (NullPointerException expected) {
            // pass
        }

        // return twice
        BackendService.Client object = backendService.borrowObject(address);
        backendService.returnObject(address, object);
        try {
            backendService.returnObject(address, object);
            Assert.fail("returning the same object twice should throw IllegalStateException");
        } catch (java.lang.IllegalStateException expected) {
            // pass
        }
    }
}