The client implementation for SkyWalking BanyanDB in Java

BanyanDB Java Client

The client implementation for SkyWalking BanyanDB in Java.

Usage

Create a client

Create a BanyanDBClient with the server host and port, then call connect() to establish a connection.

// use `default` group
BanyanDBClient client = new BanyanDBClient("127.0.0.1", 17912);
// to send any request, a connection to the server must be established
client.connect();

Besides, you may pass customized options while building a BanyanDBClient. Supported options are listed below,

| Option | Description | Default |
| --- | --- | --- |
| maxInboundMessageSize | Max inbound message size | 1024 * 1024 * 50 (~50MB) |
| deadline | Threshold of gRPC blocking query, unit is second | 30 (seconds) |
| refreshInterval | Refresh interval for the gRPC channel, unit is second | 30 (seconds) |
| forceReconnectionThreshold | Threshold of force gRPC reconnection if network issue is encountered | 1 |
| forceTLS | Force use TLS for gRPC | false |
| sslTrustCAPath | SSL: Trusted CA Path | |
| sslCertChainPath | SSL: Cert Chain Path | |
| sslKeyPath | SSL: Cert Key Path | |
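
To make the units of the defaults explicit, the following is a minimal sketch that restates them as Java constants. ClientDefaultsSketch and its field names are hypothetical and exist only for illustration; this is not the library's options class.

```java
import java.time.Duration;

/** Hypothetical holder that only restates the documented defaults; not part of the client library. */
final class ClientDefaultsSketch {
    // 1024 * 1024 * 50 bytes, i.e. 50 MiB
    static final int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50;
    // deadline of a blocking gRPC query
    static final Duration DEADLINE = Duration.ofSeconds(30);
    // refresh interval for the gRPC channel
    static final Duration REFRESH_INTERVAL = Duration.ofSeconds(30);
    static final int FORCE_RECONNECTION_THRESHOLD = 1;
    static final boolean FORCE_TLS = false;

    public static void main(String[] args) {
        System.out.println("maxInboundMessageSize = " + MAX_INBOUND_MESSAGE_SIZE + " bytes");
        System.out.println("deadline = " + DEADLINE.getSeconds() + " s");
        System.out.println("refreshInterval = " + REFRESH_INTERVAL.getSeconds() + " s");
    }
}
```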

Schema Management

Stream and index rules

Then we may define a stream with customized configurations. The following example uses SegmentRecord in SkyWalking OAP as an illustration,

// build a stream default(group)/sw(name) with 2 shards and ttl equals to 30 days
Stream s = Stream.create("default", "sw")
        // set entities
        .setEntityRelativeTags("service_id", "service_instance_id", "is_error")
        // add a tag family "data"
        .addTagFamily(TagFamilySpec.create("data")
            .addTagSpec(TagFamilySpec.TagSpec.newBinaryTag("data_binary"))
            .build())
        // add a tag family "searchable"
        .addTagFamily(TagFamilySpec.create("searchable")
            // create a string tag "trace_id"
            .addTagSpec(TagFamilySpec.TagSpec.newStringTag("trace_id"))
            .addTagSpec(TagFamilySpec.TagSpec.newIntTag("is_error"))
            .addTagSpec(TagFamilySpec.TagSpec.newStringTag("service_id"))
            .build())
        .build();
client.define(s);

The last line of the code block uses a simple API, BanyanDBClient.define(Stream), to define the schema of the Stream. The same API works for Measure, which is demonstrated later.

Measure and index rules

Measure can also be defined directly with BanyanDBClient,

// create a new measure schema with an additional interval
// the interval is used to specify how frequently to send a data point
Measure m = Measure.create("sw_metric", "service_cpm_minute", Duration.ofHours(1))
        // set entity
        .setEntityRelativeTags("entity_id")
        // define a tag family "default"
        .addTagFamily(TagFamilySpec.create("default")
            .addTagSpec(TagFamilySpec.TagSpec.newIDTag("id"))
            .addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id"))
            .build())
        // define field specs
        // compressMethod and encodingMethod can be specified
        .addField(Measure.FieldSpec.newIntField("total").compressWithZSTD().encodeWithGorilla().build())
        .addField(Measure.FieldSpec.newIntField("value").compressWithZSTD().encodeWithGorilla().build())
        .build();
// define a measure, as we've mentioned above
client.define(m);

For more API usage, refer to the test cases and API docs.

Query

Stream

Construct a StreamQuery instance with a given time range and other conditions.

Note: time-range is left-inclusive and right-exclusive.
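
To make the half-open convention concrete, here is a minimal, library-independent sketch. HalfOpenRangeSketch and contains are hypothetical names used only for illustration; the check simply mirrors the [begin, end) rule stated in the note.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper illustrating a left-inclusive, right-exclusive time range. */
final class HalfOpenRangeSketch {
    /** Returns true when beginMillis <= tsMillis < endMillis (all epoch milliseconds). */
    static boolean contains(long beginMillis, long endMillis, long tsMillis) {
        return tsMillis >= beginMillis && tsMillis < endMillis;
    }

    public static void main(String[] args) {
        Instant end = Instant.parse("2022-01-01T00:15:00Z");
        Instant begin = end.minus(15, ChronoUnit.MINUTES);
        long b = begin.toEpochMilli();
        long e = end.toEpochMilli();
        System.out.println(contains(b, e, b));     // the begin timestamp is inside the range
        System.out.println(contains(b, e, e - 1)); // the last millisecond before end is inside
        System.out.println(contains(b, e, e));     // the end timestamp itself is outside
    }
}
```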

For example,

// [begin, end) = [ now - 15min, now )
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
// with stream schema, group=default, name=sw
StreamQuery query = new StreamQuery("default", "sw",
        new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
        // projection tags which are indexed
        ImmutableSet.of("state", "start_time", "duration", "trace_id"));
// search for all states
query.appendCondition(PairQueryCondition.LongQueryCondition.eq("searchable", "state", 0L));
// set order by condition
query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
// set projection for un-indexed tags
query.setDataProjections(ImmutableSet.of("data_binary"));
// send the query request
client.query(query);

After the response is returned, the elements can be fetched,

StreamQueryResponse resp = client.query(query);
List<RowEntity> entities = resp.getElements();

Every RowEntity in the list contains the elementId, the timestamp and the requested tag families.

The StreamQueryResponse, RowEntity, TagFamily and Tag (i.e. TagAndValue) form a hierarchical structure. The order of the tag families and of the tags they contain, i.e. the indexes of these objects in the List, follows the order specified in the projection used in the request.
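
Because the result order follows the projection order, a tag's position in the projection can be used to locate its value. The sketch below illustrates only that idea with plain lists; ProjectionOrderSketch and valueOf are hypothetical and do not use the client's RowEntity or TagAndValue types.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration: values come back in the same order as the projected tag names. */
final class ProjectionOrderSketch {
    /** Looks up a value by finding the tag's index in the projection. */
    static Object valueOf(List<String> projection, List<Object> rowValues, String tagName) {
        int idx = projection.indexOf(tagName);
        if (idx < 0) {
            throw new IllegalArgumentException("tag not in projection: " + tagName);
        }
        return rowValues.get(idx);
    }

    public static void main(String[] args) {
        // Same tag order as the projection in the query example above.
        List<String> projection = Arrays.asList("state", "start_time", "duration", "trace_id");
        // Made-up row values, positioned to match the projection.
        List<Object> row = Arrays.<Object>asList(0L, 1000L, 42L, "trace-1");
        System.out.println(valueOf(projection, row, "duration"));
    }
}
```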

Measure

For Measure, it is similar to the Stream,

// [begin, end) = [ now - 15min, now )
Instant end = Instant.now();
Instant begin = end.minus(15, ChronoUnit.MINUTES);
// with measure schema, group=sw_metrics, name=service_instance_cpm_day
MeasureQuery query = new MeasureQuery("sw_metrics", "service_instance_cpm_day",
    new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
    ImmutableSet.of("id", "scope", "service_id"),
    ImmutableSet.of("total"));
// query max "total" with group by tag "service_id"
query.maxBy("total", ImmutableSet.of("service_id"));
// use conditions
query.appendCondition(PairQueryCondition.StringQueryCondition.eq("default", "service_id", "abc"));
// send the query request
client.query(query);

After the response is returned, the data points can be extracted,

MeasureQueryResponse resp = client.query(query);
List<DataPoint> dataPointList = resp.getDataPoints();

The Measure API supports TopN/BottomN search. The results, or the grouped results, are ordered by the given field,

MeasureQuery query = new MeasureQuery("sw_metrics", "service_instance_cpm_day",
        new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
        ImmutableSet.of("id", "scope", "service_id"),
        ImmutableSet.of("total"));
query.topN(5, "total"); // use bottomN instead for a BottomN search
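
As a rough illustration of what TopN and BottomN mean, the sketch below applies both to an in-memory list of field values. It is a client-side analogy only: TopNSketch and its methods are hypothetical, and the real selection is performed by the server.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical in-memory analogy of TopN/BottomN over a numeric field. */
final class TopNSketch {
    /** The n largest values, largest first. */
    static List<Long> topN(List<Long> values, int n) {
        return values.stream()
                .sorted(Comparator.reverseOrder())
                .limit(n)
                .collect(Collectors.toList());
    }

    /** The n smallest values, smallest first. */
    static List<Long> bottomN(List<Long> values, int n) {
        return values.stream()
                .sorted()
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> totals = Arrays.asList(3L, 9L, 1L, 7L);
        System.out.println(topN(totals, 2));    // [9, 7]
        System.out.println(bottomN(totals, 2)); // [1, 3]
    }
}
```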

Besides, limit and offset are used to support pagination. Tag-based sort can also be done to the final results,

query.limit(5);
query.offset(1);
query.orderBy("service_id", Sort.DESC);
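
The interplay of sort, offset and limit can be sketched on an in-memory list. This assumes the common convention that rows are sorted first, then offset rows are skipped, then at most limit rows are returned; the source does not spell out the server's exact evaluation order, so treat it as an assumption. PaginationSketch and page are hypothetical names.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical in-memory analogy of orderBy(DESC) + offset + limit. */
final class PaginationSketch {
    /** Sorts descending, skips `offset` rows, then keeps at most `limit` rows. */
    static List<String> page(List<String> serviceIds, int offset, int limit) {
        return serviceIds.stream()
                .sorted(Comparator.reverseOrder())
                .skip(offset)
                .limit(limit)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "c", "b", "d");
        // Sorted descending: d, c, b, a. Skipping 1 and keeping 2 leaves c, b.
        System.out.println(page(ids, 1, 2)); // [c, b]
    }
}
```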

Write

Stream

Since gRPC bidirectional streaming is used for the write protocol, build a StreamBulkWriteProcessor, which handles back-pressure for you. Adjust maxBulkSize, flushInterval and the concurrency of the consumer to meet the requirements of different scenarios.

// build a StreamBulkWriteProcessor from client
StreamBulkWriteProcessor streamBulkWriteProcessor = client.buildStreamWriteProcessor(maxBulkSize, flushInterval, concurrency);

The StreamBulkWriteProcessor is thread-safe and thus can be used across threads. We highly recommend reusing it.
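
To give an intuition for maxBulkSize, here is a minimal sketch of size-based batching: items are buffered and handed off as a batch once the buffer is full. SizeBoundedBufferSketch is a hypothetical class, not the library's implementation, and it deliberately leaves out the timer-driven flush (flushInterval), the consumer concurrency and the actual gRPC send.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical size-based batching buffer; only illustrates the role of maxBulkSize. */
final class SizeBoundedBufferSketch<T> {
    private final int maxBulkSize;
    private final List<T> pending = new ArrayList<>();
    private final List<List<T>> flushed = new ArrayList<>();

    SizeBoundedBufferSketch(int maxBulkSize) {
        if (maxBulkSize <= 0) {
            throw new IllegalArgumentException("maxBulkSize must be positive");
        }
        this.maxBulkSize = maxBulkSize;
    }

    /** Buffers one item and flushes as soon as maxBulkSize items are pending. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= maxBulkSize) {
            flush();
        }
    }

    /** Moves all pending items into one batch; a real processor would send the batch here. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }

    synchronized int flushedBatches() {
        return flushed.size();
    }

    synchronized int pendingSize() {
        return pending.size();
    }

    public static void main(String[] args) {
        SizeBoundedBufferSketch<String> buffer = new SizeBoundedBufferSketch<>(2);
        for (int i = 0; i < 5; i++) {
            buffer.add("element-" + i);
        }
        // Five items with maxBulkSize 2: two full batches were flushed, one item is pending.
        System.out.println(buffer.flushedBatches() + " batches, " + buffer.pendingSize() + " pending");
        buffer.flush(); // flush the remainder explicitly
        System.out.println(buffer.flushedBatches() + " batches, " + buffer.pendingSize() + " pending");
    }
}
```

The methods are synchronized so that one buffer can be shared across threads, which mirrors the recommendation to reuse a single processor.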

Constructing a StreamWrite entity must comply with the schema of the stream, e.g. the order of the tags must be exactly the same as that defined in the schema. Tags without a value must still be filled in (with NullValue) rather than omitted, i.e. do not compact the write down to the non-null tag values.

StreamWrite streamWrite = new StreamWrite("default", "sw", segmentId, now.toEpochMilli())
    .tag("data_binary", Value.binaryTagValue(byteData))
    .tag("trace_id", Value.stringTagValue(traceId)) // 0
    .tag("state", Value.longTagValue(state)) // 1
    .tag("service_id", Value.stringTagValue(serviceId)) // 2
    .tag("service_instance_id", Value.stringTagValue(serviceInstanceId)) // 3
    .tag("endpoint_id", Value.stringTagValue(endpointId)) // 4
    .tag("duration", Value.longTagValue(latency)) // 5
    .tag("http.method", Value.stringTagValue(null)) // 6
    .tag("status_code", Value.stringTagValue(httpStatusCode)) // 7
    .tag("db.type", Value.stringTagValue(dbType)) // 8
    .tag("db.instance", Value.stringTagValue(dbInstance)) // 9
    .tag("mq.broker", Value.stringTagValue(broker)) // 10
    .tag("mq.topic", Value.stringTagValue(topic)) // 11
    .tag("mq.queue", Value.stringTagValue(queue)); // 12

streamBulkWriteProcessor.add(streamWrite);
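
The ordering rule described above, schema order with placeholders for missing tags, can be sketched independently of the client. SchemaOrderedTagsSketch and align are hypothetical names, and a Java null stands in for NullValue here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration: emit values in schema order, keeping a slot for every tag. */
final class SchemaOrderedTagsSketch {
    /** Returns one entry per schema tag, in schema order; absent tags become null. */
    static List<String> align(List<String> schemaOrder, Map<String, String> provided) {
        for (String name : provided.keySet()) {
            if (!schemaOrder.contains(name)) {
                throw new IllegalArgumentException("tag not defined in schema: " + name);
            }
        }
        List<String> aligned = new ArrayList<>(schemaOrder.size());
        for (String tag : schemaOrder) {
            aligned.add(provided.get(tag)); // null when no value was provided
        }
        return aligned;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("trace_id", "state", "http.method");
        Map<String, String> provided = new HashMap<>();
        provided.put("state", "1");
        provided.put("trace_id", "trace-1");
        // The slot for http.method is kept as null instead of being dropped.
        System.out.println(align(schema, provided)); // [trace-1, 1, null]
    }
}
```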

Measure

The writing procedure for Measure is similar to the above described process and leverages the bidirectional streaming of gRPC,

// build a MeasureBulkWriteProcessor from client
MeasureBulkWriteProcessor measureBulkWriteProcessor = client.buildMeasureWriteProcessor(maxBulkSize, flushInterval, concurrency);

A MeasureBulkWriteProcessor is created by calling buildMeasureWriteProcessor. Then build the MeasureWrite object and send it with the bulk processor,

Instant now = Instant.now();
MeasureWrite measureWrite = new MeasureWrite("sw_metric", "service_cpm_minute", now.toEpochMilli());
measureWrite.tag("id", TagAndValue.idTagValue("1"))
    .tag("entity_id", TagAndValue.stringTagValue("entity_1"))
    .field("total", TagAndValue.longFieldValue(100))
    .field("value", TagAndValue.longFieldValue(1));

measureBulkWriteProcessor.add(measureWrite);

Property APIs

Property APIs are used to store key-value pairs.

Create/Update

save always succeeds whether or not the property exists. If the property already exists, the old value is overwritten; otherwise a new value is set.

Property property = Property.create("default", "sw", "ui_template")
    .addTag(TagAndValue.newStringTag("name", "hello"))
    .build();
this.client.save(property);

Query

Property can be queried via Client.findProperty,

Property gotProperty = this.client.findProperty("default", "sw", "ui_template");

Delete

Property can be deleted by calling Client.deleteProperty,

this.client.deleteProperty("default", "sw", "ui_template");
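
The create/update, query and delete semantics above resemble those of a keyed map. The following in-memory analogy is a simplification: PropertyStoreSketch is a hypothetical class, it stores a single string per property instead of tags, and it does not talk to a server.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory analogy of the property APIs, keyed by group/name/id. */
final class PropertyStoreSketch {
    private final Map<String, String> store = new HashMap<>();

    private static String key(String group, String name, String id) {
        return group + "/" + name + "/" + id;
    }

    /** Upsert: creates the entry or overwrites the existing value. */
    void save(String group, String name, String id, String value) {
        store.put(key(group, name, id), value);
    }

    /** Returns the stored value, or null if nothing is stored under the key. */
    String find(String group, String name, String id) {
        return store.get(key(group, name, id));
    }

    /** Returns true if an entry existed and was removed. */
    boolean delete(String group, String name, String id) {
        return store.remove(key(group, name, id)) != null;
    }

    public static void main(String[] args) {
        PropertyStoreSketch props = new PropertyStoreSketch();
        props.save("default", "sw", "ui_template", "hello");
        props.save("default", "sw", "ui_template", "world"); // overwrites the first value
        System.out.println(props.find("default", "sw", "ui_template"));
        System.out.println(props.delete("default", "sw", "ui_template"));
        System.out.println(props.find("default", "sw", "ui_template"));
    }
}
```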

Compiling project

./mvnw clean package

Code of conduct

This project adheres to the Contributor Covenant code of conduct. By participating, you are expected to uphold this code. Please follow the REPORTING GUIDELINES to report unacceptable behavior.

Contact Us

  • Mail list: dev@skywalking.apache.org. Mail to dev-subscribe@skywalking.apache.org and follow the reply to subscribe to the mail list.
  • Send a "Request to join SkyWalking slack" mail to the mail list (dev@skywalking.apache.org), and we will invite you in.
  • Twitter, ASFSkyWalking
  • QQ Group: 901167865 (Recommended), 392443393
  • bilibili (B站) videos

License

Apache 2.0 License.