blob: 0112730df776be4772e8b37f54971c603dce67bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.banyandb.v1.client;
import com.google.protobuf.NullValue;
import io.grpc.ServerServiceDefinition;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
import org.apache.skywalking.banyandb.database.v1.GroupRegistryServiceGrpc;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
import org.apache.skywalking.banyandb.stream.v1.StreamServiceGrpc;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
import org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleBinding;
import org.apache.skywalking.banyandb.v1.client.metadata.Stream;
import org.apache.skywalking.banyandb.v1.client.metadata.StreamMetadataRegistry;
import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec;
import lombok.SneakyThrows;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.mock;
/**
 * Exercises stream writes issued through the client, either queued on a
 * {@link StreamBulkWriteProcessor} or sent directly with {@code client.write}, against fake
 * {@code StreamService} implementations registered on the test's service registry.
 */
public class BanyanDBClientStreamWriteTest extends AbstractBanyanDBClientTest {
    private static final String GROUP = "default";
    private static final String STREAM_NAME = "sw";

    /** Upper bound, in seconds, for waiting until a fake service sees the request stream complete. */
    private static final long AWAIT_TIMEOUT_SECONDS = 5;

    // Fixture values shared by every element written in these tests.
    private static final String SEGMENT_ID = "1231.dfd.123123ssf";
    private static final String TRACE_ID = "trace_id-xxfff.111323";
    private static final String SERVICE_ID = "webapp_id";
    private static final String SERVICE_INSTANCE_ID = "10.0.0.1_id";
    private static final String ENDPOINT_ID = "home_id";
    private static final int LATENCY = 200;
    private static final int STATE = 1;
    private static final byte[] BYTE_DATA = new byte[]{14};
    private static final String BROKER = "172.16.10.129:9092";
    private static final String TOPIC = "topic_1";
    private static final String QUEUE = "queue_2";
    private static final String HTTP_STATUS_CODE = "200";
    private static final String DB_TYPE = "SQL";
    private static final String DB_INSTANCE = "127.0.0.1:3306";

    // Positions of the tag families inside a written element, and of the tags inside the
    // "searchable" family. They mirror the order in which the specs are declared in setUp().
    private static final int DATA_FAMILY = 0;
    private static final int SEARCHABLE_FAMILY = 1;
    private static final int SEARCHABLE_TAG_COUNT = 13;
    private static final int TRACE_ID_IDX = 0;
    private static final int DURATION_IDX = 5;
    private static final int HTTP_METHOD_IDX = 6;
    private static final int STATUS_CODE_IDX = 7;
    private static final int DB_INSTANCE_IDX = 9;
    private static final int MQ_QUEUE_IDX = 12;

    /**
     * Fake group registry: every {@code get} call is answered with a stream-catalog group named
     * "default" that has two shards. Wrapped in a Mockito mock that delegates to the real instance.
     */
    private final GroupRegistryServiceGrpc.GroupRegistryServiceImplBase groupRegistryServiceImpl =
        mock(GroupRegistryServiceGrpc.GroupRegistryServiceImplBase.class, delegatesTo(
            new GroupRegistryServiceGrpc.GroupRegistryServiceImplBase() {
                @Override
                public void get(BanyandbDatabase.GroupRegistryServiceGetRequest request,
                                StreamObserver<BanyandbDatabase.GroupRegistryServiceGetResponse> responseObserver) {
                    responseObserver.onNext(BanyandbDatabase.GroupRegistryServiceGetResponse.newBuilder()
                        .setGroup(BanyandbCommon.Group.newBuilder()
                            .setMetadata(BanyandbCommon.Metadata.newBuilder().setName("default").build())
                            .setCatalog(BanyandbCommon.Catalog.CATALOG_STREAM)
                            .setResourceOpts(BanyandbCommon.ResourceOpts.newBuilder()
                                .setShardNum(2)
                                .build())
                            .build())
                        .build());
                    responseObserver.onCompleted();
                }
            }));

    private StreamBulkWriteProcessor streamBulkWriteProcessor;
    private Stream stream;

