Support multi groups (#54)
diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 41bc00b..dbd766c 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -71,6 +71,12 @@
distribution: 'temurin'
java-version: ${{ matrix.version }}
cache: 'maven'
+ - name: Login to ghcr
+ uses: docker/login-action@v1
+ with:
+ registry: ghcr.io
+ username: ${{ github.repository_owner }}
+ password: ${{ secrets.GITHUB_TOKEN }}
- name: Build and Test
run: ./mvnw -q clean verify install
diff --git a/README.md b/README.md
index 955a4f3..17f6236 100644
--- a/README.md
+++ b/README.md
@@ -118,7 +118,7 @@
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
// with stream schema, group=default, name=sw
-StreamQuery query = new StreamQuery("default", "sw",
+StreamQuery query = new StreamQuery(Lists.newArrayList("default"), "sw",
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
// projection tags which are indexed
ImmutableSet.of("state", "start_time", "duration", "trace_id"));
@@ -154,7 +154,7 @@
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
// with stream schema, group=sw_metrics, name=service_instance_cpm_day
-MeasureQuery query = new MeasureQuery("sw_metrics", "service_instance_cpm_day",
+MeasureQuery query = new MeasureQuery(Lists.newArrayList("sw_metrics"), "service_instance_cpm_day",
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
ImmutableSet.of("id", "scope", "service_id"),
ImmutableSet.of("total"));
diff --git a/pom.xml b/pom.xml
index 1f1d3bb..6288226 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,17 +82,15 @@
<!-- core lib dependency -->
<bytebuddy.version>1.10.19</bytebuddy.version>
<!-- grpc version should align with the Skywalking main repo -->
- <grpc.version>1.46.0</grpc.version>
- <protobuf.version>3.19.2</protobuf.version>
- <protoc.version>3.19.2</protoc.version>
- <gson.version>2.8.6</gson.version>
- <os-maven-plugin.version>1.6.2</os-maven-plugin.version>
+ <grpc.version>1.63.0</grpc.version>
+ <protoc.version>3.25.3</protoc.version>
+ <os-maven-plugin.version>1.7.1</os-maven-plugin.version>
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
- <netty.tcnative.version>2.0.51.Final</netty.tcnative.version>
+ <netty.tcnative.version>2.0.65.Final</netty.tcnative.version>
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
- <auto-value.version>1.9</auto-value.version>
- <testcontainers.version>1.16.3</testcontainers.version>
- <awaitility.version>4.2.0</awaitility.version>
+ <auto-value.version>1.10.4</auto-value.version>
+ <testcontainers.version>1.19.7</testcontainers.version>
+ <awaitility.version>4.2.1</awaitility.version>
<bufbuild.protoc-gen-validate.version>0.6.13</bufbuild.protoc-gen-validate.version>
<!-- necessary for Java 9+ -->
<org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version>
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
index d2788c4..ed05aa1 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
@@ -24,7 +24,6 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.InvalidReferenceException;
@@ -39,7 +38,7 @@
/**
* Group of the current entity
*/
- protected final String group;
+ protected final List<String> groups;
/**
* Owner name of the current entity
*/
@@ -63,8 +62,8 @@
*/
protected AbstractCriteria criteria;
- public AbstractQuery(String group, String name, TimestampRange timestampRange, Set<String> tagProjections) {
- this.group = group;
+ public AbstractQuery(List<String> groups, String name, TimestampRange timestampRange, Set<String> tagProjections) {
+ this.groups = groups;
this.name = name;
this.timestampRange = timestampRange;
this.conditions = new ArrayList<>(10);
@@ -107,13 +106,6 @@
*/
abstract T build(MetadataCache.EntityMetadata entityMetadata) throws BanyanDBException;
- protected BanyandbCommon.Metadata buildMetadata() {
- return BanyandbCommon.Metadata.newBuilder()
- .setGroup(group)
- .setName(name)
- .build();
- }
-
protected Optional<BanyandbModel.Criteria> buildCriteria() {
if (criteria != null) {
return Optional.of(criteria.build());
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index c1d4e3f..2ec4d65 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -348,11 +348,18 @@
public StreamQueryResponse query(StreamQuery streamQuery) throws BanyanDBException {
checkState(this.streamServiceStub != null, "stream service is null");
- final BanyandbStream.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
- this.streamServiceBlockingStub
- .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
- .query(streamQuery.build(this.metadataCache.findMetadata(streamQuery.group, streamQuery.name))));
- return new StreamQueryResponse(response);
+ for (String group : streamQuery.groups) {
+ MetadataCache.EntityMetadata em = this.metadataCache.findMetadata(group, streamQuery.name);
+ if (em != null) {
+ final BanyandbStream.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
+ this.streamServiceBlockingStub
+ .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
+ .query(streamQuery.build(em)));
+ return new StreamQueryResponse(response);
+ }
+
+ }
+ throw new RuntimeException("No metadata found for the query");
}
/**
@@ -379,13 +386,18 @@
*/
public MeasureQueryResponse query(MeasureQuery measureQuery) throws BanyanDBException {
checkState(this.streamServiceStub != null, "measure service is null");
-
- final BanyandbMeasure.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
- this.measureServiceBlockingStub
- .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
- .query(measureQuery.build(this.metadataCache.findMetadata(measureQuery.group, measureQuery.name))));
- return new MeasureQueryResponse(response);
- }
+ for (String group : measureQuery.groups) {
+ MetadataCache.EntityMetadata em = this.metadataCache.findMetadata(group, measureQuery.name);
+ if (em != null) {
+ final BanyandbMeasure.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
+ this.measureServiceBlockingStub
+ .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
+ .query(measureQuery.build(em)));
+ return new MeasureQueryResponse(response);
+ }
+ }
+ throw new RuntimeException("No metadata found for the query");
+ }
/**
* Define a new group and attach to the current client.
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
index 63e83b7..0ab2d13 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
@@ -27,6 +27,7 @@
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
+import java.util.List;
import java.util.Set;
/**
@@ -49,12 +50,12 @@
private OrderBy orderBy;
- public MeasureQuery(final String group, final String name, final Set<String> tagProjections, final Set<String> fieldProjections) {
- this(group, name, null, tagProjections, fieldProjections);
+ public MeasureQuery(final List<String> groups, final String name, final Set<String> tagProjections, final Set<String> fieldProjections) {
+ this(groups, name, null, tagProjections, fieldProjections);
}
- public MeasureQuery(final String group, final String name, final TimestampRange timestampRange, final Set<String> tagProjections, final Set<String> fieldProjections) {
- super(group, name, timestampRange, tagProjections);
+ public MeasureQuery(final List<String> groups, final String name, final TimestampRange timestampRange, final Set<String> tagProjections, final Set<String> fieldProjections) {
+ super(groups, name, timestampRange, tagProjections);
this.fieldProjections = fieldProjections;
}
@@ -149,7 +150,8 @@
throw new IllegalArgumentException("entity metadata is null");
}
final BanyandbMeasure.QueryRequest.Builder builder = BanyandbMeasure.QueryRequest.newBuilder();
- builder.setMetadata(buildMetadata());
+ builder.setName(this.name);
+ builder.addAllGroups(this.groups);
if (timestampRange != null) {
builder.setTimeRange(timestampRange.build());
} else {
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
index 1c453f7..c553356 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.banyandb.v1.client;
+import java.util.List;
import java.util.Set;
import lombok.Setter;
@@ -43,14 +44,14 @@
*/
private OrderBy orderBy;
- public StreamQuery(final String group, final String name, final TimestampRange timestampRange, final Set<String> projections) {
- super(group, name, timestampRange, projections);
+ public StreamQuery(final List<String> groups, final String name, final TimestampRange timestampRange, final Set<String> projections) {
+ super(groups, name, timestampRange, projections);
this.offset = 0;
this.limit = 20;
}
- public StreamQuery(final String group, final String name, final Set<String> projections) {
- this(group, name, null, projections);
+ public StreamQuery(final List<String> groups, final String name, final Set<String> projections) {
+ this(groups, name, null, projections);
}
@Override
@@ -68,8 +69,9 @@
if (entityMetadata == null) {
throw new IllegalArgumentException("entity metadata is null");
}
- final BanyandbStream.QueryRequest.Builder builder = BanyandbStream.QueryRequest.newBuilder()
- .setMetadata(buildMetadata());
+ final BanyandbStream.QueryRequest.Builder builder = BanyandbStream.QueryRequest.newBuilder();
+ builder.setName(this.name);
+ builder.addAllGroups(this.groups);
if (timestampRange != null) {
builder.setTimeRange(timestampRange.build());
}
diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java
index 9330330..8f40d91 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java
@@ -20,7 +20,6 @@
import com.google.common.base.Preconditions;
import lombok.Setter;
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
@@ -29,7 +28,7 @@
@Setter
public class TopNQuery {
- private final String group;
+ private final List<String> groups;
private final String name;
private final TimestampRange timestampRange;
private final int number;
@@ -40,10 +39,10 @@
*/
private List<PairQueryCondition<?>> conditions;
- public TopNQuery(String group, String name, TimestampRange timestampRange, int number, AbstractQuery.Sort sort) {
+ public TopNQuery(List<String> groups, String name, TimestampRange timestampRange, int number, AbstractQuery.Sort sort) {
Preconditions.checkArgument(sort != AbstractQuery.Sort.UNSPECIFIED);
Preconditions.checkArgument(number > 0);
- this.group = group;
+ this.groups = groups;
this.name = name;
this.timestampRange = timestampRange;
this.number = number;
@@ -52,7 +51,8 @@
BanyandbMeasure.TopNRequest build() throws BanyanDBException {
BanyandbMeasure.TopNRequest.Builder bld = BanyandbMeasure.TopNRequest.newBuilder()
- .setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
+ .setName(name)
+ .addAllGroups(groups)
.setTimeRange(timestampRange.build())
.setTopN(number)
.setFieldValueSort(AbstractQuery.Sort.DESC == sort ? BanyandbModel.Sort.SORT_DESC : BanyandbModel.Sort.SORT_ASC);
diff --git a/src/main/proto/banyandb/v1/banyandb-measure.proto b/src/main/proto/banyandb/v1/banyandb-measure.proto
index 574e68b..fc80f88 100644
--- a/src/main/proto/banyandb/v1/banyandb-measure.proto
+++ b/src/main/proto/banyandb/v1/banyandb-measure.proto
@@ -47,15 +47,18 @@
}
// QueryRequest is the request contract for query.
+// QueryRequest is the request contract for query.
message QueryRequest {
- // metadata is required
- common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
+ // groups indicate where the data points are stored.
+ repeated string groups = 1 [(validate.rules).repeated.min_items = 1];
+ // name is the identity of a measure.
+ string name = 2 [(validate.rules).string.min_len = 1];
// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
- model.v1.TimeRange time_range = 2 [(validate.rules).message.required = true];
+ model.v1.TimeRange time_range = 3 [(validate.rules).message.required = true];
// tag_families are indexed.
model.v1.Criteria criteria = 4;
// tag_projection can be used to select tags of the data points in the response
- model.v1.TagProjection tag_projection = 5 [(validate.rules).message.required = true];
+ model.v1.TagProjection tag_projection = 5;
message FieldProjection {
repeated string names = 1;
}
@@ -121,19 +124,21 @@
// TopNRequest is the request contract for query.
message TopNRequest {
- // metadata is required
- common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
+ // groups indicate where the data points are stored.
+ repeated string groups = 1 [(validate.rules).repeated.min_items = 1];
+ // name is the identity of a measure.
+ string name = 2 [(validate.rules).string.min_len = 1];
// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
- model.v1.TimeRange time_range = 2 [(validate.rules).message.required = true];
+ model.v1.TimeRange time_range = 3 [(validate.rules).message.required = true];
// top_n set the how many items should be returned in each list.
- int32 top_n = 3 [(validate.rules).int32.gt = 0];
+ int32 top_n = 4 [(validate.rules).int32.gt = 0];
// agg aggregates lists grouped by field names in the time_range
// TODO validate enum defined_only
- model.v1.AggregationFunction agg = 4;
+ model.v1.AggregationFunction agg = 5;
// criteria select counters. Only equals are acceptable.
- repeated model.v1.Condition conditions = 5;
+ repeated model.v1.Condition conditions = 6;
// field_value_sort indicates how to sort fields
- model.v1.Sort field_value_sort = 6;
+ model.v1.Sort field_value_sort = 7;
}
//DataPointValue is the data point for writing. It only contains values.
diff --git a/src/main/proto/banyandb/v1/banyandb-stream.proto b/src/main/proto/banyandb/v1/banyandb-stream.proto
index 29e73cf..cd8e0a2 100644
--- a/src/main/proto/banyandb/v1/banyandb-stream.proto
+++ b/src/main/proto/banyandb/v1/banyandb-stream.proto
@@ -53,23 +53,25 @@
// QueryRequest is the request contract for query.
message QueryRequest {
- // metadata is required
- common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
+ // groups indicate where the elements are stored.
+ repeated string groups = 1 [(validate.rules).repeated.min_items = 1];
+ // name is the identity of a stream.
+ string name = 2 [(validate.rules).string.min_len = 1];
// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
// In the context of stream, it represents the range of the `startTime` for spans/segments,
// while in the context of Log, it means the range of the timestamp(s) for logs.
// it is always recommended to specify time range for performance reason
- model.v1.TimeRange time_range = 2;
+ model.v1.TimeRange time_range = 3;
// offset is used to support pagination, together with the following limit
- uint32 offset = 3;
+ uint32 offset = 4;
// limit is used to impose a boundary on the number of records being returned
- uint32 limit = 4;
+ uint32 limit = 5;
// order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported
- model.v1.QueryOrder order_by = 5;
+ model.v1.QueryOrder order_by = 6;
// tag_families are indexed.
- model.v1.Criteria criteria = 6;
+ model.v1.Criteria criteria = 7;
// projection can be used to select the key names of the element in the response
- model.v1.TagProjection projection = 7 [(validate.rules).message.required = true];
+ model.v1.TagProjection projection = 8 [(validate.rules).message.required = true];
}
message ElementValue {
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
index 0d9d2e4..f4b44b5 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/AbstractBanyanDBClientTest.java
@@ -19,11 +19,11 @@
package org.apache.skywalking.banyandb.v1.client;
import io.grpc.BindableService;
+import io.grpc.ForwardingServerBuilder;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
-import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.util.MutableHandlerRegistry;
@@ -359,7 +359,7 @@
}
protected interface SetupFunction {
- void apply(AbstractServerImplBuilder<?> builder);
+ void apply(ForwardingServerBuilder<?> builder);
}
protected SetupFunction bindStreamRegistry() {
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureQueryTest.java
index cc93f34..c71f1a5 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureQueryTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureQueryTest.java
@@ -81,7 +81,7 @@
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
- MeasureQuery query = new MeasureQuery("sw_metric", "service_cpm_minute",
+ MeasureQuery query = new MeasureQuery(Lists.newArrayList("sw_metric"), "service_cpm_minute",
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
ImmutableSet.of("entity_id"),
ImmutableSet.of("total"));
@@ -94,8 +94,8 @@
final BanyandbMeasure.QueryRequest request = requestCaptor.getValue();
// assert metadata
- Assert.assertEquals("service_cpm_minute", request.getMetadata().getName());
- Assert.assertEquals("sw_metric", request.getMetadata().getGroup());
+ Assert.assertEquals("service_cpm_minute", request.getName());
+ Assert.assertEquals("sw_metric", request.getGroups(0));
// assert timeRange, both seconds and the nanos
Assert.assertEquals(begin.toEpochMilli() / 1000, request.getTimeRange().getBegin().getSeconds());
Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(begin.toEpochMilli() % 1000), request.getTimeRange().getBegin().getNanos());
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamQueryTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamQueryTest.java
index 1a737fd..84177da 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamQueryTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamQueryTest.java
@@ -92,7 +92,7 @@
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
- StreamQuery query = new StreamQuery("default", "sw",
+ StreamQuery query = new StreamQuery(Lists.newArrayList("default"), "sw",
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
ImmutableSet.of("state", "start_time", "duration", "trace_id"));
// search for all states
@@ -104,8 +104,8 @@
final BanyandbStream.QueryRequest request = requestCaptor.getValue();
// assert metadata
- Assert.assertEquals("sw", request.getMetadata().getName());
- Assert.assertEquals("default", request.getMetadata().getGroup());
+ Assert.assertEquals("sw", request.getName());
+ Assert.assertEquals("default", request.getGroups(0));
// assert timeRange, both seconds and the nanos
Assert.assertEquals(begin.toEpochMilli() / 1000, request.getTimeRange().getBegin().getSeconds());
Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(begin.toEpochMilli() % 1000), request.getTimeRange().getBegin().getNanos());
@@ -139,7 +139,7 @@
long minDuration = 10;
long maxDuration = 100;
- StreamQuery query = new StreamQuery("default", "sw",
+ StreamQuery query = new StreamQuery(Lists.newArrayList("default"), "sw",
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
ImmutableSet.of("state", "start_time", "duration", "trace_id"));
// search for the successful states
@@ -157,8 +157,8 @@
verify(streamQueryServiceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
final BanyandbStream.QueryRequest request = requestCaptor.getValue();
// assert metadata
- Assert.assertEquals("sw", request.getMetadata().getName());
- Assert.assertEquals("default", request.getMetadata().getGroup());
+ Assert.assertEquals("sw", request.getName());
+ Assert.assertEquals("default", request.getGroups(0));
// assert timeRange
Assert.assertEquals(begin.getEpochSecond(), request.getTimeRange().getBegin().getSeconds());
Assert.assertEquals(end.getEpochSecond(), request.getTimeRange().getEnd().getSeconds());
@@ -286,7 +286,7 @@
ArgumentCaptor<BanyandbStream.QueryRequest> requestCaptor = ArgumentCaptor.forClass(BanyandbStream.QueryRequest.class);
String traceId = "1111.222.333";
- StreamQuery query = new StreamQuery("default", "sw", ImmutableSet.of("state", "start_time", "duration", "trace_id", "data_binary"));
+ StreamQuery query = new StreamQuery(Lists.newArrayList("default"), "sw", ImmutableSet.of("state", "start_time", "duration", "trace_id", "data_binary"));
query.and(PairQueryCondition.StringQueryCondition.eq("trace_id", traceId));
client.query(query);
@@ -294,8 +294,8 @@
verify(streamQueryServiceImpl).query(requestCaptor.capture(), ArgumentMatchers.any());
final BanyandbStream.QueryRequest request = requestCaptor.getValue();
// assert metadata
- Assert.assertEquals("sw", request.getMetadata().getName());
- Assert.assertEquals("default", request.getMetadata().getGroup());
+ Assert.assertEquals("sw", request.getName());
+ Assert.assertEquals("default", request.getGroups(0));
Assert.assertEquals("condition {\n" +
" name: \"trace_id\"\n" +
" op: BINARY_OP_EQ\n" +
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java
index e1e7bdd..a12b12f 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientTestCI.java
@@ -28,9 +28,9 @@
@Slf4j
public class BanyanDBClientTestCI {
- private static final String REGISTRY = "docker.io";
+ private static final String REGISTRY = "ghcr.io";
private static final String IMAGE_NAME = "apache/skywalking-banyandb";
- private static final String TAG = "v0.0.0-dev";
+ private static final String TAG = "395bbf2ff70c4c94b60ea610e9a5176545c4ae5b";
private static final String IMAGE = REGISTRY + "/" + IMAGE_NAME + ":" + TAG;
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
index faa80d6..deb61f3 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.banyandb.v1.client;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
@@ -80,7 +81,7 @@
});
f.get(10, TimeUnit.SECONDS);
- MeasureQuery query = new MeasureQuery("sw_metric", "service_cpm_minute", new TimestampRange(begin.toEpochMilli(), now.plus(1, ChronoUnit.MINUTES).toEpochMilli()), ImmutableSet.of("entity_id"), // tags
+ MeasureQuery query = new MeasureQuery(Lists.newArrayList("sw_metric"), "service_cpm_minute", new TimestampRange(begin.toEpochMilli(), now.plus(1, ChronoUnit.MINUTES).toEpochMilli()), ImmutableSet.of("entity_id"), // tags
ImmutableSet.of("total")); // fields
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
index 7091221..550c29f 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.banyandb.v1.client;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.metadata.Catalog;
import org.apache.skywalking.banyandb.v1.client.metadata.Group;
@@ -129,7 +130,7 @@
});
f.get(10, TimeUnit.SECONDS);
- StreamQuery query = new StreamQuery("default", "sw", ImmutableSet.of("state", "duration", "trace_id", "data_binary"));
+ StreamQuery query = new StreamQuery(Lists.newArrayList("default"), "sw", ImmutableSet.of("state", "duration", "trace_id", "data_binary"));
query.and(PairQueryCondition.StringQueryCondition.eq("trace_id", traceId));
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {