| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package org.apache.skywalking.oap.server.core.zipkin; |
| |
| import com.google.gson.Gson; |
| import com.google.gson.JsonElement; |
| import com.google.gson.JsonObject; |
| import lombok.Getter; |
| import lombok.Setter; |
| import org.apache.skywalking.oap.server.core.Const; |
| import org.apache.skywalking.oap.server.core.analysis.Stream; |
| import org.apache.skywalking.oap.server.core.analysis.record.Record; |
| import org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor; |
| import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; |
| import org.apache.skywalking.oap.server.core.storage.StorageID; |
| import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB; |
| import org.apache.skywalking.oap.server.core.storage.annotation.Column; |
| import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch; |
| import org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase; |
| import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset; |
| import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity; |
| import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage; |
| import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; |
| import org.apache.skywalking.oap.server.library.util.BooleanUtils; |
| import org.apache.skywalking.oap.server.library.util.StringUtil; |
| import zipkin2.Endpoint; |
| import zipkin2.Span; |
| |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.apache.skywalking.oap.server.core.storage.StorageData.TIME_BUCKET; |
| |
| @SuperDataset |
| @Stream(name = ZipkinSpanRecord.INDEX_NAME, scopeId = DefaultScopeDefine.ZIPKIN_SPAN, builder = ZipkinSpanRecord.Builder.class, processor = RecordStreamProcessor.class) |
| @SQLDatabase.ExtraColumn4AdditionalEntity(additionalTable = ZipkinSpanRecord.ADDITIONAL_QUERY_TABLE, parentColumn = TIME_BUCKET) |
| @BanyanDB.TimestampColumn(ZipkinSpanRecord.TIMESTAMP_MILLIS) |
| public class ZipkinSpanRecord extends Record { |
| private static final Gson GSON = new Gson(); |
| public static final int QUERY_LENGTH = 256; |
| public static final String INDEX_NAME = "zipkin_span"; |
| public static final String ADDITIONAL_QUERY_TABLE = "zipkin_query"; |
| public static final String TRACE_ID = "trace_id"; |
| public static final String SPAN_ID = "span_id"; |
| public static final String PARENT_ID = "parent_id"; |
| public static final String NAME = "name"; |
| public static final String DURATION = "duration"; |
| public static final String KIND = "kind"; |
| public static final String TIMESTAMP_MILLIS = "timestamp_millis"; |
| public static final String TIMESTAMP = "timestamp"; |
| public static final String LOCAL_ENDPOINT_SERVICE_NAME = "local_endpoint_service_name"; |
| public static final String LOCAL_ENDPOINT_IPV4 = "local_endpoint_ipv4"; |
| public static final String LOCAL_ENDPOINT_IPV6 = "local_endpoint_ipv6"; |
| public static final String LOCAL_ENDPOINT_PORT = "local_endpoint_port"; |
| public static final String REMOTE_ENDPOINT_SERVICE_NAME = "remote_endpoint_service_name"; |
| public static final String REMOTE_ENDPOINT_IPV4 = "remote_endpoint_ipv4"; |
| public static final String REMOTE_ENDPOINT_IPV6 = "remote_endpoint_ipv6"; |
| public static final String REMOTE_ENDPOINT_PORT = "remote_endpoint_port"; |
| public static final String ANNOTATIONS = "annotations"; |
| public static final String TAGS = "tags"; |
| public static final String DEBUG = "debug"; |
| public static final String SHARED = "shared"; |
| public static final String QUERY = "query"; |
| |
| @Setter |
| @Getter |
| @Column(name = TRACE_ID) |
| @SQLDatabase.AdditionalEntity(additionalTables = {ADDITIONAL_QUERY_TABLE}, reserveOriginalColumns = true) |
| @BanyanDB.SeriesID(index = 0) |
| @ElasticSearch.Routing |
| private String traceId; |
| @Setter |
| @Getter |
| @Column(name = SPAN_ID) |
| @BanyanDB.SeriesID(index = 1) |
| private String spanId; |
| @Setter |
| @Getter |
| @Column(name = PARENT_ID) |
| private String parentId; |
| @Setter |
| @Getter |
| @Column(name = NAME) |
| private String name; |
| @Setter |
| @Getter |
| @Column(name = DURATION) |
| private long duration; |
| @Setter |
| @Getter |
| @Column(name = KIND) |
| private String kind; |
| @Setter |
| @Getter |
| @Column(name = TIMESTAMP_MILLIS) |
| private long timestampMillis; |
| @Setter |
| @Getter |
| @Column(name = TIMESTAMP) |
| private long timestamp; |
| @Setter |
| @Getter |
| @Column(name = LOCAL_ENDPOINT_SERVICE_NAME) |
| private String localEndpointServiceName; |
| @Setter |
| @Getter |
| @Column(name = LOCAL_ENDPOINT_IPV4, storageOnly = true) |
| private String localEndpointIPV4; |
| @Setter |
| @Getter |
| @Column(name = LOCAL_ENDPOINT_IPV6, storageOnly = true) |
| private String localEndpointIPV6; |
| @Setter |
| @Getter |
| @Column(name = LOCAL_ENDPOINT_PORT, storageOnly = true) |
| private int localEndpointPort; |
| @Setter |
| @Getter |
| @Column(name = REMOTE_ENDPOINT_SERVICE_NAME) |
| private String remoteEndpointServiceName; |
| @Setter |
| @Getter |
| @Column(name = REMOTE_ENDPOINT_IPV4, storageOnly = true) |
| private String remoteEndpointIPV4; |
| @Setter |
| @Getter |
| @Column(name = REMOTE_ENDPOINT_IPV6, storageOnly = true) |
| private String remoteEndpointIPV6; |
| @Setter |
| @Getter |
| @Column(name = REMOTE_ENDPOINT_PORT, storageOnly = true) |
| private int remoteEndpointPort; |
| @Setter |
| @Getter |
| @Column(name = ANNOTATIONS, storageOnly = true, length = 50000) |
| private JsonObject annotations; |
| @Setter |
| @Getter |
| @Column(name = TAGS, storageOnly = true, length = 50000) |
| private JsonObject tags; |
| @Setter |
| @Getter |
| @Column(name = DEBUG) |
| private int debug; |
| @Setter |
| @Getter |
| @Column(name = SHARED) |
| private int shared; |
| @Setter |
| @Getter |
| @Column(name = QUERY, indexOnly = true, length = QUERY_LENGTH) |
| @SQLDatabase.AdditionalEntity(additionalTables = {ADDITIONAL_QUERY_TABLE}) |
| private List<String> query; |
| |
| @Override |
| public StorageID id() { |
| return new StorageID().append(TRACE_ID, traceId).append(SPAN_ID, spanId); |
| } |
| |
| public static class Builder implements StorageBuilder<ZipkinSpanRecord> { |
| @Override |
| public ZipkinSpanRecord storage2Entity(final Convert2Entity converter) { |
| ZipkinSpanRecord record = new ZipkinSpanRecord(); |
| record.setTraceId((String) converter.get(TRACE_ID)); |
| record.setSpanId((String) converter.get(SPAN_ID)); |
| record.setParentId((String) converter.get(PARENT_ID)); |
| record.setName((String) converter.get(NAME)); |
| record.setKind((String) converter.get(KIND)); |
| record.setTimestamp(((Number) converter.get(TIMESTAMP)).longValue()); |
| record.setTimestampMillis(((Number) converter.get(TIMESTAMP_MILLIS)).longValue()); |
| record.setDuration(((Number) converter.get(DURATION)).longValue()); |
| record.setLocalEndpointServiceName((String) converter.get(LOCAL_ENDPOINT_SERVICE_NAME)); |
| record.setLocalEndpointIPV4((String) converter.get(LOCAL_ENDPOINT_IPV4)); |
| record.setLocalEndpointIPV6((String) converter.get(LOCAL_ENDPOINT_IPV6)); |
| if (converter.get(LOCAL_ENDPOINT_PORT) != null) { |
| record.setLocalEndpointPort(((Number) converter.get(LOCAL_ENDPOINT_PORT)).intValue()); |
| } |
| record.setRemoteEndpointServiceName((String) converter.get(REMOTE_ENDPOINT_SERVICE_NAME)); |
| record.setRemoteEndpointIPV4((String) converter.get(REMOTE_ENDPOINT_IPV4)); |
| record.setRemoteEndpointIPV6((String) converter.get(REMOTE_ENDPOINT_IPV6)); |
| if (converter.get(REMOTE_ENDPOINT_PORT) != null) { |
| record.setRemoteEndpointPort(((Number) converter.get(REMOTE_ENDPOINT_PORT)).intValue()); |
| } |
| final String annotationsString = (String) converter.get(ANNOTATIONS); |
| if (StringUtil.isNotEmpty(annotationsString)) { |
| record.setAnnotations(GSON.fromJson(annotationsString, JsonObject.class)); |
| } |
| final String tagsString = (String) converter.get(TAGS); |
| if (StringUtil.isNotEmpty(tagsString)) { |
| record.setTags(GSON.fromJson(tagsString, JsonObject.class)); |
| } |
| if (converter.get(DEBUG) != null) { |
| record.setDebug(((Number) converter.get(DEBUG)).intValue()); |
| } |
| if (converter.get(SHARED) != null) { |
| record.setShared(((Number) converter.get(SHARED)).intValue()); |
| } |
| return record; |
| } |
| |
| @Override |
| public void entity2Storage(final ZipkinSpanRecord storageData, final Convert2Storage converter) { |
| converter.accept(TRACE_ID, storageData.getTraceId()); |
| converter.accept(SPAN_ID, storageData.getSpanId()); |
| converter.accept(PARENT_ID, storageData.getParentId()); |
| converter.accept(NAME, storageData.getName()); |
| converter.accept(KIND, storageData.getKind()); |
| converter.accept(TIMESTAMP, storageData.getTimestamp()); |
| converter.accept(TIMESTAMP_MILLIS, storageData.getTimestampMillis()); |
| converter.accept(TIME_BUCKET, storageData.getTimeBucket()); |
| converter.accept(DURATION, storageData.getDuration()); |
| converter.accept(LOCAL_ENDPOINT_SERVICE_NAME, storageData.getLocalEndpointServiceName()); |
| converter.accept(LOCAL_ENDPOINT_IPV4, storageData.getLocalEndpointIPV4()); |
| converter.accept(LOCAL_ENDPOINT_IPV6, storageData.getLocalEndpointIPV6()); |
| if (storageData.getLocalEndpointPort() != 0) { |
| converter.accept(LOCAL_ENDPOINT_PORT, storageData.getLocalEndpointPort()); |
| } |
| converter.accept(REMOTE_ENDPOINT_SERVICE_NAME, storageData.getRemoteEndpointServiceName()); |
| converter.accept(REMOTE_ENDPOINT_IPV4, storageData.getRemoteEndpointIPV4()); |
| converter.accept(REMOTE_ENDPOINT_IPV6, storageData.getRemoteEndpointIPV6()); |
| if (storageData.getRemoteEndpointPort() != 0) { |
| converter.accept(REMOTE_ENDPOINT_PORT, storageData.getRemoteEndpointPort()); |
| } |
| if (storageData.getAnnotations() != null) { |
| converter.accept(ANNOTATIONS, GSON.toJson(storageData.getAnnotations())); |
| } else { |
| converter.accept(ANNOTATIONS, Const.EMPTY_STRING); |
| } |
| if (storageData.getTags() != null) { |
| converter.accept(TAGS, GSON.toJson(storageData.getTags())); |
| } else { |
| converter.accept(TAGS, Const.EMPTY_STRING); |
| } |
| converter.accept(QUERY, storageData.getQuery()); |
| if (storageData.getDebug() == BooleanUtils.booleanToValue(true)) { |
| converter.accept(DEBUG, storageData.getDebug()); |
| } |
| if (storageData.getShared() == BooleanUtils.booleanToValue(true)) { |
| converter.accept(SHARED, storageData.getShared()); |
| } |
| } |
| } |
| |
| public static Span buildSpanFromRecord(ZipkinSpanRecord record) { |
| Span.Builder span = Span.newBuilder(); |
| span.traceId(record.getTraceId()); |
| span.id(record.getSpanId()); |
| span.parentId(record.getParentId()); |
| if (!StringUtil.isEmpty(record.getKind())) { |
| span.kind(Span.Kind.valueOf(record.getKind())); |
| } |
| span.timestamp(record.getTimestamp()); |
| span.duration(record.getDuration()); |
| span.name(record.getName()); |
| //Build localEndpoint |
| Endpoint.Builder localEndpoint = Endpoint.newBuilder(); |
| localEndpoint.serviceName(record.getLocalEndpointServiceName()); |
| if (!StringUtil.isEmpty(record.getLocalEndpointIPV4())) { |
| localEndpoint.parseIp(record.getLocalEndpointIPV4()); |
| } else { |
| localEndpoint.parseIp(record.getLocalEndpointIPV6()); |
| } |
| localEndpoint.port(record.getLocalEndpointPort()); |
| span.localEndpoint(localEndpoint.build()); |
| //Build remoteEndpoint |
| Endpoint.Builder remoteEndpoint = Endpoint.newBuilder(); |
| remoteEndpoint.serviceName(record.getRemoteEndpointServiceName()); |
| if (!StringUtil.isEmpty(record.getLocalEndpointIPV4())) { |
| remoteEndpoint.parseIp(record.getRemoteEndpointIPV4()); |
| } else { |
| remoteEndpoint.parseIp(record.getRemoteEndpointIPV6()); |
| } |
| remoteEndpoint.port(record.getRemoteEndpointPort()); |
| span.remoteEndpoint(remoteEndpoint.build()); |
| |
| //Build tags |
| JsonObject tagsJson = record.getTags(); |
| if (tagsJson != null) { |
| for (Map.Entry<String, JsonElement> tag : tagsJson.entrySet()) { |
| span.putTag(tag.getKey(), tag.getValue().getAsString()); |
| } |
| } |
| //Build annotation |
| JsonObject annotationJson = record.getAnnotations(); |
| if (annotationJson != null) { |
| for (Map.Entry<String, JsonElement> annotation : annotationJson.entrySet()) { |
| span.addAnnotation(Long.parseLong(annotation.getKey()), annotation.getValue().getAsString()); |
| } |
| } |
| return span.build(); |
| } |
| } |