    /**
     * Starts the test server with the fake group registry and the stream registry, creates the bulk
     * write processor and defines the "sw" stream (a "data" family and a 13-tag "searchable" family,
     * with an inverted index on {@code trace_id}).
     */
    @Before
    public void setUp() throws IOException, BanyanDBException {
        setUp(bindService(groupRegistryServiceImpl), bindStreamRegistry());
        // NOTE(review): the arguments are presumably bulk size, flush interval and concurrency —
        // confirm against the client API.
        streamBulkWriteProcessor = client.buildStreamWriteProcessor(1000, 1, 1);

        stream = Stream.create(GROUP, STREAM_NAME)
            .setEntityRelativeTags("service_id", "service_instance_id", "state")
            .addTagFamily(TagFamilySpec.create("data")
                .addTagSpec(TagFamilySpec.TagSpec.newBinaryTag("data_binary"))
                .build())
            .addTagFamily(TagFamilySpec.create("searchable")
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("trace_id"))
                .addTagSpec(TagFamilySpec.TagSpec.newIntTag("state"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("service_id"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("service_instance_id"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("endpoint_id"))
                .addTagSpec(TagFamilySpec.TagSpec.newIntTag("duration"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("http.method"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("status_code"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("db.type"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("db.instance"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("mq.broker"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("mq.topic"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("mq.queue"))
                .build())
            .addIndex(IndexRule.create("trace_id", IndexRule.IndexType.INVERTED))
            .build();
        this.client.define(stream);
    }

    /** Closes the bulk write processor, if {@link #setUp()} got far enough to create it. */
    @After
    public void shutdown() throws IOException {
        // JUnit still runs @After methods when @Before fails; avoid piling an NPE on top of the
        // original set-up failure.
        if (streamBulkWriteProcessor != null) {
            streamBulkWriteProcessor.close();
        }
    }

    /** Defining the stream must register exactly one default index-rule binding for it. */
    @Test
    public void testRegistry() {
        Assert.assertEquals(1, indexRuleBindingRegistry.size());
        Assert.assertTrue(indexRuleBindingRegistry.containsKey(IndexRuleBinding.defaultBindingRule(STREAM_NAME)));
        Assert.assertEquals(BanyandbCommon.Catalog.CATALOG_STREAM,
            indexRuleBindingRegistry.get(IndexRuleBinding.defaultBindingRule(STREAM_NAME)).getSubject().getCatalog());
    }

    /**
     * Writes one element through the bulk processor to a fake service that accepts everything, and
     * checks the delivered request, including the {@code db.instance} tag that was passed as
     * {@code null}.
     */
    @Test
    public void testWrite() throws Exception {
        final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
        final List<BanyandbStream.WriteRequest> writeRequestDelivered = new ArrayList<>();
        // implement the fake service
        final StreamServiceGrpc.StreamServiceImplBase serviceImpl =
            new StreamServiceGrpc.StreamServiceImplBase() {
                @Override
                public StreamObserver<BanyandbStream.WriteRequest> write(
                    StreamObserver<BanyandbStream.WriteResponse> responseObserver) {
                    return new StreamObserver<BanyandbStream.WriteRequest>() {
                        @Override
                        public void onNext(BanyandbStream.WriteRequest value) {
                            writeRequestDelivered.add(value);
                            responseObserver.onNext(BanyandbStream.WriteResponse.newBuilder().build());
                        }

                        @Override
                        public void onError(Throwable t) {
                            // This fake does not record client-side stream errors.
                        }

                        @Override
                        public void onCompleted() {
                            responseObserver.onCompleted();
                            allRequestsDelivered.countDown();
                        }
                    };
                }
            };
        serviceRegistry.addService(serviceImpl);

        streamBulkWriteProcessor.add(newStreamWrite(null));

        awaitStreamCompleted(allRequestsDelivered);
        Assert.assertEquals(1, writeRequestDelivered.size());
        final BanyandbStream.WriteRequest request = writeRequestDelivered.get(0);
        assertElementTags(request);
        Assert.assertEquals(NullValue.NULL_VALUE,
            request.getElement().getTagFamilies(SEARCHABLE_FAMILY).getTags(DB_INSTANCE_IDX).getNull());
    }

    /** Sends one element with {@code client.write} and checks the request the fake service recorded. */
    @Test
    public void performSingleWrite() throws Exception {
        final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
        final List<BanyandbStream.WriteRequest> writeRequestDelivered = new ArrayList<>();
        // implement the fake service
        final ServerServiceDefinition serviceImpl = createService(allRequestsDelivered, writeRequestDelivered);
        serviceRegistry.addService(serviceImpl);

        client.write(newStreamWrite(DB_INSTANCE));

        awaitStreamCompleted(allRequestsDelivered);
        Assert.assertEquals(1, writeRequestDelivered.size());
        final BanyandbStream.WriteRequest request = writeRequestDelivered.get(0);
        assertElementTags(request);
        Assert.assertTrue(request.getMessageId() > 0);
    }

    /**
     * Builds a write, then updates the stream schema on the server side before submitting it. The
     * fake service must record nothing for that first write; a write built afterwards must be
     * delivered normally.
     */
    @Test
    public void testAutoRefreshSchema() throws Exception {
        CountDownLatch allRequestsDelivered = new CountDownLatch(1);
        List<BanyandbStream.WriteRequest> writeRequestDelivered = new ArrayList<>();
        ServerServiceDefinition serviceImpl = createService(allRequestsDelivered, writeRequestDelivered);
        serviceRegistry.addService(serviceImpl);

        // Built before the schema update below, and only submitted after it.
        StreamWrite streamWrite = newStreamWrite(DB_INSTANCE);

        // update schema
        Stream streamUpdate = stream.toBuilder()
            .addTagFamily(TagFamilySpec.create("new_tag_family")
                .addTagSpec(TagFamilySpec.TagSpec.newIntTag("new_tag"))
                .build())
            .build();
        StreamMetadataRegistry streamMetadataRegistry = new StreamMetadataRegistry(checkNotNull(this.channel));
        streamMetadataRegistry.update(streamUpdate);

        streamBulkWriteProcessor.add(streamWrite);
        awaitStreamCompleted(allRequestsDelivered);
        Assert.assertEquals(0, writeRequestDelivered.size());

        // rewrite
        serviceRegistry.removeService(serviceImpl);
        allRequestsDelivered = new CountDownLatch(1);
        writeRequestDelivered = new ArrayList<>();
        serviceImpl = createService(allRequestsDelivered, writeRequestDelivered);
        serviceRegistry.addService(serviceImpl);

        streamBulkWriteProcessor.add(newStreamWrite(DB_INSTANCE));

        awaitStreamCompleted(allRequestsDelivered);
        Assert.assertEquals(1, writeRequestDelivered.size());
        final BanyandbStream.WriteRequest request = writeRequestDelivered.get(0);
        assertElementTags(request);
        Assert.assertTrue(request.getMessageId() > 0);
    }

    /**
     * Builds the element written by every test, timestamped with the current instant.
     *
     * @param dbInstance value of the {@code db.instance} tag; when {@code null}, a {@code null} tag
     *                   value is handed to {@code tag(...)} directly
     */
    private StreamWrite newStreamWrite(String dbInstance) throws Exception {
        return client.createStreamWrite(GROUP, STREAM_NAME, SEGMENT_ID, Instant.now().toEpochMilli())
            .tag("data_binary", Value.binaryTagValue(BYTE_DATA))
            .tag("trace_id", Value.stringTagValue(TRACE_ID)) // 0
            .tag("state", Value.longTagValue((long) STATE)) // 1
            .tag("service_id", Value.stringTagValue(SERVICE_ID)) // 2
            .tag("service_instance_id", Value.stringTagValue(SERVICE_INSTANCE_ID)) // 3
            .tag("endpoint_id", Value.stringTagValue(ENDPOINT_ID)) // 4
            .tag("duration", Value.longTagValue((long) LATENCY)) // 5
            .tag("http.method", Value.stringTagValue(null)) // 6
            .tag("status_code", Value.stringTagValue(HTTP_STATUS_CODE)) // 7
            .tag("db.type", Value.stringTagValue(DB_TYPE)) // 8
            .tag("db.instance", dbInstance == null ? null : Value.stringTagValue(dbInstance)) // 9
            .tag("mq.broker", Value.stringTagValue(BROKER)) // 10
            .tag("mq.topic", Value.stringTagValue(TOPIC)) // 11
            .tag("mq.queue", Value.stringTagValue(QUEUE)); // 12
    }

    /** Fails the test unless the latch is released within {@link #AWAIT_TIMEOUT_SECONDS} seconds. */
    private static void awaitStreamCompleted(CountDownLatch latch) throws InterruptedException {
        Assert.assertTrue(
            "fake stream service did not see the request stream complete within "
                + AWAIT_TIMEOUT_SECONDS + " seconds",
            latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
    }

    /** Checks the tag values that are identical in every element built by {@link #newStreamWrite}. */
    private static void assertElementTags(BanyandbStream.WriteRequest request) {
        Assert.assertArrayEquals(BYTE_DATA,
            request.getElement().getTagFamilies(DATA_FAMILY).getTags(0).getBinaryData().toByteArray());
        Assert.assertEquals(SEARCHABLE_TAG_COUNT,
            request.getElement().getTagFamilies(SEARCHABLE_FAMILY).getTagsCount());
        Assert.assertEquals(TRACE_ID,
            request.getElement().getTagFamilies(SEARCHABLE_FAMILY).getTags(TRACE_ID_IDX).getStr().getValue());
        Assert.assertEquals(LATENCY,
            request.getElement().getTagFamilies(SEARCHABLE_FAMILY).getTags(DURATION_IDX).getInt().getValue());
        // http.method is the tag written as null.
        // NOTE(review): getNull() presumably also yields NULL_VALUE (the enum default) for a tag
        // holding another kind of value, so this is a weak check — consider asserting the value
        // case once the generated accessor name is confirmed.
        Assert.assertEquals(NullValue.NULL_VALUE,
            request.getElement().getTagFamilies(SEARCHABLE_FAMILY).getTags(HTTP_METHOD_IDX).getNull());
        Assert.assertEquals(HTTP_STATUS_CODE,
            request.getElement().getTagFamilies(SEARCHABLE_FAMILY).getTags(STATUS_CODE_IDX).getStr().getValue());
        Assert.assertEquals(QUEUE,
            request.getElement().getTagFamilies(SEARCHABLE_FAMILY).getTags(MQ_QUEUE_IDX).getStr().getValue());
    }

    /**
     * Returns {@code true} when the mod revision carried by the request differs from the one of the
     * stream that {@code client.findStream} returns for the same group and name.
     */
    @SneakyThrows
    private boolean checkSchemaExpired(BanyandbStream.WriteRequest request) {
        Stream m = client.findStream(request.getMetadata().getGroup(), request.getMetadata().getName());
        return request.getMetadata().getModRevision() != m.serialize().getMetadata().getModRevision();
    }

    /**
     * Creates a fake stream service. A request whose schema revision is stale (see
     * {@link #checkSchemaExpired}) is answered with {@code STATUS_EXPIRED_SCHEMA} and not recorded;
     * any other request is recorded and acknowledged with its message id.
     *
     * @param allRequestsDelivered  counted down once the client completes the request stream
     * @param writeRequestDelivered receives every accepted request
     */
    private ServerServiceDefinition createService(final CountDownLatch allRequestsDelivered,
                                                  final List<BanyandbStream.WriteRequest> writeRequestDelivered) {
        return new StreamServiceGrpc.StreamServiceImplBase() {
            @Override
            public StreamObserver<BanyandbStream.WriteRequest> write(
                StreamObserver<BanyandbStream.WriteResponse> responseObserver) {
                return new StreamObserver<BanyandbStream.WriteRequest>() {
                    @Override
                    public void onNext(BanyandbStream.WriteRequest request) {
                        if (checkSchemaExpired(request)) {
                            responseObserver.onNext(
                                BanyandbStream.WriteResponse.newBuilder()
                                    .setMetadata(request.getMetadata())
                                    .setStatus(BanyandbModel.Status.STATUS_EXPIRED_SCHEMA)
                                    .setMessageId(request.getMessageId())
                                    .build());
                        } else {
                            writeRequestDelivered.add(request);
                            responseObserver.onNext(
                                BanyandbStream.WriteResponse.newBuilder()
                                    .setMessageId(request.getMessageId())
                                    .build());
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        // This fake does not record client-side stream errors.
                    }

                    @Override
                    public void onCompleted() {
                        responseObserver.onCompleted();
                        allRequestsDelivered.countDown();
                    }
                };
            }
        }.bindService();
    }
}