<!-- blob: 37db37f44e4df6001234b9b7150d62482d32e766 [file] [log] [blame] -->
<!DOCTYPE HTML>
<html lang="en">
<head>
<!-- Encoding is declared first so the parser settles it before reading any text. Assumes the page is UTF-8: TODO confirm against the docencoding used for this javadoc build. -->
<meta charset="utf-8">
<!-- Generated by javadoc (17) -->
<!-- Page-specific title (package and class taken from the description below) so tabs, bookmarks and history entries can be told apart. -->
<title>Source code: org.apache.hadoop.hbase.ipc.AbstractTestIPC</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="description" content="source: package: org.apache.hadoop.hbase.ipc, class: AbstractTestIPC">
<meta name="generator" content="javadoc/SourceToHTMLConverter">
<!-- Shared stylesheet, referenced six directory levels above this page; text/css is the default type, so no type attribute is needed. -->
<link rel="stylesheet" href="../../../../../../stylesheet.css" title="Style">
</head>
<body class="source-page">
<main role="main">
<div class="source-container">
<pre><span class="source-line-no">001</span><span id="line-1">/*</span>
<span class="source-line-no">002</span><span id="line-2"> * Licensed to the Apache Software Foundation (ASF) under one</span>
<span class="source-line-no">003</span><span id="line-3"> * or more contributor license agreements. See the NOTICE file</span>
<span class="source-line-no">004</span><span id="line-4"> * distributed with this work for additional information</span>
<span class="source-line-no">005</span><span id="line-5"> * regarding copyright ownership. The ASF licenses this file</span>
<span class="source-line-no">006</span><span id="line-6"> * to you under the Apache License, Version 2.0 (the</span>
<span class="source-line-no">007</span><span id="line-7"> * "License"); you may not use this file except in compliance</span>
<span class="source-line-no">008</span><span id="line-8"> * with the License. You may obtain a copy of the License at</span>
<span class="source-line-no">009</span><span id="line-9"> *</span>
<span class="source-line-no">010</span><span id="line-10"> * http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="source-line-no">011</span><span id="line-11"> *</span>
<span class="source-line-no">012</span><span id="line-12"> * Unless required by applicable law or agreed to in writing, software</span>
<span class="source-line-no">013</span><span id="line-13"> * distributed under the License is distributed on an "AS IS" BASIS,</span>
<span class="source-line-no">014</span><span id="line-14"> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.</span>
<span class="source-line-no">015</span><span id="line-15"> * See the License for the specific language governing permissions and</span>
<span class="source-line-no">016</span><span id="line-16"> * limitations under the License.</span>
<span class="source-line-no">017</span><span id="line-17"> */</span>
<span class="source-line-no">018</span><span id="line-18">package org.apache.hadoop.hbase.ipc;</span>
<span class="source-line-no">019</span><span id="line-19"></span>
<span class="source-line-no">020</span><span id="line-20">import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;</span>
<span class="source-line-no">021</span><span id="line-21">import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;</span>
<span class="source-line-no">022</span><span id="line-22">import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;</span>
<span class="source-line-no">023</span><span id="line-23">import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;</span>
<span class="source-line-no">024</span><span id="line-24">import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;</span>
<span class="source-line-no">025</span><span id="line-25">import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;</span>
<span class="source-line-no">026</span><span id="line-26">import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;</span>
<span class="source-line-no">027</span><span id="line-27">import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;</span>
<span class="source-line-no">028</span><span id="line-28">import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;</span>
<span class="source-line-no">029</span><span id="line-29">import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;</span>
<span class="source-line-no">030</span><span id="line-30">import static org.hamcrest.MatcherAssert.assertThat;</span>
<span class="source-line-no">031</span><span id="line-31">import static org.hamcrest.Matchers.allOf;</span>
<span class="source-line-no">032</span><span id="line-32">import static org.hamcrest.Matchers.containsString;</span>
<span class="source-line-no">033</span><span id="line-33">import static org.hamcrest.Matchers.everyItem;</span>
<span class="source-line-no">034</span><span id="line-34">import static org.hamcrest.Matchers.greaterThanOrEqualTo;</span>
<span class="source-line-no">035</span><span id="line-35">import static org.hamcrest.Matchers.hasItem;</span>
<span class="source-line-no">036</span><span id="line-36">import static org.hamcrest.Matchers.instanceOf;</span>
<span class="source-line-no">037</span><span id="line-37">import static org.hamcrest.Matchers.startsWith;</span>
<span class="source-line-no">038</span><span id="line-38">import static org.junit.Assert.assertEquals;</span>
<span class="source-line-no">039</span><span id="line-39">import static org.junit.Assert.assertFalse;</span>
<span class="source-line-no">040</span><span id="line-40">import static org.junit.Assert.assertNotNull;</span>
<span class="source-line-no">041</span><span id="line-41">import static org.junit.Assert.assertNull;</span>
<span class="source-line-no">042</span><span id="line-42">import static org.junit.Assert.assertThrows;</span>
<span class="source-line-no">043</span><span id="line-43">import static org.junit.Assert.assertTrue;</span>
<span class="source-line-no">044</span><span id="line-44">import static org.junit.Assert.fail;</span>
<span class="source-line-no">045</span><span id="line-45">import static org.mockito.ArgumentMatchers.any;</span>
<span class="source-line-no">046</span><span id="line-46">import static org.mockito.Mockito.mock;</span>
<span class="source-line-no">047</span><span id="line-47">import static org.mockito.Mockito.spy;</span>
<span class="source-line-no">048</span><span id="line-48">import static org.mockito.Mockito.verify;</span>
<span class="source-line-no">049</span><span id="line-49">import static org.mockito.Mockito.when;</span>
<span class="source-line-no">050</span><span id="line-50">import static org.mockito.internal.verification.VerificationModeFactory.times;</span>
<span class="source-line-no">051</span><span id="line-51"></span>
<span class="source-line-no">052</span><span id="line-52">import io.opentelemetry.api.common.AttributeKey;</span>
<span class="source-line-no">053</span><span id="line-53">import io.opentelemetry.api.trace.SpanKind;</span>
<span class="source-line-no">054</span><span id="line-54">import io.opentelemetry.api.trace.StatusCode;</span>
<span class="source-line-no">055</span><span id="line-55">import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;</span>
<span class="source-line-no">056</span><span id="line-56">import io.opentelemetry.sdk.trace.data.SpanData;</span>
<span class="source-line-no">057</span><span id="line-57">import java.io.IOException;</span>
<span class="source-line-no">058</span><span id="line-58">import java.net.InetSocketAddress;</span>
<span class="source-line-no">059</span><span id="line-59">import java.nio.channels.SocketChannel;</span>
<span class="source-line-no">060</span><span id="line-60">import java.time.Duration;</span>
<span class="source-line-no">061</span><span id="line-61">import java.util.ArrayList;</span>
<span class="source-line-no">062</span><span id="line-62">import java.util.Collections;</span>
<span class="source-line-no">063</span><span id="line-63">import java.util.List;</span>
<span class="source-line-no">064</span><span id="line-64">import org.apache.hadoop.conf.Configuration;</span>
<span class="source-line-no">065</span><span id="line-65">import org.apache.hadoop.hbase.CellScanner;</span>
<span class="source-line-no">066</span><span id="line-66">import org.apache.hadoop.hbase.DoNotRetryIOException;</span>
<span class="source-line-no">067</span><span id="line-67">import org.apache.hadoop.hbase.ExtendedCell;</span>
<span class="source-line-no">068</span><span id="line-68">import org.apache.hadoop.hbase.HBaseConfiguration;</span>
<span class="source-line-no">069</span><span id="line-69">import org.apache.hadoop.hbase.HBaseServerBase;</span>
<span class="source-line-no">070</span><span id="line-70">import org.apache.hadoop.hbase.KeyValue;</span>
<span class="source-line-no">071</span><span id="line-71">import org.apache.hadoop.hbase.MatcherPredicate;</span>
<span class="source-line-no">072</span><span id="line-72">import org.apache.hadoop.hbase.PrivateCellUtil;</span>
<span class="source-line-no">073</span><span id="line-73">import org.apache.hadoop.hbase.Server;</span>
<span class="source-line-no">074</span><span id="line-74">import org.apache.hadoop.hbase.ServerName;</span>
<span class="source-line-no">075</span><span id="line-75">import org.apache.hadoop.hbase.Waiter;</span>
<span class="source-line-no">076</span><span id="line-76">import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;</span>
<span class="source-line-no">077</span><span id="line-77">import org.apache.hadoop.hbase.nio.ByteBuff;</span>
<span class="source-line-no">078</span><span id="line-78">import org.apache.hadoop.hbase.security.User;</span>
<span class="source-line-no">079</span><span id="line-79">import org.apache.hadoop.hbase.util.Bytes;</span>
<span class="source-line-no">080</span><span id="line-80">import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;</span>
<span class="source-line-no">081</span><span id="line-81">import org.apache.hadoop.io.compress.GzipCodec;</span>
<span class="source-line-no">082</span><span id="line-82">import org.apache.hadoop.ipc.RemoteException;</span>
<span class="source-line-no">083</span><span id="line-83">import org.apache.hadoop.util.StringUtils;</span>
<span class="source-line-no">084</span><span id="line-84">import org.hamcrest.Matcher;</span>
<span class="source-line-no">085</span><span id="line-85">import org.junit.Before;</span>
<span class="source-line-no">086</span><span id="line-86">import org.junit.Rule;</span>
<span class="source-line-no">087</span><span id="line-87">import org.junit.Test;</span>
<span class="source-line-no">088</span><span id="line-88">import org.junit.runners.Parameterized.Parameter;</span>
<span class="source-line-no">089</span><span id="line-89">import org.slf4j.Logger;</span>
<span class="source-line-no">090</span><span id="line-90">import org.slf4j.LoggerFactory;</span>
<span class="source-line-no">091</span><span id="line-91"></span>
<span class="source-line-no">092</span><span id="line-92">import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;</span>
<span class="source-line-no">093</span><span id="line-93">import org.apache.hbase.thirdparty.com.google.common.collect.Lists;</span>
<span class="source-line-no">094</span><span id="line-94">import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;</span>
<span class="source-line-no">095</span><span id="line-95">import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;</span>
<span class="source-line-no">096</span><span id="line-96">import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;</span>
<span class="source-line-no">097</span><span id="line-97"></span>
<span class="source-line-no">098</span><span id="line-98">import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;</span>
<span class="source-line-no">099</span><span id="line-99">import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;</span>
<span class="source-line-no">100</span><span id="line-100">import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;</span>
<span class="source-line-no">101</span><span id="line-101">import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto;</span>
<span class="source-line-no">102</span><span id="line-102">import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto;</span>
<span class="source-line-no">103</span><span id="line-103">import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;</span>
<span class="source-line-no">104</span><span id="line-104">import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;</span>
<span class="source-line-no">105</span><span id="line-105">import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;</span>
<span class="source-line-no">106</span><span id="line-106">import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;</span>
<span class="source-line-no">107</span><span id="line-107">import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;</span>
<span class="source-line-no">108</span><span id="line-108">import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;</span>
<span class="source-line-no">109</span><span id="line-109"></span>
<span class="source-line-no">110</span><span id="line-110">/**</span>
<span class="source-line-no">111</span><span id="line-111"> * Some basic ipc tests.</span>
<span class="source-line-no">112</span><span id="line-112"> */</span>
<span class="source-line-no">113</span><span id="line-113">public abstract class AbstractTestIPC {</span>
<span class="source-line-no">114</span><span id="line-114"></span>
<span class="source-line-no">115</span><span id="line-115"> private static final Logger LOG = LoggerFactory.getLogger(AbstractTestIPC.class);</span>
<span class="source-line-no">116</span><span id="line-116"></span>
<span class="source-line-no">117</span><span id="line-117"> private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");</span>
<span class="source-line-no">118</span><span id="line-118"> private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);</span>
<span class="source-line-no">119</span><span id="line-119"></span>
<span class="source-line-no">120</span><span id="line-120"> protected static final Configuration CONF = HBaseConfiguration.create();</span>
<span class="source-line-no">121</span><span id="line-121"></span>
<span class="source-line-no">122</span><span id="line-122"> protected RpcServer createRpcServer(Server server, String name,</span>
<span class="source-line-no">123</span><span id="line-123"> List&lt;BlockingServiceAndInterface&gt; services, InetSocketAddress bindAddress, Configuration conf,</span>
<span class="source-line-no">124</span><span id="line-124"> RpcScheduler scheduler) throws IOException {</span>
<span class="source-line-no">125</span><span id="line-125"> return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);</span>
<span class="source-line-no">126</span><span id="line-126"> }</span>
<span class="source-line-no">127</span><span id="line-127"></span>
<span class="source-line-no">128</span><span id="line-128"> private RpcServer createRpcServer(String name, List&lt;BlockingServiceAndInterface&gt; services,</span>
<span class="source-line-no">129</span><span id="line-129"> InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {</span>
<span class="source-line-no">130</span><span id="line-130"> return createRpcServer(null, name, services, bindAddress, conf, scheduler);</span>
<span class="source-line-no">131</span><span id="line-131"> }</span>
<span class="source-line-no">132</span><span id="line-132"></span>
<span class="source-line-no">133</span><span id="line-133"> protected abstract AbstractRpcClient&lt;?&gt; createRpcClientNoCodec(Configuration conf);</span>
<span class="source-line-no">134</span><span id="line-134"></span>
<span class="source-line-no">135</span><span id="line-135"> @Rule</span>
<span class="source-line-no">136</span><span id="line-136"> public OpenTelemetryRule traceRule = OpenTelemetryRule.create();</span>
<span class="source-line-no">137</span><span id="line-137"></span>
<span class="source-line-no">138</span><span id="line-138"> @Parameter(0)</span>
<span class="source-line-no">139</span><span id="line-139"> public Class&lt;? extends RpcServer&gt; rpcServerImpl;</span>
<span class="source-line-no">140</span><span id="line-140"></span>
<span class="source-line-no">141</span><span id="line-141"> @Before</span>
<span class="source-line-no">142</span><span id="line-142"> public void setUpBeforeTest() {</span>
<span class="source-line-no">143</span><span id="line-143"> CONF.setClass(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, rpcServerImpl, RpcServer.class);</span>
<span class="source-line-no">144</span><span id="line-144"> }</span>
<span class="source-line-no">145</span><span id="line-145"></span>
<span class="source-line-no">146</span><span id="line-146"> /**</span>
<span class="source-line-no">147</span><span id="line-147"> * Ensure we do not HAVE TO HAVE a codec.</span>
<span class="source-line-no">148</span><span id="line-148"> */</span>
<span class="source-line-no">149</span><span id="line-149"> @Test</span>
<span class="source-line-no">150</span><span id="line-150"> public void testNoCodec() throws IOException, ServiceException {</span>
<span class="source-line-no">151</span><span id="line-151"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">152</span><span id="line-152"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">153</span><span id="line-153"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">154</span><span id="line-154"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">155</span><span id="line-155"> try (AbstractRpcClient&lt;?&gt; client = createRpcClientNoCodec(clientConf)) {</span>
<span class="source-line-no">156</span><span id="line-156"> rpcServer.start();</span>
<span class="source-line-no">157</span><span id="line-157"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">158</span><span id="line-158"> HBaseRpcController pcrc = new HBaseRpcControllerImpl();</span>
<span class="source-line-no">159</span><span id="line-159"> String message = "hello";</span>
<span class="source-line-no">160</span><span id="line-160"> assertEquals(message,</span>
<span class="source-line-no">161</span><span id="line-161"> stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());</span>
<span class="source-line-no">162</span><span id="line-162"> assertNull(pcrc.cellScanner());</span>
<span class="source-line-no">163</span><span id="line-163"> } finally {</span>
<span class="source-line-no">164</span><span id="line-164"> rpcServer.stop();</span>
<span class="source-line-no">165</span><span id="line-165"> }</span>
<span class="source-line-no">166</span><span id="line-166"> }</span>
<span class="source-line-no">167</span><span id="line-167"></span>
<span class="source-line-no">168</span><span id="line-168"> protected abstract AbstractRpcClient&lt;?&gt; createRpcClient(Configuration conf);</span>
<span class="source-line-no">169</span><span id="line-169"></span>
<span class="source-line-no">170</span><span id="line-170"> /**</span>
<span class="source-line-no">171</span><span id="line-171"> * It is hard to verify that compression is actually happening under the hood. The hope is that if</span>
<span class="source-line-no">172</span><span id="line-172"> * it is unsupported, we will get an exception at some point (meanwhile, we have to trace it manually</span>
<span class="source-line-no">173</span><span id="line-173"> * to confirm that compression is happening down in the client and server).</span>
<span class="source-line-no">174</span><span id="line-174"> */</span>
<span class="source-line-no">175</span><span id="line-175"> @Test</span>
<span class="source-line-no">176</span><span id="line-176"> public void testCompressCellBlock() throws IOException, ServiceException {</span>
<span class="source-line-no">177</span><span id="line-177"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">178</span><span id="line-178"> clientConf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());</span>
<span class="source-line-no">179</span><span id="line-179"> List&lt;ExtendedCell&gt; cells = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">180</span><span id="line-180"> int count = 3;</span>
<span class="source-line-no">181</span><span id="line-181"> for (int i = 0; i &lt; count; i++) {</span>
<span class="source-line-no">182</span><span id="line-182"> cells.add(CELL);</span>
<span class="source-line-no">183</span><span id="line-183"> }</span>
<span class="source-line-no">184</span><span id="line-184"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">185</span><span id="line-185"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">186</span><span id="line-186"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">187</span><span id="line-187"></span>
<span class="source-line-no">188</span><span id="line-188"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">189</span><span id="line-189"> rpcServer.start();</span>
<span class="source-line-no">190</span><span id="line-190"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">191</span><span id="line-191"> HBaseRpcController pcrc =</span>
<span class="source-line-no">192</span><span id="line-192"> new HBaseRpcControllerImpl(PrivateCellUtil.createExtendedCellScanner(cells));</span>
<span class="source-line-no">193</span><span id="line-193"> String message = "hello";</span>
<span class="source-line-no">194</span><span id="line-194"> assertEquals(message,</span>
<span class="source-line-no">195</span><span id="line-195"> stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage());</span>
<span class="source-line-no">196</span><span id="line-196"> int index = 0;</span>
<span class="source-line-no">197</span><span id="line-197"> CellScanner cellScanner = pcrc.cellScanner();</span>
<span class="source-line-no">198</span><span id="line-198"> assertNotNull(cellScanner);</span>
<span class="source-line-no">199</span><span id="line-199"> while (cellScanner.advance()) {</span>
<span class="source-line-no">200</span><span id="line-200"> assertEquals(CELL, cellScanner.current());</span>
<span class="source-line-no">201</span><span id="line-201"> index++;</span>
<span class="source-line-no">202</span><span id="line-202"> }</span>
<span class="source-line-no">203</span><span id="line-203"> assertEquals(count, index);</span>
<span class="source-line-no">204</span><span id="line-204"> } finally {</span>
<span class="source-line-no">205</span><span id="line-205"> rpcServer.stop();</span>
<span class="source-line-no">206</span><span id="line-206"> }</span>
<span class="source-line-no">207</span><span id="line-207"> }</span>
<span class="source-line-no">208</span><span id="line-208"></span>
<span class="source-line-no">209</span><span id="line-209"> protected abstract AbstractRpcClient&lt;?&gt;</span>
<span class="source-line-no">210</span><span id="line-210"> createRpcClientRTEDuringConnectionSetup(Configuration conf) throws IOException;</span>
<span class="source-line-no">211</span><span id="line-211"></span>
<span class="source-line-no">212</span><span id="line-212"> @Test</span>
<span class="source-line-no">213</span><span id="line-213"> public void testRTEDuringConnectionSetup() throws Exception {</span>
<span class="source-line-no">214</span><span id="line-214"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">215</span><span id="line-215"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">216</span><span id="line-216"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">217</span><span id="line-217"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">218</span><span id="line-218"> try (AbstractRpcClient&lt;?&gt; client = createRpcClientRTEDuringConnectionSetup(clientConf)) {</span>
<span class="source-line-no">219</span><span id="line-219"> rpcServer.start();</span>
<span class="source-line-no">220</span><span id="line-220"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">221</span><span id="line-221"> stub.ping(null, EmptyRequestProto.getDefaultInstance());</span>
<span class="source-line-no">222</span><span id="line-222"> fail("Expected an exception to have been thrown!");</span>
<span class="source-line-no">223</span><span id="line-223"> } catch (Exception e) {</span>
<span class="source-line-no">224</span><span id="line-224"> LOG.info("Caught expected exception: " + e.toString());</span>
<span class="source-line-no">225</span><span id="line-225"> assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));</span>
<span class="source-line-no">226</span><span id="line-226"> } finally {</span>
<span class="source-line-no">227</span><span id="line-227"> rpcServer.stop();</span>
<span class="source-line-no">228</span><span id="line-228"> }</span>
<span class="source-line-no">229</span><span id="line-229"> }</span>
<span class="source-line-no">230</span><span id="line-230"></span>
<span class="source-line-no">231</span><span id="line-231"> /**</span>
<span class="source-line-no">232</span><span id="line-232"> * Tests that the rpc scheduler is called when requests arrive.</span>
<span class="source-line-no">233</span><span id="line-233"> */</span>
<span class="source-line-no">234</span><span id="line-234"> @Test</span>
<span class="source-line-no">235</span><span id="line-235"> public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {</span>
<span class="source-line-no">236</span><span id="line-236"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">237</span><span id="line-237"> RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">238</span><span id="line-238"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">239</span><span id="line-239"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">240</span><span id="line-240"> new InetSocketAddress("localhost", 0), CONF, scheduler);</span>
<span class="source-line-no">241</span><span id="line-241"> verify(scheduler).init(any(RpcScheduler.Context.class));</span>
<span class="source-line-no">242</span><span id="line-242"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">243</span><span id="line-243"> rpcServer.start();</span>
<span class="source-line-no">244</span><span id="line-244"> verify(scheduler).start();</span>
<span class="source-line-no">245</span><span id="line-245"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">246</span><span id="line-246"> EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();</span>
<span class="source-line-no">247</span><span id="line-247"> for (int i = 0; i &lt; 10; i++) {</span>
<span class="source-line-no">248</span><span id="line-248"> stub.echo(null, param);</span>
<span class="source-line-no">249</span><span id="line-249"> }</span>
<span class="source-line-no">250</span><span id="line-250"> verify(scheduler, times(10)).dispatch(any(CallRunner.class));</span>
<span class="source-line-no">251</span><span id="line-251"> } finally {</span>
<span class="source-line-no">252</span><span id="line-252"> rpcServer.stop();</span>
<span class="source-line-no">253</span><span id="line-253"> verify(scheduler).stop();</span>
<span class="source-line-no">254</span><span id="line-254"> }</span>
<span class="source-line-no">255</span><span id="line-255"> }</span>
<span class="source-line-no">256</span><span id="line-256"></span>
<span class="source-line-no">257</span><span id="line-257"> /** Tests that a request exceeding the configured max request size is rejected by the server. */</span>
<span class="source-line-no">258</span><span id="line-258"> @Test</span>
<span class="source-line-no">259</span><span id="line-259"> public void testRpcMaxRequestSize() throws IOException, ServiceException {</span>
<span class="source-line-no">260</span><span id="line-260"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">261</span><span id="line-261"> clientConf.setInt(RpcServer.MAX_REQUEST_SIZE, 1000);</span>
<span class="source-line-no">262</span><span id="line-262"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">263</span><span id="line-263"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">264</span><span id="line-264"> new InetSocketAddress("localhost", 0), clientConf, new FifoRpcScheduler(clientConf, 1));</span>
<span class="source-line-no">265</span><span id="line-265"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">266</span><span id="line-266"> rpcServer.start();</span>
<span class="source-line-no">267</span><span id="line-267"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">268</span><span id="line-268"> StringBuilder message = new StringBuilder(1200);</span>
<span class="source-line-no">269</span><span id="line-269"> for (int i = 0; i &lt; 200; i++) {</span>
<span class="source-line-no">270</span><span id="line-270"> message.append("hello.");</span>
<span class="source-line-no">271</span><span id="line-271"> }</span>
<span class="source-line-no">272</span><span id="line-272"> // set total RPC size bigger than the 1000 byte max request size configured above</span>
<span class="source-line-no">273</span><span id="line-273"> EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build();</span>
<span class="source-line-no">274</span><span id="line-274"> stub.echo(new HBaseRpcControllerImpl(</span>
<span class="source-line-no">275</span><span id="line-275"> PrivateCellUtil.createExtendedCellScanner(ImmutableList.&lt;ExtendedCell&gt; of(CELL))), param);</span>
<span class="source-line-no">276</span><span id="line-276"> fail("RPC should have failed because it exceeds max request size");</span>
<span class="source-line-no">277</span><span id="line-277"> } catch (ServiceException e) {</span>
<span class="source-line-no">278</span><span id="line-278"> LOG.info("Caught expected exception: " + e);</span>
<span class="source-line-no">279</span><span id="line-279"> assertTrue(e.toString(),</span>
<span class="source-line-no">280</span><span id="line-280"> StringUtils.stringifyException(e).contains("RequestTooBigException"));</span>
<span class="source-line-no">281</span><span id="line-281"> } finally {</span>
<span class="source-line-no">282</span><span id="line-282"> rpcServer.stop();</span>
<span class="source-line-no">283</span><span id="line-283"> }</span>
<span class="source-line-no">284</span><span id="line-284"> }</span>
<span class="source-line-no">285</span><span id="line-285"></span>
<span class="source-line-no">286</span><span id="line-286"> /**</span>
<span class="source-line-no">287</span><span id="line-287"> * Tests that the RpcServer creates &amp; dispatches CallRunner object to scheduler with non-null</span>
<span class="source-line-no">288</span><span id="line-288"> * remoteAddress set to its Call Object</span>
<span class="source-line-no">289</span><span id="line-289"> */</span>
<span class="source-line-no">290</span><span id="line-290"> @Test</span>
<span class="source-line-no">291</span><span id="line-291"> public void testRpcServerForNotNullRemoteAddressInCallObject()</span>
<span class="source-line-no">292</span><span id="line-292"> throws IOException, ServiceException {</span>
<span class="source-line-no">293</span><span id="line-293"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">294</span><span id="line-294"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">295</span><span id="line-295"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">296</span><span id="line-296"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">297</span><span id="line-297"> InetSocketAddress localAddr = new InetSocketAddress("localhost", 0);</span>
<span class="source-line-no">298</span><span id="line-298"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">299</span><span id="line-299"> rpcServer.start();</span>
<span class="source-line-no">300</span><span id="line-300"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">301</span><span id="line-301"> assertEquals(localAddr.getAddress().getHostAddress(),</span>
<span class="source-line-no">302</span><span id="line-302"> stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr());</span>
<span class="source-line-no">303</span><span id="line-303"> } finally {</span>
<span class="source-line-no">304</span><span id="line-304"> rpcServer.stop();</span>
<span class="source-line-no">305</span><span id="line-305"> }</span>
<span class="source-line-no">306</span><span id="line-306"> }</span>
<span class="source-line-no">307</span><span id="line-307"></span>
<span class="source-line-no">308</span><span id="line-308"> @Test</span>
<span class="source-line-no">309</span><span id="line-309"> public void testRemoteError() throws IOException, ServiceException {</span>
<span class="source-line-no">310</span><span id="line-310"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">311</span><span id="line-311"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">312</span><span id="line-312"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">313</span><span id="line-313"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">314</span><span id="line-314"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">315</span><span id="line-315"> rpcServer.start();</span>
<span class="source-line-no">316</span><span id="line-316"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">317</span><span id="line-317"> stub.error(null, EmptyRequestProto.getDefaultInstance());</span>
<span class="source-line-no">318</span><span id="line-318"> } catch (ServiceException e) {</span>
<span class="source-line-no">319</span><span id="line-319"> LOG.info("Caught expected exception: " + e);</span>
<span class="source-line-no">320</span><span id="line-320"> IOException ioe = ProtobufUtil.handleRemoteException(e);</span>
<span class="source-line-no">321</span><span id="line-321"> assertTrue(ioe instanceof DoNotRetryIOException);</span>
<span class="source-line-no">322</span><span id="line-322"> assertTrue(ioe.getMessage().contains("server error!"));</span>
<span class="source-line-no">323</span><span id="line-323"> } finally {</span>
<span class="source-line-no">324</span><span id="line-324"> rpcServer.stop();</span>
<span class="source-line-no">325</span><span id="line-325"> }</span>
<span class="source-line-no">326</span><span id="line-326"> }</span>
<span class="source-line-no">327</span><span id="line-327"></span>
<span class="source-line-no">328</span><span id="line-328"> @Test</span>
<span class="source-line-no">329</span><span id="line-329"> public void testTimeout() throws IOException {</span>
<span class="source-line-no">330</span><span id="line-330"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">331</span><span id="line-331"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">332</span><span id="line-332"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">333</span><span id="line-333"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">334</span><span id="line-334"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">335</span><span id="line-335"> rpcServer.start();</span>
<span class="source-line-no">336</span><span id="line-336"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">337</span><span id="line-337"> HBaseRpcController pcrc = new HBaseRpcControllerImpl();</span>
<span class="source-line-no">338</span><span id="line-338"> int ms = 1000;</span>
<span class="source-line-no">339</span><span id="line-339"> int timeout = 100;</span>
<span class="source-line-no">340</span><span id="line-340"> for (int i = 0; i &lt; 10; i++) {</span>
<span class="source-line-no">341</span><span id="line-341"> pcrc.reset();</span>
<span class="source-line-no">342</span><span id="line-342"> pcrc.setCallTimeout(timeout);</span>
<span class="source-line-no">343</span><span id="line-343"> long startTime = System.nanoTime();</span>
<span class="source-line-no">344</span><span id="line-344"> try {</span>
<span class="source-line-no">345</span><span id="line-345"> stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build());</span>
<span class="source-line-no">346</span><span id="line-346"> } catch (ServiceException e) {</span>
<span class="source-line-no">347</span><span id="line-347"> long waitTime = (System.nanoTime() - startTime) / 1000000;</span>
<span class="source-line-no">348</span><span id="line-348"> // expected</span>
<span class="source-line-no">349</span><span id="line-349"> LOG.info("Caught expected exception: " + e);</span>
<span class="source-line-no">350</span><span id="line-350"> IOException ioe = ProtobufUtil.handleRemoteException(e);</span>
<span class="source-line-no">351</span><span id="line-351"> assertTrue(ioe.getCause() instanceof CallTimeoutException);</span>
<span class="source-line-no">352</span><span id="line-352"> // confirm that we got exception before the actual pause.</span>
<span class="source-line-no">353</span><span id="line-353"> assertTrue(waitTime &lt; ms);</span>
<span class="source-line-no">354</span><span id="line-354"> }</span>
<span class="source-line-no">355</span><span id="line-355"> }</span>
<span class="source-line-no">356</span><span id="line-356"> } finally {</span>
<span class="source-line-no">357</span><span id="line-357"> rpcServer.stop();</span>
<span class="source-line-no">358</span><span id="line-358"> }</span>
<span class="source-line-no">359</span><span id="line-359"> }</span>
<span class="source-line-no">360</span><span id="line-360"></span>
<span class="source-line-no">361</span><span id="line-361">  /** SimpleRpcServer whose connections throw DoNotRetryIOException from processRequest. */</span>
<span class="source-line-no">362</span><span id="line-362">  private static class FailingSimpleRpcServer extends SimpleRpcServer {</span>
<span class="source-line-no">363</span><span id="line-363">    FailingSimpleRpcServer(Server server, String name,</span>
<span class="source-line-no">364</span><span id="line-364">      List&lt;RpcServer.BlockingServiceAndInterface&gt; services, InetSocketAddress bindAddress,</span>
<span class="source-line-no">365</span><span id="line-365">      Configuration conf, RpcScheduler scheduler) throws IOException {</span>
<span class="source-line-no">366</span><span id="line-366">      super(server, name, services, bindAddress, conf, scheduler, true);</span>
<span class="source-line-no">367</span><span id="line-367">    }</span>
<span class="source-line-no">368</span><span id="line-368"></span>
<span class="source-line-no">369</span><span id="line-369">    @Override</span>
<span class="source-line-no">370</span><span id="line-370">    protected SimpleServerRpcConnection getConnection(SocketChannel channel, long time) {</span>
<span class="source-line-no">371</span><span id="line-371">      return new FailingConnection(this, channel, time);</span>
<span class="source-line-no">372</span><span id="line-372">    }</span>
<span class="source-line-no">373</span><span id="line-373"></span>
<span class="source-line-no">374</span><span id="line-374">    /** Connection whose processRequest always throws. */</span>
<span class="source-line-no">375</span><span id="line-375">    final class FailingConnection extends SimpleServerRpcConnection {</span>
<span class="source-line-no">376</span><span id="line-376">      private FailingConnection(FailingSimpleRpcServer owner, SocketChannel ch, long lastContact) {</span>
<span class="source-line-no">377</span><span id="line-377">        super(owner, ch, lastContact);</span>
<span class="source-line-no">378</span><span id="line-378">      }</span>
<span class="source-line-no">379</span><span id="line-379"></span>
<span class="source-line-no">380</span><span id="line-380">      @Override</span>
<span class="source-line-no">381</span><span id="line-381">      public void processRequest(ByteBuff buf) throws IOException, InterruptedException {</span>
<span class="source-line-no">382</span><span id="line-382">        // Reached after the connection header has been read and the client has sent an RPC;</span>
<span class="source-line-no">383</span><span id="line-383">        // fail unconditionally at that point.</span>
<span class="source-line-no">384</span><span id="line-384">        throw new DoNotRetryIOException("Failing for test");</span>
<span class="source-line-no">385</span><span id="line-385">      }</span>
<span class="source-line-no">386</span><span id="line-386">    }</span>
<span class="source-line-no">387</span><span id="line-387">  }</span>
<span class="source-line-no">388</span><span id="line-388"></span>
<span class="source-line-no">389</span><span id="line-389">  /** Creates the failing server flavour (Netty or simple) selected by rpcServerImpl. */</span>
<span class="source-line-no">390</span><span id="line-390">  protected RpcServer createTestFailingRpcServer(final String name,</span>
<span class="source-line-no">391</span><span id="line-391">    final List&lt;BlockingServiceAndInterface&gt; services, final InetSocketAddress bindAddress,</span>
<span class="source-line-no">392</span><span id="line-392">    Configuration conf, RpcScheduler scheduler) throws IOException {</span>
<span class="source-line-no">393</span><span id="line-393">    if (!rpcServerImpl.equals(NettyRpcServer.class)) {</span>
<span class="source-line-no">394</span><span id="line-394">      return new FailingSimpleRpcServer(null, name, services, bindAddress, conf, scheduler);</span>
<span class="source-line-no">395</span><span id="line-395">    }</span>
<span class="source-line-no">396</span><span id="line-396">    return new FailingNettyRpcServer(null, name, services, bindAddress, conf, scheduler);</span>
<span class="source-line-no">397</span><span id="line-397">  }</span>
<span class="source-line-no">398</span><span id="line-398"></span>
<span class="source-line-no">399</span><span id="line-399">  /** Checks that a client copes with the connection being closed while an RPC is outstanding. */</span>
<span class="source-line-no">400</span><span id="line-400">  @Test</span>
<span class="source-line-no">401</span><span id="line-401">  public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {</span>
<span class="source-line-no">402</span><span id="line-402">    final Configuration conf = new Configuration(CONF);</span>
<span class="source-line-no">403</span><span id="line-403">    final RpcServer server = createTestFailingRpcServer("testRpcServer",</span>
<span class="source-line-no">404</span><span id="line-404">      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">405</span><span id="line-405">      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">406</span><span id="line-406">    final EchoRequestProto request = EchoRequestProto.newBuilder().setMessage("hello").build();</span>
<span class="source-line-no">407</span><span id="line-407">    try (AbstractRpcClient&lt;?&gt; rpcClient = createRpcClient(conf)) {</span>
<span class="source-line-no">408</span><span id="line-408">      server.start();</span>
<span class="source-line-no">409</span><span id="line-409">      final BlockingInterface stub = newBlockingStub(rpcClient, server.getListenerAddress());</span>
<span class="source-line-no">410</span><span id="line-410">      // The server is the failing variant, so this blocking call is expected to throw.</span>
<span class="source-line-no">411</span><span id="line-411">      stub.echo(null, request);</span>
<span class="source-line-no">412</span><span id="line-412">      fail("RPC should have failed because connection closed");</span>
<span class="source-line-no">413</span><span id="line-413">    } catch (ServiceException e) {</span>
<span class="source-line-no">414</span><span id="line-414">      LOG.info("Caught expected exception: " + e);</span>
<span class="source-line-no">415</span><span id="line-415">    } finally {</span>
<span class="source-line-no">416</span><span id="line-416">      server.stop();</span>
<span class="source-line-no">417</span><span id="line-417">    }</span>
<span class="source-line-no">418</span><span id="line-418">  }</span>
<span class="source-line-no">419</span><span id="line-419"></span>
<span class="source-line-no">420</span><span id="line-420">  /** Sends ten async echo calls up front, then checks every response and its controller. */</span>
<span class="source-line-no">421</span><span id="line-421">  @Test</span>
<span class="source-line-no">422</span><span id="line-422">  public void testAsyncEcho() throws IOException {</span>
<span class="source-line-no">423</span><span id="line-423">    final Configuration conf = new Configuration(CONF);</span>
<span class="source-line-no">424</span><span id="line-424">    final RpcServer server = createRpcServer("testRpcServer",</span>
<span class="source-line-no">425</span><span id="line-425">      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">426</span><span id="line-426">      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">427</span><span id="line-427">    try (AbstractRpcClient&lt;?&gt; rpcClient = createRpcClient(conf)) {</span>
<span class="source-line-no">428</span><span id="line-428">      server.start();</span>
<span class="source-line-no">429</span><span id="line-429">      final Interface stub = newStub(rpcClient, server.getListenerAddress());</span>
<span class="source-line-no">430</span><span id="line-430">      final int callCount = 10;</span>
<span class="source-line-no">431</span><span id="line-431">      final List&lt;HBaseRpcController&gt; controllers = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">432</span><span id="line-432">      final List&lt;BlockingRpcCallback&lt;EchoResponseProto&gt;&gt; callbacks = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">433</span><span id="line-433">      for (int call = 0; call &lt; callCount; call++) {</span>
<span class="source-line-no">434</span><span id="line-434">        final HBaseRpcController ctrl = new HBaseRpcControllerImpl();</span>
<span class="source-line-no">435</span><span id="line-435">        final BlockingRpcCallback&lt;EchoResponseProto&gt; onDone = new BlockingRpcCallback&lt;&gt;();</span>
<span class="source-line-no">436</span><span id="line-436">        controllers.add(ctrl);</span>
<span class="source-line-no">437</span><span id="line-437">        callbacks.add(onDone);</span>
<span class="source-line-no">438</span><span id="line-438">        stub.echo(ctrl, EchoRequestProto.newBuilder().setMessage("hello-" + call).build(), onDone);</span>
<span class="source-line-no">439</span><span id="line-439">      }</span>
<span class="source-line-no">440</span><span id="line-440">      for (int call = 0; call &lt; callCount; call++) {</span>
<span class="source-line-no">441</span><span id="line-441">        final HBaseRpcController ctrl = controllers.get(call);</span>
<span class="source-line-no">442</span><span id="line-442">        assertEquals("hello-" + call, callbacks.get(call).get().getMessage());</span>
<span class="source-line-no">443</span><span id="line-443">        assertFalse(ctrl.failed());</span>
<span class="source-line-no">444</span><span id="line-444">        assertNull(ctrl.cellScanner());</span>
<span class="source-line-no">445</span><span id="line-445">      }</span>
<span class="source-line-no">446</span><span id="line-446">    } finally {</span>
<span class="source-line-no">447</span><span id="line-447">      server.stop();</span>
<span class="source-line-no">448</span><span id="line-448">    }</span>
<span class="source-line-no">449</span><span id="line-449">  }</span>
<span class="source-line-no">450</span><span id="line-450"></span>
<span class="source-line-no">451</span><span id="line-451"> @Test</span>
<span class="source-line-no">452</span><span id="line-452"> public void testAsyncRemoteError() throws IOException {</span>
<span class="source-line-no">453</span><span id="line-453"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">454</span><span id="line-454"> AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf);</span>
<span class="source-line-no">455</span><span id="line-455"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">456</span><span id="line-456"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">457</span><span id="line-457"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">458</span><span id="line-458"> try {</span>
<span class="source-line-no">459</span><span id="line-459"> rpcServer.start();</span>
<span class="source-line-no">460</span><span id="line-460"> Interface stub = newStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">461</span><span id="line-461"> BlockingRpcCallback&lt;EmptyResponseProto&gt; callback = new BlockingRpcCallback&lt;&gt;();</span>
<span class="source-line-no">462</span><span id="line-462"> HBaseRpcController pcrc = new HBaseRpcControllerImpl();</span>
<span class="source-line-no">463</span><span id="line-463"> stub.error(pcrc, EmptyRequestProto.getDefaultInstance(), callback);</span>
<span class="source-line-no">464</span><span id="line-464"> assertNull(callback.get());</span>
<span class="source-line-no">465</span><span id="line-465"> assertTrue(pcrc.failed());</span>
<span class="source-line-no">466</span><span id="line-466"> LOG.info("Caught expected exception: " + pcrc.getFailed());</span>
<span class="source-line-no">467</span><span id="line-467"> IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());</span>
<span class="source-line-no">468</span><span id="line-468"> assertTrue(ioe instanceof DoNotRetryIOException);</span>
<span class="source-line-no">469</span><span id="line-469"> assertTrue(ioe.getMessage().contains("server error!"));</span>
<span class="source-line-no">470</span><span id="line-470"> } finally {</span>
<span class="source-line-no">471</span><span id="line-471"> client.close();</span>
<span class="source-line-no">472</span><span id="line-472"> rpcServer.stop();</span>
<span class="source-line-no">473</span><span id="line-473"> }</span>
<span class="source-line-no">474</span><span id="line-474"> }</span>
<span class="source-line-no">475</span><span id="line-475"></span>
<span class="source-line-no">476</span><span id="line-476">  /** Async pause calls must all time out on the client before the requested pause ends. */</span>
<span class="source-line-no">477</span><span id="line-477">  @Test</span>
<span class="source-line-no">478</span><span id="line-478">  public void testAsyncTimeout() throws IOException {</span>
<span class="source-line-no">479</span><span id="line-479">    final Configuration conf = new Configuration(CONF);</span>
<span class="source-line-no">480</span><span id="line-480">    final RpcServer server = createRpcServer("testRpcServer",</span>
<span class="source-line-no">481</span><span id="line-481">      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">482</span><span id="line-482">      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">483</span><span id="line-483">    try (AbstractRpcClient&lt;?&gt; rpcClient = createRpcClient(conf)) {</span>
<span class="source-line-no">484</span><span id="line-484">      server.start();</span>
<span class="source-line-no">485</span><span id="line-485">      final Interface stub = newStub(rpcClient, server.getListenerAddress());</span>
<span class="source-line-no">486</span><span id="line-486">      final int pauseMs = 1000;</span>
<span class="source-line-no">487</span><span id="line-487">      final int timeoutMs = 100;</span>
<span class="source-line-no">488</span><span id="line-488">      final List&lt;HBaseRpcController&gt; controllers = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">489</span><span id="line-489">      final List&lt;BlockingRpcCallback&lt;EmptyResponseProto&gt;&gt; callbacks = new ArrayList&lt;&gt;();</span>
<span class="source-line-no">490</span><span id="line-490">      final long startNanos = System.nanoTime();</span>
<span class="source-line-no">491</span><span id="line-491">      for (int call = 0; call &lt; 10; call++) {</span>
<span class="source-line-no">492</span><span id="line-492">        final HBaseRpcController ctrl = new HBaseRpcControllerImpl();</span>
<span class="source-line-no">493</span><span id="line-493">        ctrl.setCallTimeout(timeoutMs);</span>
<span class="source-line-no">494</span><span id="line-494">        final BlockingRpcCallback&lt;EmptyResponseProto&gt; onDone = new BlockingRpcCallback&lt;&gt;();</span>
<span class="source-line-no">495</span><span id="line-495">        controllers.add(ctrl);</span>
<span class="source-line-no">496</span><span id="line-496">        callbacks.add(onDone);</span>
<span class="source-line-no">497</span><span id="line-497">        stub.pause(ctrl, PauseRequestProto.newBuilder().setMs(pauseMs).build(), onDone);</span>
<span class="source-line-no">498</span><span id="line-498">      }</span>
<span class="source-line-no">499</span><span id="line-499">      for (BlockingRpcCallback&lt;EmptyResponseProto&gt; pending : callbacks) {</span>
<span class="source-line-no">500</span><span id="line-500">        assertNull(pending.get());</span>
<span class="source-line-no">501</span><span id="line-501">      }</span>
<span class="source-line-no">502</span><span id="line-502">      final long elapsedMs = (System.nanoTime() - startNanos) / 1000000;</span>
<span class="source-line-no">503</span><span id="line-503">      for (HBaseRpcController ctrl : controllers) {</span>
<span class="source-line-no">504</span><span id="line-504">        assertTrue(ctrl.failed());</span>
<span class="source-line-no">505</span><span id="line-505">        LOG.info("Caught expected exception: " + ctrl.getFailed());</span>
<span class="source-line-no">506</span><span id="line-506">        final IOException ioe = ProtobufUtil.handleRemoteException(ctrl.getFailed());</span>
<span class="source-line-no">507</span><span id="line-507">        assertTrue(ioe.getCause() instanceof CallTimeoutException);</span>
<span class="source-line-no">508</span><span id="line-508">      }</span>
<span class="source-line-no">509</span><span id="line-509">      assertTrue(elapsedMs &lt; pauseMs); // every call failed before the actual pause finished</span>
<span class="source-line-no">510</span><span id="line-510">    } finally {</span>
<span class="source-line-no">511</span><span id="line-511">      server.stop();</span>
<span class="source-line-no">512</span><span id="line-512">    }</span>
<span class="source-line-no">513</span><span id="line-513">  }</span>
<span class="source-line-no">514</span><span id="line-514"></span>
<span class="source-line-no">515</span><span id="line-515">  private SpanData waitSpan(Matcher&lt;SpanData&gt; matcher) {</span>
<span class="source-line-no">516</span><span id="line-516">    Waiter.waitFor(CONF, 1000, // wait for a recorded span that the matcher accepts</span>
<span class="source-line-no">517</span><span id="line-517">      new MatcherPredicate&lt;&gt;(() -&gt; traceRule.getSpans(), hasItem(matcher)));</span>
<span class="source-line-no">518</span><span id="line-518">    return traceRule.getSpans().stream().filter(span -&gt; matcher.matches(span)).findFirst()</span>
<span class="source-line-no">519</span><span id="line-519">      .orElseThrow(() -&gt; new AssertionError()); // no match in the re-read span list</span>
<span class="source-line-no">520</span><span id="line-520">  }</span>
<span class="source-line-no">521</span><span id="line-521"></span>
<span class="source-line-no">522</span><span id="line-522">  private static String buildIpcSpanName(final String packageAndService, final String methodName) {</span>
<span class="source-line-no">523</span><span id="line-523">    return String.join("/", packageAndService, methodName); // "&lt;package.Service&gt;/&lt;method&gt;"</span>
<span class="source-line-no">524</span><span id="line-524">  }</span>
<span class="source-line-no">525</span><span id="line-525"></span>
<span class="source-line-no">526</span><span id="line-526"> private static Matcher&lt;SpanData&gt; buildIpcClientSpanMatcher(final String packageAndService,</span>
<span class="source-line-no">527</span><span id="line-527"> final String methodName) {</span>
<span class="source-line-no">528</span><span id="line-528"> return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),</span>
<span class="source-line-no">529</span><span id="line-529"> hasKind(SpanKind.CLIENT));</span>
<span class="source-line-no">530</span><span id="line-530"> }</span>
<span class="source-line-no">531</span><span id="line-531"></span>
<span class="source-line-no">532</span><span id="line-532"> private static Matcher&lt;SpanData&gt; buildIpcServerSpanMatcher(final String packageAndService,</span>
<span class="source-line-no">533</span><span id="line-533"> final String methodName) {</span>
<span class="source-line-no">534</span><span id="line-534"> return allOf(hasName(buildIpcSpanName(packageAndService, methodName)),</span>
<span class="source-line-no">535</span><span id="line-535"> hasKind(SpanKind.SERVER));</span>
<span class="source-line-no">536</span><span id="line-536"> }</span>
<span class="source-line-no">537</span><span id="line-537"></span>
<span class="source-line-no">538</span><span id="line-538">  private static Matcher&lt;SpanData&gt; buildIpcClientSpanAttributesMatcher(</span>
<span class="source-line-no">539</span><span id="line-539">    final String packageAndService, final String methodName, final InetSocketAddress isa) {</span>
<span class="source-line-no">540</span><span id="line-540">    return hasAttributes(allOf( // RPC identity plus the peer host and port taken from isa</span>
<span class="source-line-no">541</span><span id="line-541">      containsEntry("rpc.system", "HBASE_RPC"), containsEntry("rpc.service", packageAndService),</span>
<span class="source-line-no">542</span><span id="line-542">      containsEntry("rpc.method", methodName), containsEntry("net.peer.name", isa.getHostName()),</span>
<span class="source-line-no">543</span><span id="line-543">      containsEntry(AttributeKey.longKey("net.peer.port"), Long.valueOf(isa.getPort()))));</span>
<span class="source-line-no">544</span><span id="line-544">  }</span>
<span class="source-line-no">545</span><span id="line-545"></span>
<span class="source-line-no">546</span><span id="line-546">  private static Matcher&lt;SpanData&gt; buildIpcServerSpanAttributesMatcher(</span>
<span class="source-line-no">547</span><span id="line-547">    final String packageAndService, final String methodName) {</span>
<span class="source-line-no">548</span><span id="line-548">    return hasAttributes(allOf(containsEntry("rpc.system", "HBASE_RPC"), // RPC identity only</span>
<span class="source-line-no">549</span><span id="line-549">      containsEntry("rpc.service", packageAndService), containsEntry("rpc.method", methodName)));</span>
<span class="source-line-no">550</span><span id="line-550">  }</span>
<span class="source-line-no">551</span><span id="line-551"></span>
<span class="source-line-no">552</span><span id="line-552"> private void assertRemoteSpan() {</span>
<span class="source-line-no">553</span><span id="line-553"> SpanData data = waitSpan(hasName("RpcServer.process"));</span>
<span class="source-line-no">554</span><span id="line-554"> assertTrue(data.getParentSpanContext().isRemote());</span>
<span class="source-line-no">555</span><span id="line-555"> assertEquals(SpanKind.SERVER, data.getKind());</span>
<span class="source-line-no">556</span><span id="line-556"> }</span>
<span class="source-line-no">557</span><span id="line-557"></span>
<span class="source-line-no">558</span><span id="line-558">  /** Checks the client and server spans recorded for one successful blocking pause RPC. */</span>
<span class="source-line-no">559</span><span id="line-559">  @Test</span>
<span class="source-line-no">560</span><span id="line-560">  public void testTracingSuccessIpc() throws IOException, ServiceException {</span>
<span class="source-line-no">561</span><span id="line-561">    Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">562</span><span id="line-562">    RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">563</span><span id="line-563">      Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">564</span><span id="line-564">      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">565</span><span id="line-565">    try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">566</span><span id="line-566">      rpcServer.start();</span>
<span class="source-line-no">567</span><span id="line-567">      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">568</span><span id="line-568">      stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());</span>
<span class="source-line-no">569</span><span id="line-569">      // use the ISA from the running server so that we can get the port selected.</span>
<span class="source-line-no">570</span><span id="line-570">      final InetSocketAddress isa = rpcServer.getListenerAddress();</span>
<span class="source-line-no">571</span><span id="line-571">      final String service = "hbase.test.pb.TestProtobufRpcProto";</span>
<span class="source-line-no">572</span><span id="line-572">      final SpanData pauseClientSpan = waitSpan(buildIpcClientSpanMatcher(service, "pause"));</span>
<span class="source-line-no">573</span><span id="line-573">      assertThat(pauseClientSpan, buildIpcClientSpanAttributesMatcher(service, "pause", isa));</span>
<span class="source-line-no">574</span><span id="line-574">      final SpanData pauseServerSpan = waitSpan(buildIpcServerSpanMatcher(service, "pause"));</span>
<span class="source-line-no">575</span><span id="line-575">      assertThat(pauseServerSpan, buildIpcServerSpanAttributesMatcher(service, "pause"));</span>
<span class="source-line-no">576</span><span id="line-576">      assertRemoteSpan();</span>
<span class="source-line-no">577</span><span id="line-577">      assertFalse("no spans provided", traceRule.getSpans().isEmpty());</span>
<span class="source-line-no">578</span><span id="line-578">      assertThat(traceRule.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.OK),</span>
<span class="source-line-no">579</span><span id="line-579">        hasTraceId(traceRule.getSpans().iterator().next().getTraceId()),</span>
<span class="source-line-no">580</span><span id="line-580">        hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));</span>
<span class="source-line-no">581</span><span id="line-581">    } finally {</span>
<span class="source-line-no">582</span><span id="line-582">      // Always stop the server so it does not keep running after the test.</span>
<span class="source-line-no">583</span><span id="line-583">      rpcServer.stop();</span>
<span class="source-line-no">584</span><span id="line-584">    }</span>
<span class="source-line-no">585</span><span id="line-585">  }</span>
<span class="source-line-no">586</span><span id="line-586"></span>
<span class="source-line-no">587</span><span id="line-587"> @Test</span>
<span class="source-line-no">588</span><span id="line-588"> public void testTracingErrorIpc() throws IOException {</span>
<span class="source-line-no">589</span><span id="line-589"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">590</span><span id="line-590"> RpcServer rpcServer = createRpcServer("testRpcServer",</span>
<span class="source-line-no">591</span><span id="line-591"> Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),</span>
<span class="source-line-no">592</span><span id="line-592"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">593</span><span id="line-593"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">594</span><span id="line-594"> rpcServer.start();</span>
<span class="source-line-no">595</span><span id="line-595"> BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">596</span><span id="line-596"> // use the ISA from the running server so that we can get the port selected.</span>
<span class="source-line-no">597</span><span id="line-597"> assertThrows(ServiceException.class,</span>
<span class="source-line-no">598</span><span id="line-598"> () -&gt; stub.error(null, EmptyRequestProto.getDefaultInstance()));</span>
<span class="source-line-no">599</span><span id="line-599"> final InetSocketAddress isa = rpcServer.getListenerAddress();</span>
<span class="source-line-no">600</span><span id="line-600"> final SpanData errorClientSpan =</span>
<span class="source-line-no">601</span><span id="line-601"> waitSpan(buildIpcClientSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));</span>
<span class="source-line-no">602</span><span id="line-602"> assertThat(errorClientSpan,</span>
<span class="source-line-no">603</span><span id="line-603"> buildIpcClientSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error", isa));</span>
<span class="source-line-no">604</span><span id="line-604"> final SpanData errorServerSpan =</span>
<span class="source-line-no">605</span><span id="line-605"> waitSpan(buildIpcServerSpanMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));</span>
<span class="source-line-no">606</span><span id="line-606"> assertThat(errorServerSpan,</span>
<span class="source-line-no">607</span><span id="line-607"> buildIpcServerSpanAttributesMatcher("hbase.test.pb.TestProtobufRpcProto", "error"));</span>
<span class="source-line-no">608</span><span id="line-608"> assertRemoteSpan();</span>
<span class="source-line-no">609</span><span id="line-609"> assertFalse("no spans provided", traceRule.getSpans().isEmpty());</span>
<span class="source-line-no">610</span><span id="line-610"> assertThat(traceRule.getSpans(), everyItem(allOf(hasStatusWithCode(StatusCode.ERROR),</span>
<span class="source-line-no">611</span><span id="line-611"> hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));</span>
<span class="source-line-no">612</span><span id="line-612"> }</span>
<span class="source-line-no">613</span><span id="line-613"> }</span>
<span class="source-line-no">614</span><span id="line-614"> /** Creates the client for testBadPreambleHeader; its calls should fail with BadAuthException. */</span>
<span class="source-line-no">615</span><span id="line-615"> protected abstract AbstractRpcClient&lt;?&gt; createBadAuthRpcClient(Configuration conf);</span>
<span class="source-line-no">616</span><span id="line-616"></span>
<span class="source-line-no">617</span><span id="line-617"> private IOException doBadPreableHeaderCall(BlockingInterface stub) {</span>
<span class="source-line-no">618</span><span id="line-618"> ServiceException se = assertThrows(ServiceException.class,</span>
<span class="source-line-no">619</span><span id="line-619"> () -&gt; stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()));</span>
<span class="source-line-no">620</span><span id="line-620"> return ProtobufUtil.handleRemoteException(se);</span>
<span class="source-line-no">621</span><span id="line-621"> }</span>
<span class="source-line-no">622</span><span id="line-622"></span>
<span class="source-line-no">623</span><span id="line-623">  /** A client created by createBadAuthRpcClient must end up failing with a BadAuthException. */</span>
<span class="source-line-no">624</span><span id="line-624">  @Test</span>
<span class="source-line-no">625</span><span id="line-625">  public void testBadPreambleHeader() throws Exception {</span>
<span class="source-line-no">626</span><span id="line-626">    Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">627</span><span id="line-627">    RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),</span>
<span class="source-line-no">628</span><span id="line-628">      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">629</span><span id="line-629">    try (AbstractRpcClient&lt;?&gt; client = createBadAuthRpcClient(clientConf)) {</span>
<span class="source-line-no">630</span><span id="line-630">      rpcServer.start();</span>
<span class="source-line-no">631</span><span id="line-631">      BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());</span>
<span class="source-line-no">632</span><span id="line-632">      BadAuthException badAuth = null;</span>
<span class="source-line-no">633</span><span id="line-633">      // SimpleRpcServer may hit a broken pipe before the BadAuthException, so retry; HBASE-28417</span>
<span class="source-line-no">634</span><span id="line-634">      for (int attempt = 0; badAuth == null &amp;&amp; attempt &lt; 10; attempt++) {</span>
<span class="source-line-no">635</span><span id="line-635">        IOException failure = doBadPreableHeaderCall(stub);</span>
<span class="source-line-no">636</span><span id="line-636">        if (failure instanceof BadAuthException) {</span>
<span class="source-line-no">637</span><span id="line-637">          badAuth = (BadAuthException) failure;</span>
<span class="source-line-no">638</span><span id="line-638">        } else {</span>
<span class="source-line-no">639</span><span id="line-639">          Thread.sleep(100);</span>
<span class="source-line-no">640</span><span id="line-640">        }</span>
<span class="source-line-no">641</span><span id="line-641">      }</span>
<span class="source-line-no">642</span><span id="line-642">      assertNotNull("Can not get expected BadAuthException", badAuth);</span>
<span class="source-line-no">643</span><span id="line-643">      assertThat(badAuth.getMessage(), containsString("authName=unknown"));</span>
<span class="source-line-no">644</span><span id="line-644">    } finally {</span>
<span class="source-line-no">645</span><span id="line-645">      rpcServer.stop();</span>
<span class="source-line-no">646</span><span id="line-646">    }</span>
<span class="source-line-no">647</span><span id="line-647">  }</span>
<span class="source-line-no">648</span><span id="line-648"></span>
<span class="source-line-no">649</span><span id="line-649"> /**</span>
<span class="source-line-no">650</span><span id="line-650"> * Testcase for getting connection registry information through connection preamble header, see</span>
<span class="source-line-no">651</span><span id="line-651"> * HBASE-25051 for more details.</span>
<span class="source-line-no">652</span><span id="line-652"> */</span>
<span class="source-line-no">653</span><span id="line-653"> @Test</span>
<span class="source-line-no">654</span><span id="line-654"> public void testGetConnectionRegistry() throws IOException, ServiceException {</span>
<span class="source-line-no">655</span><span id="line-655"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">656</span><span id="line-656"> String clusterId = "test_cluster_id";</span>
<span class="source-line-no">657</span><span id="line-657"> HBaseServerBase&lt;?&gt; server = mock(HBaseServerBase.class);</span>
<span class="source-line-no">658</span><span id="line-658"> when(server.getClusterId()).thenReturn(clusterId);</span>
<span class="source-line-no">659</span><span id="line-659"> // do not need any services</span>
<span class="source-line-no">660</span><span id="line-660"> RpcServer rpcServer = createRpcServer(server, "testRpcServer", Collections.emptyList(),</span>
<span class="source-line-no">661</span><span id="line-661"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">662</span><span id="line-662"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">663</span><span id="line-663"> rpcServer.start();</span>
<span class="source-line-no">664</span><span id="line-664"> InetSocketAddress addr = rpcServer.getListenerAddress();</span>
<span class="source-line-no">665</span><span id="line-665"> BlockingRpcChannel channel =</span>
<span class="source-line-no">666</span><span id="line-666"> client.createBlockingRpcChannel(ServerName.valueOf(addr.getHostName(), addr.getPort(),</span>
<span class="source-line-no">667</span><span id="line-667"> EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);</span>
<span class="source-line-no">668</span><span id="line-668"> ConnectionRegistryService.BlockingInterface stub =</span>
<span class="source-line-no">669</span><span id="line-669"> ConnectionRegistryService.newBlockingStub(channel);</span>
<span class="source-line-no">670</span><span id="line-670"> GetConnectionRegistryResponse resp =</span>
<span class="source-line-no">671</span><span id="line-671"> stub.getConnectionRegistry(null, GetConnectionRegistryRequest.getDefaultInstance());</span>
<span class="source-line-no">672</span><span id="line-672"> assertEquals(clusterId, resp.getClusterId());</span>
<span class="source-line-no">673</span><span id="line-673"> }</span>
<span class="source-line-no">674</span><span id="line-674"> }</span>
<span class="source-line-no">675</span><span id="line-675"></span>
<span class="source-line-no">676</span><span id="line-676"> /**</span>
<span class="source-line-no">677</span><span id="line-677"> * Test server does not support getting connection registry information through connection</span>
<span class="source-line-no">678</span><span id="line-678"> * preamble header, i.e, a new client connecting to an old server. We simulate this by using a</span>
<span class="source-line-no">679</span><span id="line-679"> * Server without implementing the ConnectionRegistryEndpoint interface.</span>
<span class="source-line-no">680</span><span id="line-680"> */</span>
<span class="source-line-no">681</span><span id="line-681"> @Test</span>
<span class="source-line-no">682</span><span id="line-682"> public void testGetConnectionRegistryError() throws IOException, ServiceException {</span>
<span class="source-line-no">683</span><span id="line-683"> Configuration clientConf = new Configuration(CONF);</span>
<span class="source-line-no">684</span><span id="line-684"> // do not need any services</span>
<span class="source-line-no">685</span><span id="line-685"> RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),</span>
<span class="source-line-no">686</span><span id="line-686"> new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));</span>
<span class="source-line-no">687</span><span id="line-687"> try (AbstractRpcClient&lt;?&gt; client = createRpcClient(clientConf)) {</span>
<span class="source-line-no">688</span><span id="line-688"> rpcServer.start();</span>
<span class="source-line-no">689</span><span id="line-689"> InetSocketAddress addr = rpcServer.getListenerAddress();</span>
<span class="source-line-no">690</span><span id="line-690"> RpcChannel channel = client.createRpcChannel(ServerName.valueOf(addr.getHostName(),</span>
<span class="source-line-no">691</span><span id="line-691"> addr.getPort(), EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);</span>
<span class="source-line-no">692</span><span id="line-692"> ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel);</span>
<span class="source-line-no">693</span><span id="line-693"> HBaseRpcController pcrc = new HBaseRpcControllerImpl();</span>
<span class="source-line-no">694</span><span id="line-694"> BlockingRpcCallback&lt;GetConnectionRegistryResponse&gt; done = new BlockingRpcCallback&lt;&gt;();</span>
<span class="source-line-no">695</span><span id="line-695"> stub.getConnectionRegistry(pcrc, GetConnectionRegistryRequest.getDefaultInstance(), done);</span>
<span class="source-line-no">696</span><span id="line-696"> // should have failed so no response</span>
<span class="source-line-no">697</span><span id="line-697"> assertNull(done.get());</span>
<span class="source-line-no">698</span><span id="line-698"> assertTrue(pcrc.failed());</span>
<span class="source-line-no">699</span><span id="line-699"> // should be a FatalConnectionException</span>
<span class="source-line-no">700</span><span id="line-700"> assertThat(pcrc.getFailed(), instanceOf(RemoteException.class));</span>
<span class="source-line-no">701</span><span id="line-701"> assertEquals(FatalConnectionException.class.getName(),</span>
<span class="source-line-no">702</span><span id="line-702"> ((RemoteException) pcrc.getFailed()).getClassName());</span>
<span class="source-line-no">703</span><span id="line-703"> assertThat(pcrc.getFailed().getMessage(), startsWith("Expected HEADER="));</span>
<span class="source-line-no">704</span><span id="line-704"> }</span>
<span class="source-line-no">705</span><span id="line-705"> }</span>
<span class="source-line-no">706</span><span id="line-706">}</span>
</pre>
</div>
</main>
</body>
</html>