The Java client implementation for SkyWalking BanyanDB.
Create a `BanyanDBClient` with host and port, then use `connect()` to establish a connection.

```java
// use `default` group
BanyanDBClient client = new BanyanDBClient("127.0.0.1", 17912);
// to send any request, a connection to the server must be established
client.connect();
```
You may also pass customized options while building a `BanyanDBClient`. Supported options are listed below:
| Option | Description | Default |
|---|---|---|
| maxInboundMessageSize | Max inbound message size | 1024 * 1024 * 50 (~50MB) |
| deadline | Threshold of gRPC blocking query, unit is second | 30 (seconds) |
| refreshInterval | Refresh interval for the gRPC channel, unit is second | 30 (seconds) |
| forceReconnectionThreshold | Threshold of forced gRPC reconnection if a network issue is encountered | 1 |
| forceTLS | Force TLS for gRPC | false |
| sslTrustCAPath | SSL: Trusted CA Path | |
| sslCertChainPath | SSL: Cert Chain Path | |
| sslKeyPath | SSL: Cert Key Path | |
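As a quick sanity check on those defaults, here is a small self-contained holder that mirrors the table. It is only an illustration; the real client's options type and field names may differ.

```java
// Hypothetical holder mirroring the defaults in the table above.
// This is NOT the client's real options class; names are illustrative.
public class ClientOptionsSketch {
    int maxInboundMessageSize = 1024 * 1024 * 50; // ~50MB
    int deadlineSeconds = 30;                     // gRPC blocking-query threshold
    int refreshIntervalSeconds = 30;              // gRPC channel refresh interval
    int forceReconnectionThreshold = 1;           // reconnect after this many network failures
    boolean forceTLS = false;                     // plain-text gRPC by default
    String sslTrustCAPath = "";                   // empty: no custom trusted CA
    String sslCertChainPath = "";
    String sslKeyPath = "";

    public static void main(String[] args) {
        ClientOptionsSketch opts = new ClientOptionsSketch();
        System.out.println(opts.maxInboundMessageSize); // 52428800
    }
}
```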
Then we may define a stream with customized configurations. The following example uses `SegmentRecord` in SkyWalking OAP as an illustration:

```java
// build a stream default(group)/sw(name) with 2 shards and TTL equal to 30 days
Stream s = Stream.create("default", "sw")
        // set entities
        .setEntityRelativeTags("service_id", "service_instance_id", "is_error")
        // add a tag family "data"
        .addTagFamily(TagFamilySpec.create("data")
                .addTagSpec(TagFamilySpec.TagSpec.newBinaryTag("data_binary"))
                .build())
        // add a tag family "searchable"
        .addTagFamily(TagFamilySpec.create("searchable")
                // create a string tag "trace_id"
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("trace_id"))
                .addTagSpec(TagFamilySpec.TagSpec.newIntTag("is_error"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("service_id"))
                .build())
        .build();
client.define(s);
```
In the last line of the code block, a simple API (i.e. `BanyanDBClient.define(Stream)`) is used to define the schema of `Stream`. The same works for `Measure`, which will be demonstrated later.
A `Measure` can also be defined directly with `BanyanDBClient`:

```java
// create a new measure schema with an additional interval
// the interval is used to specify how frequently to send a data point
Measure m = Measure.create("sw_metric", "service_cpm_minute", Duration.ofHours(1))
        // set entity
        .setEntityRelativeTags("entity_id")
        // define a tag family "default"
        .addTagFamily(TagFamilySpec.create("default")
                .addTagSpec(TagFamilySpec.TagSpec.newIDTag("id"))
                .addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id"))
                .build())
        // define field specs
        // compressMethod and encodingMethod can be specified
        .addField(Measure.FieldSpec.newIntField("total").compressWithZSTD().encodeWithGorilla().build())
        .addField(Measure.FieldSpec.newIntField("value").compressWithZSTD().encodeWithGorilla().build())
        .build();
// define a measure, as we've mentioned above
client.define(m);
```
For more API usage, refer to the test cases and API docs.
Construct a `StreamQuery` instance with a given time range and other conditions.

Note: the time range is left-inclusive and right-exclusive.

For example:

```java
// [begin, end) = [ now - 15min, now )
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
// with stream schema, group=default, name=sw
StreamQuery query = new StreamQuery("default", "sw",
        new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
        // projection tags which are indexed
        ImmutableSet.of("state", "start_time", "duration", "trace_id"));
// search for all states
query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", 0L));
// set order by condition
query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
// set projection for un-indexed tags
query.setDataProjections(ImmutableSet.of("data_binary"));
// send the query request
client.query(query);
```
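The left-inclusive, right-exclusive semantics noted above can be illustrated with plain epoch-millisecond arithmetic. The `contains` helper below is just for demonstration, not part of the client API.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class HalfOpenRangeDemo {
    // [begin, end): begin is included, end is excluded
    static boolean contains(long begin, long end, long ts) {
        return ts >= begin && ts < end;
    }

    public static void main(String[] args) {
        Instant end = Instant.now();
        Instant begin = end.minus(15, ChronoUnit.MINUTES);
        long b = begin.toEpochMilli(), e = end.toEpochMilli();
        System.out.println(contains(b, e, b)); // true: begin is included
        System.out.println(contains(b, e, e)); // false: end is excluded
    }
}
```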
After the response is returned, elements can be fetched:

```java
StreamQueryResponse resp = client.queryStreams(query);
List<RowEntity> entities = resp.getElements();
```
Every `RowEntity` in the list contains the `elementId`, the `timestamp`, and the requested tag families.

The `StreamQueryResponse`, `RowEntity`, `TagFamily` and `Tag` (i.e. `TagAndValue`) form a hierarchical structure, where the order of the tag families and their contained tags, i.e. the indexes of these objects in the lists, follows the order specified in the projection condition used in the request.
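To make that ordering guarantee concrete, here is a minimal self-contained model of the hierarchy. The records below are simplified stand-ins for the client's classes, which carry more information than shown.

```java
import java.util.List;

public class ResponseHierarchySketch {
    // Simplified stand-ins for the client's RowEntity / TagFamily / Tag types.
    record Tag(String name, Object value) {}
    record TagFamily(String name, List<Tag> tags) {}
    record RowEntity(String elementId, long timestamp, List<TagFamily> tagFamilies) {}

    static RowEntity sampleRow() {
        // Suppose the request projected ("searchable": trace_id, duration);
        // the response lists tags in exactly that order.
        return new RowEntity("seg-1", 1653350908000L,
                List.of(new TagFamily("searchable", List.of(
                        new Tag("trace_id", "abc123"),
                        new Tag("duration", 200L)))));
    }

    public static void main(String[] args) {
        RowEntity row = sampleRow();
        // index 0 of the first family is the first projected tag
        System.out.println(row.tagFamilies().get(0).tags().get(0).name()); // trace_id
    }
}
```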
For `Measure`, it is similar to `Stream`:

```java
// [begin, end) = [ now - 15min, now )
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
// with measure schema, group=sw_metrics, name=service_instance_cpm_day
MeasureQuery query = new MeasureQuery("sw_metrics", "service_instance_cpm_day",
        new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
        ImmutableSet.of("id", "scope", "service_id"),
        ImmutableSet.of("total"));
// query max "total" with group by tag "service_id"
query.maxBy("total", ImmutableSet.of("service_id"));
// use conditions
query.appendCondition(PairQueryCondition.StringQueryCondition.eq("default", "service_id", "abc"));
// send the query request
client.query(query);
```
After the response is returned, `dataPoints` can be extracted:

```java
MeasureQueryResponse resp = client.query(query);
List<DataPoint> dataPointList = resp.getDataPoints();
```
The Measure API supports `TopN`/`BottomN` queries. The (optionally grouped) results are ordered by the given field:

```java
MeasureQuery query = new MeasureQuery("sw_metrics", "service_instance_cpm_day",
        new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
        ImmutableSet.of("id", "scope", "service_id"),
        ImmutableSet.of("total"));
query.topN(5, "total");
// or bottomN
```
Besides, `limit` and `offset` support pagination. `Tag`-based sorting can also be applied to the final results:

```java
query.limit(5);
query.offset(1);
query.orderBy("service_id", Sort.DESC);
```
Since gRPC bidirectional streaming is used for the write protocol, build a `StreamBulkWriteProcessor`, which handles back-pressure for you. Adjust `maxBulkSize`, `flushInterval` and `concurrency` of the consumer in different scenarios to meet your requirements.

```java
// build a StreamBulkWriteProcessor from the client
StreamBulkWriteProcessor streamBulkWriteProcessor =
        client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);
```
The `StreamBulkWriteProcessor` is thread-safe and thus can be used across threads. We highly recommend reusing it.
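Since the processor is thread-safe, a typical pattern is one shared instance fed by many threads. The sketch below models the processor with a concurrent queue (not the real class) just to show the sharing pattern.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedProcessorSketch {
    // Submit `writes` items to one shared "processor" from 4 threads
    // and return how many items it received.
    static int submitAll(int writes) throws InterruptedException {
        Queue<String> processor = new ConcurrentLinkedQueue<>(); // stand-in for the bulk processor
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(writes);
        for (int i = 0; i < writes; i++) {
            final int id = i;
            pool.submit(() -> {
                processor.add("streamWrite-" + id); // every thread reuses the same processor
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return processor.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(submitAll(100)); // 100: no writes lost
    }
}
```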
The construction of a `StreamWrite` entity must comply with the schema of the stream, e.g. the order of tags must be exactly the same as that defined in the schema, and non-existing tags must be filled with a null value instead of compacting all non-null tag values.

```java
StreamWrite streamWrite = new StreamWrite("default", "sw", segmentId, now.toEpochMilli())
        .tag("data_binary", Value.binaryTagValue(byteData))
        .tag("trace_id", Value.stringTagValue(traceId)) // 0
        .tag("state", Value.longTagValue(state)) // 1
        .tag("service_id", Value.stringTagValue(serviceId)) // 2
        .tag("service_instance_id", Value.stringTagValue(serviceInstanceId)) // 3
        .tag("endpoint_id", Value.stringTagValue(endpointId)) // 4
        .tag("duration", Value.longTagValue(latency)) // 5
        .tag("http.method", Value.stringTagValue(null)) // 6
        .tag("status_code", Value.stringTagValue(httpStatusCode)) // 7
        .tag("db.type", Value.stringTagValue(dbType)) // 8
        .tag("db.instance", Value.stringTagValue(dbInstance)) // 9
        .tag("mq.broker", Value.stringTagValue(broker)) // 10
        .tag("mq.topic", Value.stringTagValue(topic)) // 11
        .tag("mq.queue", Value.stringTagValue(queue)); // 12
streamBulkWriteProcessor.add(streamWrite);
```
The writing procedure for `Measure` is similar to the process described above and leverages gRPC bidirectional streaming:

```java
// build a MeasureBulkWriteProcessor from the client
MeasureBulkWriteProcessor measureBulkWriteProcessor =
        client.buildMeasureWriteProcessor(maxBulkSize, flushInterval, concurrency);
```
A `BulkWriteProcessor` is created by calling `buildMeasureWriteProcessor`. Then build the `MeasureWrite` object and send it with the bulk processor:

```java
Instant now = Instant.now();
MeasureWrite measureWrite = new MeasureWrite("sw_metric", "service_cpm_minute", now.toEpochMilli());
measureWrite.tag("id", TagAndValue.idTagValue("1"))
        .tag("entity_id", TagAndValue.stringTagValue("entity_1"))
        .field("total", TagAndValue.longFieldValue(100))
        .field("value", TagAndValue.longFieldValue(1));
measureBulkWriteProcessor.add(measureWrite);
```
Property APIs are used to store key-value pairs.
`save` will always succeed whether or not the property exists: the old value is overwritten if it already exists, otherwise a new value is set.

```java
Property property = Property.create("default", "sw", "ui_template")
        .addTag(TagAndValue.newStringTag("name", "hello"))
        .build();
this.client.save(property);
```
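The upsert semantics of `save` can be pictured with a plain map: the same key overwrites, a new key inserts. This sketch only models the behavior described above; it is not the client's storage.

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyUpsertSketch {
    // Stand-in store keyed by group/name/id, as in Property.create(group, name, id)
    static final Map<String, String> store = new HashMap<>();

    // save() semantics: succeed whether or not the key exists
    static void save(String key, String value) {
        store.put(key, value); // overwrite if present, insert otherwise
    }

    public static void main(String[] args) {
        save("default/sw/ui_template", "hello");
        save("default/sw/ui_template", "world"); // old value is overwritten
        System.out.println(store.get("default/sw/ui_template")); // world
    }
}
```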
A property can be queried via `Client.findProperty`:

```java
Property gotProperty = this.client.findProperty("default", "sw", "ui_template");
```
A property can be deleted by calling `Client.deleteProperty`:

```java
this.client.deleteProperty("default", "sw", "ui_template");
```
Build the project with:

```shell
./mvnw clean package
```
This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please follow the REPORTING GUIDELINES to report unacceptable behavior.
Send an email to dev-subscribe@skywalking.apache.org and follow the reply to subscribe to the mailing list.

To request to join the SkyWalking Slack, send a mail to the mailing list (dev@skywalking.apache.org), and we will invite you in.