blob: aece3799aa6f239c16c779028eec58cf52abba12 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.core.filter;
import static org.apache.servicecomb.core.Const.HIGHWAY;
import static org.apache.servicecomb.core.Const.RESTFUL;
import static org.apache.servicecomb.core.filter.FilterNode.buildChain;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.Transport;
import org.apache.servicecomb.core.definition.OperationConfig;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.filter.impl.ScheduleFilter;
import org.apache.servicecomb.core.filter.impl.TransportFilters;
import org.apache.servicecomb.swagger.invocation.Response;
import org.junit.BeforeClass;
import org.junit.Test;
import mockit.Expectations;
import mockit.Mocked;
/**
 * Tests for filter chains created by {@code FilterNode.buildChain}: which thread each filter
 * runs on, how a thrown exception surfaces to the caller, and how a chain nested inside
 * {@link TransportFilters} is selected by transport name.
 *
 * <p>Thread names are never hard-coded: the calling thread's name is captured at runtime, so the
 * assertions do not depend on the test runner executing tests on a thread called "main".
 */
public class FilterChainTest {
  /**
   * Single-threaded executor returned by the mocked {@code OperationMeta#getExecutor()}.
   * The worker is a daemon thread so that this never-shut-down executor cannot keep the JVM alive
   * once the tests have finished.
   */
  static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(task -> {
    Thread worker = new Thread(task, "filter-chain-test-executor");
    worker.setDaemon(true);
    return worker;
  });

  /** Name of the {@link #EXECUTOR} worker thread, captured once in {@link #beforeClass()}. */
  static String THREAD_NAME;

  @Mocked
  Invocation invocation;

  @Mocked
  OperationMeta operationMeta;

  @Mocked
  OperationConfig operationConfig;

  /**
   * Thread names recorded by {@link #recordThreadFilter}, in invocation order.
   * A synchronized list is used because entries are appended from both the calling thread and the
   * executor thread.
   */
  List<String> msg = new Vector<>();

  /** Records the name of the executing thread, then continues the chain (or completes it if last). */
  Filter recordThreadFilter = (invocation, nextNode) -> {
    msg.add(currentThreadName());
    if (nextNode == null) {
      return CompletableFuture.completedFuture(Response.ok(null));
    }

    return nextNode.onFilter(invocation);
  };

  Filter scheduler = new ScheduleFilter();

  /** Always fails synchronously with {@code IllegalStateException("e1")}. */
  Filter exceptionFilter = (invocation, nextNode) -> {
    throw new IllegalStateException("e1");
  };

