blob: 935913fcd644a31bb69c72a3a274e8c8e4343e90 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client;
import static org.apache.ignite.client.fakes.FakeIgniteTables.TABLE_ONE_COLUMN;
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.lang.reflect.Constructor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteQueryProcessor;
import org.apache.ignite.client.fakes.FakeIgniteTables;
import org.apache.ignite.internal.client.ClientMetricSource;
import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.metrics.AbstractMetricSource;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.table.DataStreamerItem;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
* Tests client-side metrics (see also server-side metrics tests in {@link ServerMetricsTest}).
*/
public class ClientMetricsTest extends BaseIgniteAbstractTest {
private TestServer server;
private IgniteClient client;
/**
* Runs after every test and hands the client and the server used by that test to {@code closeAll}.
*
* @throws Exception If {@code closeAll} fails.
*/
@AfterEach
public void afterEach() throws Exception {
closeAll(client, server);
}
/**
 * Checks the connection counters right after the client connects and again after the connection goes away.
 *
 * @param gracefulDisconnect {@code true} to close the client, {@code false} to close the server instead.
 * @throws Exception If closing the client or the server fails, or if waiting for a metric is interrupted.
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConnectionMetrics(boolean gracefulDisconnect) throws Exception {
    server = AbstractClientTest.startServer(1000, new FakeIgnite());
    client = clientBuilder().build();

    ClientMetricSource metricSource = metrics();

    // The test expects a lost connection to be counted only when the server side is closed.
    int expectedLost = gracefulDisconnect ? 0 : 1;

    assertEquals(1, metricSource.connectionsEstablished());
    assertEquals(1, metricSource.connectionsActive());

    if (!gracefulDisconnect) {
        server.close();
    } else {
        client.close();
    }

    boolean noActiveConnections = IgniteTestUtils.waitForCondition(() -> metricSource.connectionsActive() == 0, 1000);
    assertTrue(noActiveConnections, () -> "connectionsActive: " + metricSource.connectionsActive());

    boolean lostCountReached = IgniteTestUtils.waitForCondition(() -> metricSource.connectionsLost() == expectedLost, 1000);
    assertTrue(lostCountReached, () -> "connectionsLost: " + metricSource.connectionsLost());

    // Disconnecting must not change the number of connections established so far.
    assertEquals(1, metricSource.connectionsEstablished());
}
/**
 * Starts a server that drops the connection for request index 0 and delays responses for indexes above 1,
 * connects a client configured with short connect and heartbeat settings, and waits until
 * {@code connectionsLostTimeout} becomes 1.
 *
 * @throws InterruptedException If the thread is interrupted while waiting for the metric.
 */
@Test
public void testConnectionsLostTimeout() throws InterruptedException {
    Function<Integer, Boolean> dropRule = idx -> idx == 0;
    Function<Integer, Integer> delayRule = idx -> {
        if (idx > 1) {
            return 500;
        }

        return 0;
    };

    server = new TestServer(1000, new FakeIgnite(), dropRule, delayRule, null, AbstractClientTest.clusterId, null, null);

    var shortTimeoutsBuilder = clientBuilder()
            .connectTimeout(100)
            .heartbeatTimeout(100)
            .heartbeatInterval(100);

    client = shortTimeoutsBuilder.build();

    boolean timeoutCounted = IgniteTestUtils.waitForCondition(() -> metrics().connectionsLostTimeout() == 1, 1000);

    assertTrue(timeoutCounted, () -> "connectionsLostTimeout: " + metrics().connectionsLostTimeout());
}
/**
 * Starts a server whose drop rule returns {@code true} for its first two invocations and checks that the
 * client reports two failed handshakes once it has been built.
 */
@Test
public void testHandshakesFailed() {
    AtomicInteger dropChecks = new AtomicInteger();

    // Fail 2 handshakes: the rule ignores the request index and only counts its own invocations.
    Function<Integer, Boolean> dropRule = ignoredIdx -> dropChecks.incrementAndGet() <= 2;

    server = new TestServer(1000, new FakeIgnite(), dropRule, null, null, AbstractClientTest.clusterId, null, null);

    client = clientBuilder().build();

    assertEquals(2, metrics().handshakesFailed());
}
/**
 * Starts a server that delays only the first response it is asked about, connects a client with a connect
 * timeout of 100, and waits until {@code handshakesFailedTimeout} becomes 1.
 *
 * @throws InterruptedException If the thread is interrupted while waiting for the metric.
 */
@Test
public void testHandshakesFailedTimeout() throws InterruptedException {
    AtomicInteger delayChecks = new AtomicInteger();

    Function<Integer, Boolean> dropRule = idx -> false;
    Function<Integer, Integer> delayRule = idx -> {
        // Only the very first invocation gets a non-zero delay.
        boolean firstInvocation = delayChecks.incrementAndGet() == 1;

        return firstInvocation ? 500 : 0;
    };

    server = new TestServer(1000, new FakeIgnite(), dropRule, delayRule, null, AbstractClientTest.clusterId, null, null);

    client = clientBuilder().connectTimeout(100).build();

    boolean timeoutCounted = IgniteTestUtils.waitForCondition(() -> metrics().handshakesFailedTimeout() == 1, 1000);

    assertTrue(timeoutCounted, () -> "handshakesFailedTimeout: " + metrics().handshakesFailedTimeout());
}
/**
* Issues a successful request, a failing SQL request, an asynchronous request and one more synchronous
* request against a server with index-based drop and delay rules, checking the request counters
* ({@code requestsActive}, {@code requestsFailed}, {@code requestsCompleted}, {@code requestsSent},
* {@code requestsRetried}) after each step.
*
* @throws InterruptedException If the thread is interrupted while waiting for a metric to reach the expected value.
*/
@Test
public void testRequestsMetrics() throws InterruptedException {
// The server drops the connection for request index 5 and delays the response for index 4 by 1000
// (NOTE(review): presumably milliseconds, and the index presumably counts the handshake too - confirm in TestServer).
Function<Integer, Boolean> shouldDropConnection = requestIdx -> requestIdx == 5;
Function<Integer, Integer> responseDelay = idx -> idx == 4 ? 1000 : 0;
server = new TestServer(
1000,
new FakeIgnite(),
shouldDropConnection,
responseDelay,
null,
AbstractClientTest.clusterId,
null,
null
);
client = clientBuilder().build();
// Step 0: right after connecting, every request counter is expected to be zero.
assertEquals(0, metrics().requestsActive());
assertEquals(0, metrics().requestsFailed());
assertEquals(0, metrics().requestsCompleted());
assertEquals(0, metrics().requestsSent());
assertEquals(0, metrics().requestsRetried());
// Step 1: a successful synchronous request is expected to bump requestsSent and requestsCompleted only.
client.tables().tables();
assertEquals(0, metrics().requestsActive());
assertEquals(0, metrics().requestsFailed());
assertEquals(1, metrics().requestsCompleted());
assertEquals(1, metrics().requestsSent());
assertEquals(0, metrics().requestsRetried());
// Step 2: a request that fails with a statement validation error is expected to bump requestsSent
// and requestsFailed, leaving requestsCompleted unchanged.
assertThrowsSqlException(
Sql.STMT_VALIDATION_ERR,
"Query failed",
() -> client.sql().execute(null, FakeIgniteQueryProcessor.FAILED_SQL));
assertEquals(0, metrics().requestsActive());
assertEquals(1, metrics().requestsFailed());
assertEquals(1, metrics().requestsCompleted());
assertEquals(2, metrics().requestsSent());
assertEquals(0, metrics().requestsRetried());
// Step 3: start an asynchronous request without waiting for its result; once it is counted as sent
// it is expected to still be active (NOTE(review): presumably this is the request delayed by responseDelay - confirm).
client.tables().tablesAsync();
assertTrue(
IgniteTestUtils.waitForCondition(() -> metrics().requestsSent() == 3, 1000),
() -> "requestsSent: " + metrics().requestsSent());
assertEquals(1, metrics().requestsActive());
assertEquals(1, metrics().requestsFailed());
assertEquals(1, metrics().requestsCompleted());
assertEquals(0, metrics().requestsRetried());
// Step 4: one more synchronous request; the test waits for exactly one retry to be recorded
// (NOTE(review): presumably triggered by the connection drop configured above - confirm).
client.tables().tables();
assertTrue(
IgniteTestUtils.waitForCondition(() -> metrics().requestsRetried() == 1, 1000),
() -> "requestsRetried: " + metrics().requestsRetried());
// Final totals expected by the test: 1 failed, 3 completed, 6 sent, 1 retried.
assertEquals(1, metrics().requestsFailed());
assertEquals(3, metrics().requestsCompleted());
assertEquals(6, metrics().requestsSent());
assertEquals(1, metrics().requestsRetried());
}
/**
 * Checks the {@code bytesSent} and {@code bytesReceived} counters right after the client connects and
 * again after a single {@code tables()} request.
 */
@Test
public void testBytesSentReceived() {
    server = AbstractClientTest.startServer(1000, new FakeIgnite());
    client = clientBuilder().build();

    ClientMetricSource metricSource = metrics();

    // Traffic counted before any explicit request is issued by the test.
    assertEquals(15, metricSource.bytesSent());
    assertEquals(85, metricSource.bytesReceived());

    client.tables().tables();

    // Totals after one additional request/response pair.
    assertEquals(21, metricSource.bytesSent());
    assertEquals(106, metricSource.bytesReceived());
}
/**
 * Checks the data streamer counters while two submitted items are still queued and after the streamer
 * future has completed.
 *
 * @throws InterruptedException If the thread is interrupted while waiting for the items to be queued.
 */
@Test
public void testStreamer() throws InterruptedException {
    server = AbstractClientTest.startServer(0, new FakeIgnite());
    client = clientBuilder().build();

    Table table = oneColumnTable();
    CompletableFuture<Void> streamerFut;

    try (var publisher = new SubmissionPublisher<DataStreamerItem<Tuple>>(ForkJoinPool.commonPool(), 1)) {
        streamerFut = table.recordView().streamData(publisher, null);

        publisher.submit(DataStreamerItem.of(Tuple.create().set("ID", "1")));
        publisher.submit(DataStreamerItem.of(Tuple.create().set("ID", "2")));

        // Report the observed value if the wait times out, like the other waitForCondition checks in this class.
        assertTrue(
                IgniteTestUtils.waitForCondition(() -> metrics().streamerItemsQueued() == 2, 1000),
                () -> "streamerItemsQueued: " + metrics().streamerItemsQueued());

        // While the publisher is still open, the test expects nothing to have been sent yet.
        assertEquals(0, metrics().streamerItemsSent());
        assertEquals(0, metrics().streamerBatchesSent());
        assertEquals(0, metrics().streamerBatchesActive());
    }

    // The publisher is closed at this point; wait (at most 3 seconds) for the streamer to finish.
    streamerFut.orTimeout(3, TimeUnit.SECONDS).join();

    assertEquals(2, metrics().streamerItemsSent());
    assertEquals(2, metrics().streamerBatchesSent());
    assertEquals(0, metrics().streamerBatchesActive());
    assertEquals(0, metrics().streamerItemsQueued());
}
/**
 * Verifies that every field declared by the {@link ClientMetricSource} holder (except {@code metrics})
 * has a corresponding entry in the {@link MetricSet} returned by {@code enable()}, looked up by the field
 * name with its first letter capitalized.
 *
 * @throws Exception If the metric source cannot be instantiated reflectively.
 */
@Test
public void testAllMetricsAreRegistered() throws Exception {
    // Plain no-arg lookup: no need to pass an ambiguous null to the varargs parameter.
    Constructor<ClientMetricSource> ctor = ClientMetricSource.class.getDeclaredConstructor();
    ctor.setAccessible(true);

    ClientMetricSource source = ctor.newInstance();
    MetricSet metricSet = source.enable();

    assertNotNull(metricSet);

    var holder = IgniteTestUtils.getFieldValue(source, AbstractMetricSource.class, "holder");

    // Check that all fields from holder are registered in MetricSet.
    for (var field : holder.getClass().getDeclaredFields()) {
        // Skip the "metrics" field itself and any synthetic field that a compiler or an
        // instrumentation agent may have added: neither represents a single metric.
        if (field.isSynthetic() || "metrics".equals(field.getName())) {
            continue;
        }

        String fieldName = field.getName();

        // Character.toUpperCase does not depend on the default locale, unlike String.toUpperCase().
        String metricName = Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);

        assertNotNull(metricSet.get(metricName), "Metric is not registered: " + metricName);
    }
}
/**
 * Returns the client-side view of the {@code TABLE_ONE_COLUMN} table, creating the table on the fake
 * server first when the server does not have it yet.
 *
 * @return Table obtained through the client.
 */
private Table oneColumnTable() {
    var serverTables = server.ignite().tables();

    boolean missingOnServer = serverTables.table(TABLE_ONE_COLUMN) == null;

    if (missingOnServer) {
        ((FakeIgniteTables) serverTables).createTable(TABLE_ONE_COLUMN);
    }

    return client.tables().table(TABLE_ONE_COLUMN);
}
/**
 * Creates a client builder that targets the current test server on the loopback address and has
 * metrics enabled.
 *
 * @return Pre-configured client builder.
 */
private Builder clientBuilder() {
    String serverAddress = "127.0.0.1:" + server.port();

    return IgniteClient.builder().addresses(serverAddress).metricsEnabled(true);
}
/**
 * Returns the metric source of the current client.
 *
 * @return Client metric source.
 */
private ClientMetricSource metrics() {
    TcpIgniteClient tcpClient = (TcpIgniteClient) client;

    return tcpClient.metrics();
}
}