Support transmit the OpenTelemetry Metrics protocol. (#117)
diff --git a/CHANGES.md b/CHANGES.md
index d972e70..6c1d7c4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,13 +2,14 @@
==================
Release Notes.
-1.0.2
+1.1.0
------------------
#### Features
+* Support transmit the OpenTelemetry Metrics protocol.
#### Bug Fixes
* Fix the missing return data when receive metrics in batch mode.
#### Issues and PR
- All issues are [here](https://github.com/apache/skywalking/milestone/143?closed=1)
-- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.0.2+is%3Aclosed)
+- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Apr+milestone%3A1.1.0+is%3Aclosed)
diff --git a/configs/satellite_config.yaml b/configs/satellite_config.yaml
index d1d6d9c..6952167 100644
--- a/configs/satellite_config.yaml
+++ b/configs/satellite_config.yaml
@@ -507,3 +507,35 @@
client_name: grpc-client
forwarders:
- plugin_name: native-ebpf-profiling-grpc-forwarder
+ - common_config:
+ pipe_name: otlp-metrics-v1-pipe
+ gatherer:
+ server_name: "grpc-server"
+ receiver:
+ plugin_name: "grpc-otlp-metrics-v1-receiver"
+ queue:
+ plugin_name: "memory-queue"
+ # The maximum buffer event size.
+ event_buffer_size: ${SATELLITE_QUEUE_EVENT_BUFFER_SIZE:5000}
+ # The partition count of queue.
+ partition: ${SATELLITE_QUEUE_PARTITION:4}
+ processor:
+ filters:
+ sender:
+ fallbacker:
+ plugin_name: none-fallbacker
+ # The time interval between two flush operations. And the time unit is millisecond.
+ flush_time: ${SATELLITE_METERPIPE_SENDER_FLUSH_TIME:1000}
+ # The maximum buffer elements.
+ max_buffer_size: ${SATELLITE_METERPIPE_SENDER_MAX_BUFFER_SIZE:200}
+ # The minimum flush elements.
+ min_flush_events: ${SATELLITE_METERPIPE_SENDER_MIN_FLUSH_EVENTS:1}
+ client_name: grpc-client
+ forwarders:
+ - plugin_name: otlp-metrics-v1-grpc-forwarder
+ # The LRU policy cache size for hosting routine rules of service instance.
+ routing_rule_lru_cache_size: ${SATELLITE_METERPIPE_FORWARD_ROUTING_RULE_LRU_CACHE_SIZE:5000}
+ # The TTL of the LRU cache size for hosting routine rules of service instance.
+ routing_rule_lru_cache_ttl: ${SATELLITE_METERPIPE_FORWARD_ROUTING_RULE_LRU_CACHE_TTL:180}
+ # The label key of the routing data, multiple keys are split by ","
+ routing_label_keys: net.host.name,host.name,job,service.name
\ No newline at end of file
diff --git a/docs/en/setup/plugins/forwarder_otlp-metrics-v1-grpc-forwarder.md b/docs/en/setup/plugins/forwarder_otlp-metrics-v1-grpc-forwarder.md
new file mode 100755
index 0000000..433f0a6
--- /dev/null
+++ b/docs/en/setup/plugins/forwarder_otlp-metrics-v1-grpc-forwarder.md
@@ -0,0 +1,19 @@
+# Forwarder/otlp-metrics-v1-grpc-forwarder
+## Description
+This is a synchronization grpc forwarder with the OpenTelemetry metrics v1 protocol.
+## DefaultConfig
+```yaml
+# The LRU policy cache size for hosting routine rules of service instance.
+routing_rule_lru_cache_size: 5000
+# The TTL of the LRU cache size for hosting routine rules of service instance.
+routing_rule_lru_cache_ttl: 180
+# The label key of the routing data, multiple keys are split by ","
+routing_label_keys: net.host.name,host.name,job,service.name
+```
+## Configuration
+|Name|Type|Description|
+|----|----|-----------|
+| routing_label_keys | string | The label key of the routing data, multiple keys are split by "," |
+| routing_rule_lru_cache_size | int | The LRU policy cache size for hosting routine rules of service instance. |
+| routing_rule_lru_cache_ttl | int | The TTL of the LRU cache size for hosting routine rules of service instance. |
+
diff --git a/docs/en/setup/plugins/plugin-list.md b/docs/en/setup/plugins/plugin-list.md
index e1c7496..f386c3d 100755
--- a/docs/en/setup/plugins/plugin-list.md
+++ b/docs/en/setup/plugins/plugin-list.md
@@ -24,6 +24,7 @@
- [Native Process GRPC Forwarder](./forwarder_native-process-grpc-forwarder.md)
- [Native Profile GRPC Forwarder](./forwarder_native-profile-grpc-forwarder.md)
- [Native Tracing GRPC Forwarder](./forwarder_native-tracing-grpc-forwarder.md)
+ - [OpenTelemetry Metrics v1 GRPC Forwarder](./forwarder_otlp-metrics-v1-grpc-forwarder.md)
- Parser
- Queue
- [Memory Queue](./queue_memory-queue.md)
@@ -44,6 +45,7 @@
- [GRPC Native Process Receiver](./receiver_grpc-native-process-receiver.md)
- [GRPC Native Profile Receiver](./receiver_grpc-native-profile-receiver.md)
- [GRPC Native Tracing Receiver](./receiver_grpc-native-tracing-receiver.md)
+ - [GRPC OpenTelemetry Metrics v1 Receiver](./receiver_grpc-otlp-metrics-v1-receiver.md)
- [HTTP Native Log Receiver](./receiver_http-native-log-receiver.md)
- Server
- [GRPC Server](./server_grpc-server.md)
diff --git a/docs/en/setup/plugins/receiver_grpc-otlp-metrics-v1-receiver.md b/docs/en/setup/plugins/receiver_grpc-otlp-metrics-v1-receiver.md
new file mode 100755
index 0000000..cb72aa0
--- /dev/null
+++ b/docs/en/setup/plugins/receiver_grpc-otlp-metrics-v1-receiver.md
@@ -0,0 +1,11 @@
+# Receiver/grpc-otlp-metrics-v1-receiver
+## Description
+This is a receiver for OpenTelemetry Metrics v1 format, which is defined at https://github.com/open-telemetry/opentelemetry-proto/blob/724e427879e3d2bae2edc0218fff06e37b9eb46e/opentelemetry/proto/collector/metrics/v1/metrics_service.proto.
+## Support Forwarders
+ - [otlp-metrics-v1-grpc-forwarder](forwarder_otlp-metrics-v1-grpc-forwarder.md)
+## DefaultConfig
+```yaml ```
+## Configuration
+|Name|Type|Description|
+|----|----|-----------|
+
diff --git a/docs/menu.yml b/docs/menu.yml
index fe5ac6a..8afff50 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -117,6 +117,8 @@
path: /en/setup/plugins/forwarder_native-profile-grpc-forwarder
- name: Native Tracing GRPC Forwarder
path: /en/setup/plugins/forwarder_native-tracing-grpc-forwarder
+ - name: OpenTelemetry Metrics v1 GRPC Forwarder
+ path: /en/setup/plugins/forwarder_otlp-metrics-v1-grpc-forwarder
- name: Queue
catalog:
- name: Memory Queue
@@ -155,6 +157,8 @@
path: /en/setup/plugins/receiver_grpc-native-profile-receiver
- name: GRPC Native Tracing Receiver
path: /en/setup/plugins/receiver_grpc-native-tracing-receiver
+ - name: GRPC OpenTelemetry Metrics v1 Receiver
+ path: /en/setup/plugins/receiver_grpc-otlp-metrics-v1-receiver
- name: HTTP Native Log Receiver
path: /en/setup/plugins/receiver_http-native-log-receiver
- name: Server
diff --git a/go.mod b/go.mod
index 6f0147b..c636271 100644
--- a/go.mod
+++ b/go.mod
@@ -18,11 +18,11 @@
go.uber.org/automaxprocs v1.4.0
golang.org/x/mod v0.4.2
google.golang.org/grpc v1.44.0
- google.golang.org/protobuf v1.28.0
+ google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
k8s.io/apimachinery v0.20.6
- skywalking.apache.org/repo/goapi v0.0.0-20220615082501-7d36e7c0c3c9
+ skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581
)
require (
diff --git a/go.sum b/go.sum
index 6d0e9b9..d634463 100644
--- a/go.sum
+++ b/go.sum
@@ -1607,8 +1607,8 @@
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
-google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
+google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -1724,6 +1724,6 @@
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20220615082501-7d36e7c0c3c9 h1:aeZds1YLgbt7IJQEub1cBX97jWqFbI8J1Vjscg9obkI=
-skywalking.apache.org/repo/goapi v0.0.0-20220615082501-7d36e7c0c3c9/go.mod h1:uWwwvhcwe2MD/nJCg0c1EE/eL6KzaBosLHDfMFoEJ30=
+skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581 h1:Pst4hR7kYpUXauPPZFCcRtt84E/ccsflDoKLODjA2Uo=
+skywalking.apache.org/repo/goapi v0.0.0-20220824100816-9c0fee7e3581/go.mod h1:ARcOiM3INd/UOU3Naeqkmo8WFS96jiHmSUawLoAXZjU=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/client/grpc/client_config.go b/plugins/client/grpc/client_config.go
index aaeef05..a3531a0 100644
--- a/plugins/client/grpc/client_config.go
+++ b/plugins/client/grpc/client_config.go
@@ -30,6 +30,7 @@
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
+ _ "google.golang.org/grpc/encoding/gzip" // for install the "gzip" decompressor
"google.golang.org/grpc/metadata"
)
diff --git a/plugins/client/grpc/lb/config.go b/plugins/client/grpc/lb/config.go
index 67a656f..dccff63 100644
--- a/plugins/client/grpc/lb/config.go
+++ b/plugins/client/grpc/lb/config.go
@@ -35,6 +35,13 @@
})
}
+func GetAddress(ctx context.Context) string {
+ if config := queryConfig(ctx); config != nil {
+ return config.appointAddr
+ }
+ return ""
+}
+
func queryConfig(ctx context.Context) *LoadBalancerConfig {
value := ctx.Value(ctxKeyInstance)
if value == nil {
diff --git a/plugins/client/grpc/lb/load_balancer.go b/plugins/client/grpc/lb/load_balancer.go
index 8286c2c..b1a62d0 100644
--- a/plugins/client/grpc/lb/load_balancer.go
+++ b/plugins/client/grpc/lb/load_balancer.go
@@ -48,10 +48,13 @@
}
addrToConn := make(map[string]balancer.SubConn)
- cons := make([]balancer.SubConn, 0)
+ cons := make([]*connectionWrap, 0)
for conn, connInfo := range info.ReadySCs {
addrToConn[connInfo.Address.Addr] = conn
- cons = append(cons, conn)
+ cons = append(cons, &connectionWrap{
+ addr: connInfo.Address.Addr,
+ conn: conn,
+ })
}
return &satelliteDynamicPicker{
@@ -62,7 +65,7 @@
}
type satelliteDynamicPicker struct {
- cons []balancer.SubConn
+ cons []*connectionWrap
addrToConn map[string]balancer.SubConn
connCount int
@@ -70,10 +73,15 @@
next int
}
+type connectionWrap struct {
+ addr string
+ conn balancer.SubConn
+}
+
func (s *satelliteDynamicPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
// only one connection
if s.connCount == 1 {
- return balancer.PickResult{SubConn: s.cons[0]}, nil
+ return balancer.PickResult{SubConn: s.cons[0].conn}, nil
}
config := queryConfig(info.Ctx)
@@ -91,7 +99,10 @@
// hash the route key
routeIndex := hashCode(config.routeKey) % s.connCount
- return balancer.PickResult{SubConn: s.cons[routeIndex]}, nil
+ connWrap := s.cons[routeIndex]
+ // update the address to the config
+ config.appointAddr = connWrap.addr
+ return balancer.PickResult{SubConn: connWrap.conn}, nil
}
func (s *satelliteDynamicPicker) roundRobinConnection() balancer.PickResult {
@@ -99,7 +110,7 @@
sc := s.cons[s.next]
s.next = (s.next + 1) % s.connCount
s.mu.Unlock()
- return balancer.PickResult{SubConn: sc}
+ return balancer.PickResult{SubConn: sc.conn}
}
func hashCode(s string) int {
diff --git a/plugins/forwarder/forwarder_repository.go b/plugins/forwarder/forwarder_repository.go
index 6429454..51a47bf 100644
--- a/plugins/forwarder/forwarder_repository.go
+++ b/plugins/forwarder/forwarder_repository.go
@@ -34,6 +34,7 @@
grpc_nativeprocess "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprocess"
grpc_nativeprofile "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativeprofile"
grpc_nativetracing "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/nativetracing"
+ "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/otlpmetricsv1"
kafka_nativelog "github.com/apache/skywalking-satellite/plugins/forwarder/kafka/nativelog"
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
@@ -60,6 +61,7 @@
new(envoyalsv3.Forwarder),
new(envoymetricsv2.Forwarder),
new(envoymetricsv3.Forwarder),
+ new(otlpmetricsv1.Forwarder),
}
for _, forwarder := range forwarders {
plugin.RegisterPlugin(forwarder)
diff --git a/plugins/forwarder/grpc/otlpmetricsv1/forwarder.go b/plugins/forwarder/grpc/otlpmetricsv1/forwarder.go
new file mode 100644
index 0000000..60ab9d6
--- /dev/null
+++ b/plugins/forwarder/grpc/otlpmetricsv1/forwarder.go
@@ -0,0 +1,186 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package otlpmetricsv1
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/internal/satellite/event"
+ "github.com/apache/skywalking-satellite/plugins/client/grpc/lb"
+
+ "k8s.io/apimachinery/pkg/util/cache"
+ metrics "skywalking.apache.org/repo/goapi/proto/opentelemetry/proto/collector/metrics/v1"
+ common "skywalking.apache.org/repo/goapi/proto/opentelemetry/proto/common/v1"
+ v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+ "google.golang.org/grpc"
+)
+
+const (
+ Name = "otlp-metrics-v1-grpc-forwarder"
+ ShowName = "OpenTelemetry Metrics v1 GRPC Forwarder"
+)
+
+type Forwarder struct {
+ config.CommonFields
+ // The label key of the routing data, multiple keys are split by ","
+ RoutingLabelKeys string `mapstructure:"routing_label_keys"`
+ // The LRU policy cache size for hosting routine rules of service instance.
+ RoutingRuleLRUCacheSize int `mapstructure:"routing_rule_lru_cache_size"`
+ // The TTL of the LRU cache size for hosting routine rules of service instance.
+ RoutingRuleLRUCacheTTL int `mapstructure:"routing_rule_lru_cache_ttl"`
+
+ metricsClient metrics.MetricsServiceClient
+ metadataKeys []string
+ upstreamCache *cache.LRUExpireCache
+ upstreamCacheExpire time.Duration
+}
+
+func (f *Forwarder) Name() string {
+ return Name
+}
+
+func (f *Forwarder) ShowName() string {
+ return ShowName
+}
+
+func (f *Forwarder) Description() string {
+ return "This is a synchronization grpc forwarder with the OpenTelemetry metrics v1 protocol."
+}
+
+func (f *Forwarder) DefaultConfig() string {
+ return `
+# The LRU policy cache size for hosting routine rules of service instance.
+routing_rule_lru_cache_size: 5000
+# The TTL of the LRU cache size for hosting routine rules of service instance.
+routing_rule_lru_cache_ttl: 180
+# The label key of the routing data, multiple keys are split by ","
+routing_label_keys: net.host.name,host.name,job,service.name
+`
+}
+
+func (f *Forwarder) Prepare(connection interface{}) error {
+ client, ok := connection.(*grpc.ClientConn)
+ if !ok {
+ return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
+ f.Name(), reflect.TypeOf(connection).String())
+ }
+ f.metricsClient = metrics.NewMetricsServiceClient(client)
+ if f.RoutingLabelKeys == "" {
+ return fmt.Errorf("please provide metadata keys")
+ }
+ f.metadataKeys = strings.Split(f.RoutingLabelKeys, ",")
+ f.upstreamCache = cache.NewLRUExpireCache(f.RoutingRuleLRUCacheSize)
+ f.upstreamCacheExpire = time.Second * time.Duration(f.RoutingRuleLRUCacheTTL)
+ return nil
+}
+
+func (f *Forwarder) Forward(batch event.BatchEvents) error {
+ for _, d := range batch {
+ key, err := f.generateRoutingKey(d.GetOpenTelementryMetricsV1Request())
+ if err != nil {
+ log.Logger.Errorf("generate the routing key failure: %v", err)
+ continue
+ }
+ ctx := lb.WithLoadBalanceConfig(
+ context.Background(),
+ key,
+ f.loadCachedPeer(key))
+
+ _, err = f.metricsClient.Export(ctx, d.GetOpenTelementryMetricsV1Request())
+ if err != nil {
+ log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+ return err
+ }
+ f.savePeerInstanceFromStream(ctx, key)
+ }
+ return nil
+}
+
+func (f *Forwarder) savePeerInstanceFromStream(ctx context.Context, instance string) {
+ upstream := lb.GetAddress(ctx)
+ if upstream == "" {
+ return
+ }
+
+ f.upstreamCache.Add(instance, upstream, f.upstreamCacheExpire)
+}
+
+func (f *Forwarder) loadCachedPeer(instance string) string {
+ if get, exists := f.upstreamCache.Get(instance); exists {
+ return get.(string)
+ }
+ return ""
+}
+
+func (f *Forwarder) generateRoutingKey(data *metrics.ExportMetricsServiceRequest) (string, error) {
+ if len(data.GetResourceMetrics()) == 0 {
+ return "", fmt.Errorf("no resources")
+ }
+ var lastKVs []*common.KeyValue
+ for _, m := range data.GetResourceMetrics() {
+ if m.Resource == nil {
+ continue
+ }
+ if len(m.Resource.Attributes) == 0 {
+ continue
+ }
+ lastKVs = m.Resource.Attributes
+ result := ""
+ for _, kv := range m.Resource.Attributes {
+ for _, key := range f.metadataKeys {
+ if kv.GetKey() == key {
+ result += fmt.Sprintf(",%s", kv.GetValue().GetStringValue())
+ }
+ }
+ }
+ if result != "" {
+ return result, nil
+ }
+ }
+ if lastKVs == nil {
+ return "", fmt.Errorf("could not found any attributes")
+ }
+
+ var keys string
+ for i, k := range lastKVs {
+ if i > 0 {
+ keys += ","
+ }
+ keys += k.GetKey()
+ }
+ return "", fmt.Errorf("could not found anly routing key, existing keys sample: %s", keys)
+}
+
+func (f *Forwarder) ForwardType() v1.SniffType {
+ return v1.SniffType_OpenTelementryMetricsV1Type
+}
+
+func (f *Forwarder) SyncForward(*v1.SniffData) (*v1.SniffData, error) {
+ return nil, fmt.Errorf("unsupport sync forward")
+}
+
+func (f *Forwarder) SupportedSyncInvoke() bool {
+ return false
+}
diff --git a/plugins/receiver/grpc/otlpmetricsv1/metrics_service.go b/plugins/receiver/grpc/otlpmetricsv1/metrics_service.go
new file mode 100644
index 0000000..08ba5f2
--- /dev/null
+++ b/plugins/receiver/grpc/otlpmetricsv1/metrics_service.go
@@ -0,0 +1,49 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package otlpmetricsv1
+
+import (
+ "context"
+ "time"
+
+ metrics "skywalking.apache.org/repo/goapi/proto/opentelemetry/proto/collector/metrics/v1"
+ sniffer "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const eventName = "grpc-envoy-metrics-v3-event"
+
+type MetricsService struct {
+ receiveChannel chan *sniffer.SniffData
+ metrics.MetricsServiceServer
+}
+
+func (m *MetricsService) Export(ctx context.Context, req *metrics.ExportMetricsServiceRequest) (*metrics.ExportMetricsServiceResponse, error) {
+ e := &sniffer.SniffData{
+ Name: eventName,
+ Timestamp: time.Now().UnixNano() / 1e6,
+ Meta: nil,
+ Type: sniffer.SniffType_OpenTelementryMetricsV1Type,
+ Remote: true,
+ Data: &sniffer.SniffData_OpenTelementryMetricsV1Request{
+ OpenTelementryMetricsV1Request: req,
+ },
+ }
+
+ m.receiveChannel <- e
+ return &metrics.ExportMetricsServiceResponse{}, nil
+}
diff --git a/plugins/receiver/grpc/otlpmetricsv1/receiver.go b/plugins/receiver/grpc/otlpmetricsv1/receiver.go
new file mode 100644
index 0000000..d075e6c
--- /dev/null
+++ b/plugins/receiver/grpc/otlpmetricsv1/receiver.go
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package otlpmetricsv1
+
+import (
+ "github.com/apache/skywalking-satellite/internal/pkg/config"
+ module "github.com/apache/skywalking-satellite/internal/satellite/module/api"
+ forwarder "github.com/apache/skywalking-satellite/plugins/forwarder/api"
+ "github.com/apache/skywalking-satellite/plugins/forwarder/grpc/otlpmetricsv1"
+ "github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+
+ metrics "skywalking.apache.org/repo/goapi/proto/opentelemetry/proto/collector/metrics/v1"
+ sniffer "skywalking.apache.org/repo/goapi/satellite/data/v1"
+)
+
+const (
+ Name = "grpc-otlp-metrics-v1-receiver"
+ ShowName = "GRPC OpenTelemetry Metrics v1 Receiver"
+)
+
+type Receiver struct {
+ config.CommonFields
+ grpc.CommonGRPCReceiverFields
+ service *MetricsService
+}
+
+func (r *Receiver) Name() string {
+ return Name
+}
+
+func (r *Receiver) ShowName() string {
+ return ShowName
+}
+
+func (r *Receiver) Description() string {
+ return "This is a receiver for OpenTelemetry Metrics v1 format, " +
+ "which is defined at https://github.com/open-telemetry/opentelemetry-proto/blob/" +
+ "724e427879e3d2bae2edc0218fff06e37b9eb46e/opentelemetry/proto/collector/metrics/v1/metrics_service.proto."
+}
+
+func (r *Receiver) DefaultConfig() string {
+ return ` `
+}
+
+func (r *Receiver) RegisterHandler(server interface{}) {
+ r.CommonGRPCReceiverFields = *grpc.InitCommonGRPCReceiverFields(server)
+ r.service = &MetricsService{receiveChannel: r.OutputChannel}
+ metrics.RegisterMetricsServiceServer(r.Server, r.service)
+}
+
+func (r *Receiver) RegisterSyncInvoker(_ module.SyncInvoker) {
+}
+
+func (r *Receiver) Channel() <-chan *sniffer.SniffData {
+ return r.OutputChannel
+}
+
+func (r *Receiver) SupportForwarders() []forwarder.Forwarder {
+ return []forwarder.Forwarder{
+ new(otlpmetricsv1.Forwarder),
+ }
+}
diff --git a/plugins/receiver/grpc/otlpmetricsv1/receiver_test.go b/plugins/receiver/grpc/otlpmetricsv1/receiver_test.go
new file mode 100644
index 0000000..e0f917d
--- /dev/null
+++ b/plugins/receiver/grpc/otlpmetricsv1/receiver_test.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package otlpmetricsv1
+
+import (
+ "context"
+ "testing"
+
+ metrics "skywalking.apache.org/repo/goapi/proto/opentelemetry/proto/collector/metrics/v1"
+ common "skywalking.apache.org/repo/goapi/proto/opentelemetry/proto/common/v1"
+ v1 "skywalking.apache.org/repo/goapi/proto/opentelemetry/proto/metrics/v1"
+ resource "skywalking.apache.org/repo/goapi/proto/opentelemetry/proto/resource/v1"
+ sniffer "skywalking.apache.org/repo/goapi/satellite/data/v1"
+
+ "google.golang.org/grpc"
+
+ _ "github.com/apache/skywalking-satellite/internal/satellite/test"
+ receiver_grpc "github.com/apache/skywalking-satellite/plugins/receiver/grpc"
+)
+
+func TestReceiver_RegisterHandler(t *testing.T) {
+ recConf := make(map[string]string, 2)
+ receiver_grpc.TestReceiverWithConfig(new(Receiver), recConf, func(t *testing.T, sequence int, conn *grpc.ClientConn, ctx context.Context) string {
+ client := metrics.NewMetricsServiceClient(conn)
+ data := initData()
+ _, err := client.Export(ctx, data)
+ if err != nil {
+ t.Fatalf("cannot open the stream send mode: %v", err)
+ }
+ return data.String()
+ }, func(data *sniffer.SniffData) string {
+ return data.GetOpenTelementryMetricsV1Request().String()
+ }, t)
+}
+
+func initData() *metrics.ExportMetricsServiceRequest {
+ return &metrics.ExportMetricsServiceRequest{
+ ResourceMetrics: []*v1.ResourceMetrics{
+ {
+ Resource: &resource.Resource{
+ Attributes: []*common.KeyValue{
+ {
+ Key: "test",
+ Value: &common.AnyValue{
+ Value: &common.AnyValue_StringValue{
+ StringValue: "1",
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+}
diff --git a/plugins/receiver/receiver_repository.go b/plugins/receiver/receiver_repository.go
index 25eaf42..4335cff 100644
--- a/plugins/receiver/receiver_repository.go
+++ b/plugins/receiver/receiver_repository.go
@@ -36,6 +36,7 @@
grpcnativeprocess "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprocess"
grpcnativeprofile "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativeprofile"
grpcnativetracing "github.com/apache/skywalking-satellite/plugins/receiver/grpc/nativetracing"
+ "github.com/apache/skywalking-satellite/plugins/receiver/grpc/otlpmetricsv1"
httpnavtivelog "github.com/apache/skywalking-satellite/plugins/receiver/http/nativcelog"
)
@@ -59,6 +60,7 @@
new(envoyalsv3.Receiver),
new(envoymetricsv2.Receiver),
new(envoymetricsv3.Receiver),
+ new(otlpmetricsv1.Receiver),
}
for _, receiver := range receivers {
plugin.RegisterPlugin(receiver)