blob: 5c28af1a34b27a6e44e0709cf5cc33a35be7dc56 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.client;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static junit.framework.TestCase.assertTrue;
import static junit.framework.TestCase.fail;
public class ConnectTriesPropertyTestClusterBits {
public static StringBuilder bitInfo;
public static final String fakeBitsInfo = "127.0.0.1:5000,127.0.0.1:5001";
public static List<Drillbit> drillbits;
public static final int drillBitCount = 1;
public static ZookeeperHelper zkHelper;
public static RemoteServiceSet remoteServiceSet;
public static DrillConfig drillConfig;
@BeforeClass
public static void testSetUp() throws Exception {
remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
zkHelper = new ZookeeperHelper();
zkHelper.startZookeeper(1);
// Creating Drillbits
drillConfig = zkHelper.getConfig();
try {
int drillBitStarted = 0;
drillbits = new ArrayList<>();
while(drillBitStarted < drillBitCount){
drillbits.add(Drillbit.start(drillConfig, remoteServiceSet));
++drillBitStarted;
}
} catch (DrillbitStartupException e) {
throw new RuntimeException("Failed to start drillbits.", e);
}
bitInfo = new StringBuilder();
for (int i = 0; i < drillBitCount; ++i) {
final DrillbitEndpoint currentEndPoint = drillbits.get(i).getContext().getEndpoint();
final String currentBitIp = currentEndPoint.getAddress();
final int currentBitPort = currentEndPoint.getUserPort();
bitInfo.append(",");
bitInfo.append(currentBitIp);
bitInfo.append(":");
bitInfo.append(currentBitPort);
}
}
@AfterClass
public static void testCleanUp() throws Exception {
AutoCloseables.close(drillbits);
zkHelper.stopZookeeper();
}
@Test
public void testSuccessUsingDirectConnectionAndFakeDrillbitPresent() throws Exception {
final StringBuilder endpoints = new StringBuilder(fakeBitsInfo);
endpoints.append(bitInfo);
Properties props = new Properties();
props.setProperty("drillbit", endpoints.toString());
props.setProperty("connect_limit", "3");
// Test with direct connection
DrillClient client = new DrillClient(true);
client.connect(props);
client.close();
}
@Test
public void testSuccessDirectConnectionDefaultConnectTriesAndFakeDrillbits() throws Exception {
final StringBuilder endpoints = new StringBuilder(fakeBitsInfo);
endpoints.append(bitInfo);
Properties props = new Properties();
props.setProperty("drillbit", endpoints.toString());
// Test with direct connection
DrillClient client = new DrillClient(true);
client.connect(props);
client.close();
}
@Test
public void testFailureUsingDirectConnectionAllFakeBits() throws Exception {
final StringBuilder endpoints = new StringBuilder(fakeBitsInfo);
Properties props = new Properties();
props.setProperty("drillbit", endpoints.toString());
props.setProperty("tries", "2");
// Test with direct connection
DrillClient client = new DrillClient(true);
try{
client.connect(props);
fail();
}catch(RpcException ex){
assertTrue(ex.getCause() instanceof ExecutionException);
client.close();
}
}
@Test
public void testSuccessUsingZKWithNoFakeBits() throws Exception {
Properties props = new Properties();
props.setProperty("tries", "2");
// Test with Cluster Coordinator connection
DrillClient client = new DrillClient(drillConfig, remoteServiceSet.getCoordinator());
client.connect(props);
client.close();
}
@Test
public void testSuccessUsingZKWithFakeBits() throws Exception {
Properties props = new Properties();
props.setProperty("tries", "3");
// Test with Cluster Coordinator connection
DrillClient client = new DrillClient(drillConfig, remoteServiceSet.getCoordinator());
// Create couple of fake drillbit endpoints and register with cluster coordinator
DrillbitEndpoint fakeEndPoint1 = DrillbitEndpoint.newBuilder().setAddress("127.0.0.1").setUserPort(5000).build();
DrillbitEndpoint fakeEndPoint2 = DrillbitEndpoint.newBuilder().setAddress("127.0.0.1").setUserPort(5001).build();
ClusterCoordinator.RegistrationHandle fakeEndPoint1Handle = remoteServiceSet.getCoordinator()
.register(fakeEndPoint1);
ClusterCoordinator.RegistrationHandle fakeEndPoint2Handle = remoteServiceSet.getCoordinator()
.register(fakeEndPoint2);
client.connect(props);
client.close();
// Remove the fake drillbits so that other tests are not affected
remoteServiceSet.getCoordinator().unregister(fakeEndPoint1Handle);
remoteServiceSet.getCoordinator().unregister(fakeEndPoint2Handle);
}
@Test
public void testSuccessUsingZKWithDefaultConnectTriesFakeBits() throws Exception {
// Test with Cluster Coordinator connection
DrillClient client = new DrillClient(drillConfig, remoteServiceSet.getCoordinator());
// Create couple of fake drillbit endpoints and register with cluster coordinator
DrillbitEndpoint fakeEndPoint1 = DrillbitEndpoint.newBuilder().setAddress("127.0.0.1").setUserPort(5000).build();
DrillbitEndpoint fakeEndPoint2 = DrillbitEndpoint.newBuilder().setAddress("127.0.0.1").setUserPort(5001).build();
ClusterCoordinator.RegistrationHandle fakeEndPoint1Handle = remoteServiceSet.getCoordinator()
.register(fakeEndPoint1);
ClusterCoordinator.RegistrationHandle fakeEndPoint2Handle = remoteServiceSet.getCoordinator()
.register(fakeEndPoint2);
client.connect(null);
client.close();
// Remove the fake drillbits so that other tests are not affected
remoteServiceSet.getCoordinator().unregister(fakeEndPoint1Handle);
remoteServiceSet.getCoordinator().unregister(fakeEndPoint2Handle);
}
@Test
public void testInvalidConnectTriesValue() throws Exception {
Properties props = new Properties();
props.setProperty("tries", "abc");
// Test with Cluster Cordinator connection
DrillClient client = new DrillClient(drillConfig, remoteServiceSet.getCoordinator());
try {
client.connect(props);
fail();
} catch (RpcException ex) {
assertTrue(ex instanceof InvalidConnectionInfoException);
client.close();
}
}
@Test
public void testConnectFailureUsingZKWithOnlyFakeBits() throws Exception {
Properties props = new Properties();
props.setProperty("tries", "3");
// Test with Cluster Coordinator connection
RemoteServiceSet localServiceSet = RemoteServiceSet.getLocalServiceSet();
DrillClient client = new DrillClient(drillConfig, localServiceSet.getCoordinator());
// Create couple of fake drillbit endpoints and register with cluster coordinator
DrillbitEndpoint fakeEndPoint1 = DrillbitEndpoint.newBuilder().setAddress("127.0.0.1").setUserPort(5000).build();
DrillbitEndpoint fakeEndPoint2 = DrillbitEndpoint.newBuilder().setAddress("127.0.0.1").setUserPort(5001).build();
ClusterCoordinator.RegistrationHandle fakeEndPoint1Handle = localServiceSet.getCoordinator()
.register(fakeEndPoint1);
ClusterCoordinator.RegistrationHandle fakeEndPoint2Handle = localServiceSet.getCoordinator()
.register(fakeEndPoint2);
try {
client.connect(props);
fail();
} catch (RpcException ex) {
assertTrue(ex.getCause() instanceof ExecutionException);
client.close();
} finally {
// Remove the fake drillbits from local cluster cordinator
localServiceSet.getCoordinator().unregister(fakeEndPoint1Handle);
localServiceSet.getCoordinator().unregister(fakeEndPoint2Handle);
}
}
}