  /**
   * Captures the name of the executor's worker thread.
   *
   * @throws IllegalStateException if the name cannot be obtained from the executor
   */
  @BeforeClass
  public static void beforeClass() {
    try {
      THREAD_NAME = EXECUTOR.submit(() -> Thread.currentThread().getName()).get();
    } catch (InterruptedException e) {
      // keep the interrupt visible to the test runner instead of swallowing it
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Name of the thread that is currently executing. */
  private static String currentThreadName() {
    return Thread.currentThread().getName();
  }

  /**
   * Asserts that {@code throwable} is an {@link ExecutionException} caused by
   * {@code IllegalStateException("e1")}.
   *
   * <p>The type is asserted before the cause is read, so a chain that unexpectedly succeeds
   * (null throwable) or fails with another type is reported as an assertion failure rather than a
   * {@code NullPointerException} or {@code ClassCastException}.
   */
  private static void assertFailedWithE1(Throwable throwable) {
    assertThat(throwable).isInstanceOf(ExecutionException.class);
    assertThat(throwable.getCause())
        .isInstanceOf(IllegalStateException.class)
        .hasMessage("e1");
  }

  /** Records the mock behaviour needed by chains that contain {@link #scheduler}. */
  private void mockInvocation() {
    new Expectations() {
      {
        invocation.getOperationMeta();
        result = operationMeta;

        operationMeta.getExecutor();
        result = EXECUTOR;

        operationConfig.getNanoRequestWaitInPoolTimeout(anyString);
        result = Long.MAX_VALUE;
      }
    };
  }

  /** The filter before the scheduler must run on the caller, the one after it on the executor. */
  @Test
  public void should_switch_thread_after_schedule() throws ExecutionException, InterruptedException {
    String callerThread = currentThreadName();
    mockInvocation();

    buildChain(recordThreadFilter, scheduler, recordThreadFilter)
        .onFilter(invocation)
        .get();

    assertThat(msg).containsExactly(callerThread, THREAD_NAME);
  }

  /** A failure in the first filter must prevent every later filter from running. */
  @Test
  public void should_stop_chain_when_first_filter_throw_exception() {
    Throwable throwable = catchThrowable(
        () -> buildChain(exceptionFilter, recordThreadFilter)
            .onFilter(invocation)
            .get());

    assertThat(msg).isEmpty();
    assertFailedWithE1(throwable);
  }

  /** Filters before the failing one run; filters after it do not. */
  @Test
  public void should_stop_chain_when_middle_filter_throw_exception() {
    String callerThread = currentThreadName();

    Throwable throwable = catchThrowable(
        () -> buildChain(recordThreadFilter, exceptionFilter, recordThreadFilter)
            .onFilter(invocation)
            .get());

    assertThat(msg).containsExactly(callerThread);
    assertFailedWithE1(throwable);
  }

  /**
   * With a retry filter in front of a filter that always fails, the resulting future is expected
   * to fail with the unwrapped {@link IOException} as the cause.
   */
  @Test
  public void should_support_retry_logic() {
    Filter exceptionFilter = (invocation, nextNode) -> {
      throw new CompletionException(new IOException("net error"));
    };
    SimpleRetryFilter retryFilter = new SimpleRetryFilter().setMaxRetry(3);

    CompletableFuture<Response> future = buildChain(retryFilter, recordThreadFilter, exceptionFilter)
        .onFilter(invocation);

    assertThat(future)
        .failsWithin(Duration.ofSeconds(1))
        .withThrowableOfType(ExecutionException.class)
        .withCauseExactlyInstanceOf(IOException.class)
        .withMessage("java.io.IOException: net error");
  }

  /**
   * Builds an outer chain of {@code TransportFilters -> recordThreadFilter}, with one nested chain
   * registered for RESTFUL and one for HIGHWAY, and checks the recorded threads for each transport
   * name as well as for a name that has no nested chain.
   */
  @Test
  public void should_build_chain_with_TransportFilters(@Mocked Transport transport)
      throws ExecutionException, InterruptedException {
    mockInvocation();
    new Expectations() {
      {
        invocation.getTransport();
        result = transport;
      }
    };

    TransportFilters transportFilters = new TransportFilters();
    transportFilters.getChainByTransport().put(RESTFUL, buildChain(recordThreadFilter));
    transportFilters.getChainByTransport().put(HIGHWAY, buildChain(recordThreadFilter, scheduler, recordThreadFilter));

    FilterNode chain = buildChain(transportFilters, recordThreadFilter);

    checkRestChain(transport, chain);
    checkHighwayChain(transport, chain);
    checkUnknownTransportChain(transport, chain);
  }

  /** With no nested chain for the transport name, only the outer recording filter is expected to run. */
  private void checkUnknownTransportChain(Transport transport, FilterNode chain)
      throws ExecutionException, InterruptedException {
    String callerThread = currentThreadName();
    msg.clear();
    new Expectations() {
      {
        transport.getName();
        result = "abc";
      }
    };

    chain.onFilter(invocation)
        .get();

    assertThat(msg).containsExactly(callerThread);
  }

  /** RESTFUL: the nested filter and the outer filter are both expected on the calling thread. */
  private void checkRestChain(Transport transport, FilterNode chain)
      throws InterruptedException, ExecutionException {
    String callerThread = currentThreadName();
    msg.clear();
    new Expectations() {
      {
        transport.getName();
        result = RESTFUL;
      }
    };

    chain.onFilter(invocation)
        .get();

    assertThat(msg).containsExactly(callerThread, callerThread);
  }

  /**
   * HIGHWAY: the nested chain contains the scheduler, so everything after it, including the outer
   * recording filter, is expected on the executor thread.
   */
  private void checkHighwayChain(Transport transport, FilterNode chain)
      throws InterruptedException, ExecutionException {
    String callerThread = currentThreadName();
    msg.clear();
    new Expectations() {
      {
        transport.getName();
        result = HIGHWAY;
      }
    };

    chain.onFilter(invocation)
        .get();

    assertThat(msg).containsExactly(callerThread, THREAD_NAME, THREAD_NAME);
  }
}