blob: c7b9eb903c8d48d120b4cde8ffdba1e01e372266 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class InFlightRequestsTest {
private InFlightRequests inFlightRequests;
private int correlationId;
private String dest = "dest";
@Before
public void setup() {
inFlightRequests = new InFlightRequests(12);
correlationId = 0;
}
@Test
public void testCompleteLastSent() {
int correlationId1 = addRequest(dest);
int correlationId2 = addRequest(dest);
assertEquals(2, inFlightRequests.count());
assertEquals(correlationId2, inFlightRequests.completeLastSent(dest).header.correlationId());
assertEquals(1, inFlightRequests.count());
assertEquals(correlationId1, inFlightRequests.completeLastSent(dest).header.correlationId());
assertEquals(0, inFlightRequests.count());
}
@Test
public void testClearAll() {
int correlationId1 = addRequest(dest);
int correlationId2 = addRequest(dest);
List<NetworkClient.InFlightRequest> clearedRequests = TestUtils.toList(this.inFlightRequests.clearAll(dest));
assertEquals(0, inFlightRequests.count());
assertEquals(2, clearedRequests.size());
assertEquals(correlationId1, clearedRequests.get(0).header.correlationId());
assertEquals(correlationId2, clearedRequests.get(1).header.correlationId());
}
@Test
public void testTimedOutNodes() {
Time time = new MockTime();
addRequest("A", time.milliseconds(), 50);
addRequest("B", time.milliseconds(), 200);
addRequest("B", time.milliseconds(), 100);
time.sleep(50);
assertEquals(Collections.emptyList(), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds()));
time.sleep(25);
assertEquals(Collections.singletonList("A"), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds()));
time.sleep(50);
assertEquals(Arrays.asList("A", "B"), inFlightRequests.nodesWithTimedOutRequests(time.milliseconds()));
}
@Test
public void testCompleteNext() {
int correlationId1 = addRequest(dest);
int correlationId2 = addRequest(dest);
assertEquals(2, inFlightRequests.count());
assertEquals(correlationId1, inFlightRequests.completeNext(dest).header.correlationId());
assertEquals(1, inFlightRequests.count());
assertEquals(correlationId2, inFlightRequests.completeNext(dest).header.correlationId());
assertEquals(0, inFlightRequests.count());
}
@Test(expected = IllegalStateException.class)
public void testCompleteNextThrowsIfNoInflights() {
inFlightRequests.completeNext(dest);
}
@Test(expected = IllegalStateException.class)
public void testCompleteLastSentThrowsIfNoInFlights() {
inFlightRequests.completeLastSent(dest);
}
private int addRequest(String destination) {
return addRequest(destination, 0, 10000);
}
private int addRequest(String destination, long sendTimeMs, int requestTimeoutMs) {
int correlationId = this.correlationId;
this.correlationId += 1;
RequestHeader requestHeader = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", correlationId);
NetworkClient.InFlightRequest ifr = new NetworkClient.InFlightRequest(requestHeader, requestTimeoutMs, 0,
destination, null, false, false, null, null, sendTimeMs);
inFlightRequests.add(ifr);
return correlationId;
}
}