Add aws-firehose-receiver to support collecting AWS CloudWatch metric(OpenTelemetry format) (#10300)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 61bd10c..23d10c7 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -83,6 +83,7 @@
* [Optional] Optimize single trace query performance by customizing routing in ElasticSearch. SkyWalking trace segments and Zipkin spans are using trace ID for routing. This is OFF by default, controlled by `storage/elasticsearch/enableCustomRouting`.
* Enhance OAP HTTP server to support HTTPS
* Remove handler scan in otel receiver, manual initialization instead
+* Add aws-firehose-receiver to support collecting AWS CloudWatch metric(OpenTelemetry format)
#### UI
diff --git a/docs/en/setup/backend/aws-firehose-receiver.md b/docs/en/setup/backend/aws-firehose-receiver.md
new file mode 100644
index 0000000..43c27df
--- /dev/null
+++ b/docs/en/setup/backend/aws-firehose-receiver.md
@@ -0,0 +1,20 @@
+# AWS Firehose receiver
+
+AWS Firehose receiver listens on `0.0.0.0:12801` by default, and provides an HTTP Endpoint `/aws/firehose/metrics` that follows [Amazon Kinesis Data Firehose Delivery Stream HTTP Endpoint Delivery Specifications](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html)
+You could leverage the receiver to collect [AWS CloudWatch metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html), and analysis it through [MAL](../../concepts-and-designs/mal.md) as the receiver bases on [OpenTelemetry receiver](./opentelemetry-receiver.md)
+
+## Setup(S3 example)
+
+1. Create CloudWatch metrics configuration for S3 (refer to [S3 CloudWatch metrics](https://docs.aws.amazon.com/AmazonS3/latest/userguide/configure-request-metrics-bucket.html))
+2. Stream CloudWatch metrics to AWS Kinesis Data Firehose delivery stream by [CloudWatch metrics stream](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-setup-datalake.html)
+3. Specify AWS Kinesis Data Firehose delivery stream HTTP Endpoint (refer to [Choose HTTP Endpoint for Your Destination](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http))
+
+Usually, the [AWS CloudWatch metrics](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html) process flow with OAP is as follows:
+```
+CloudWatch metrics with S3 --> CloudWatch Metric Stream (OpenTelemetry formart) --> Kinesis Data Firehose Delivery Stream --> AWS Firehose receiver(OAP) --> OpenTelemetry receiver(OAP)
+```
+
+## Notice
+
+1. Only OpenTelemetry format is supported (refer to [Metric streams output formats](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats.html))
+2. Only HTTPS could be accepted, you could directly enable TLS and set the receiver to listen 443, or put the receiver behind a gateway with HTTPS (refer to [Amazon Kinesis Data Firehose Delivery Stream HTTP Endpoint Delivery Specifications](https://docs.aws.amazon.com/firehose/latest/dev/httpdeliveryrequestresponse.html))
diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md
index efef47e..554117d 100644
--- a/docs/en/setup/backend/configuration-vocabulary.md
+++ b/docs/en/setup/backend/configuration-vocabulary.md
@@ -304,6 +304,16 @@
| health-checker | default | checkIntervalSeconds | The period of checking OAP internal health status (in seconds). | SW_HEALTH_CHECKER_INTERVAL_SECONDS | 5 |
| configuration-discovery | default | disableMessageDigest | If true, agent receives the latest configuration every time, even without making any changes. By default, OAP uses the SHA512 message digest mechanism to detect changes in configuration. | SW_DISABLE_MESSAGE_DIGEST | false |
| receiver-event | default | gRPC services that handle events data. | - | - | |
+| aws-firehose-receiver | default | host | Binding IP of HTTP server | SW_RECEIVER_AWS_FIREHOSE_HTTP_HOST | 0.0.0.0 |
+| - | - | port | Binding port of HTTP server | SW_RECEIVER_AWS_FIREHOSE_HTTP_PORT | 12801 |
+| - | - | contextPath | Context path of HTTP server | SW_RECEIVER_AWS_FIREHOSE_HTTP_CONTEXT_PATH | / |
+| - | - | maxThreads | Max Thtread number of HTTP server | SW_RECEIVER_AWS_FIREHOSE_HTTP_MAX_THREADS | 200 |
+| - | - | idleTimeOut | Idle timeout of a connection for keep-alive. | SW_RECEIVER_AWS_FIREHOSE_HTTP_IDLE_TIME_OUT | 30000 |
+| - | - | acceptQueueSize | Maximum allowed number of open connections | SW_RECEIVER_AWS_FIREHOSE_HTTP_ACCEPT_QUEUE_SIZE | 0 |
+| - | - | maxRequestHeaderSize | Maximum length of all headers in an HTTP/1 response | SW_RECEIVER_AWS_FIREHOSE_HTTP_MAX_REQUEST_HEADER_SIZE | 8192 |
+| - | - | enableTLS | Indicate if enable HTTPS for the server | SW_RECEIVER_AWS_FIREHOSE_HTTP_ENABLE_TLS | false |
+| - | - | tlsKeyPath | TLS key path | SW_RECEIVER_AWS_FIREHOSE_HTTP_TLS_KEY_PATH | |
+| - | - | tlsCertChainPath | TLS certificate chain path | SW_RECEIVER_AWS_FIREHOSE_HTTP_TLS_CERT_CHAIN_PATH | |
## Note
diff --git a/docs/menu.yml b/docs/menu.yml
index a3fd6bc..87b79e3 100644
--- a/docs/menu.yml
+++ b/docs/menu.yml
@@ -117,6 +117,8 @@
path: "/en/guides/backend-oal-scripts"
- name: "OpenTelemetry Metrics"
path: "/en/setup/backend/opentelemetry-receiver"
+ - name: "AWS CloudWatch Metrics"
+ path: "/en/setup/backend/aws-firehose-receiver"
- name: "Zabbix Metrics"
path: "/en/setup/backend/backend-zabbix"
- name: "Meter Analysis"
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServer.java
index 7e50df1..67de3c2 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServer.java
@@ -25,6 +25,7 @@
import com.linecorp.armeria.server.Route;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.docs.DocService;
+import com.linecorp.armeria.server.encoding.DecodingService;
import com.linecorp.armeria.server.healthcheck.HealthCheckService;
import com.linecorp.armeria.server.logging.LoggingService;
@@ -72,8 +73,8 @@
}
return delegate.serve(ctx, req);
})
+ .decorator(DecodingService.newDecorator())
.decorator(LoggingService.newDecorator());
-
if (config.isEnableTLS()) {
sb.https(new InetSocketAddress(
config.getHost(),
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServerConfig.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServerConfig.java
index f7b2eb2..9d9daa1 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServerConfig.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/http/HTTPServerConfig.java
@@ -45,5 +45,4 @@
private String tlsKeyPath;
private String tlsCertChainPath;
-
}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml b/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml
new file mode 100644
index 0000000..1b86b69
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>server-receiver-plugin</artifactId>
+ <version>9.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>aws-firehose-receiver</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>otel-receiver-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModule.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModule.java
new file mode 100644
index 0000000..12e70ca
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModule.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+
+public class AWSFirehoseReceiverModule extends ModuleDefine {
+
+ public static final String NAME = "aws-firehose";
+
+ public AWSFirehoseReceiverModule() {
+ super(NAME);
+ }
+
+ @Override
+ public Class[] services() {
+ return new Class[0];
+ }
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleConfig.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleConfig.java
new file mode 100644
index 0000000..0d4689d
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleConfig.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+
+@Getter
+public class AWSFirehoseReceiverModuleConfig extends ModuleConfig {
+ private String host;
+ private int port;
+ private String contextPath;
+ private int maxThreads = 200;
+ private long idleTimeOut = 30000;
+ private int acceptQueueSize = 0;
+ private int maxRequestHeaderSize = 8192;
+ private boolean enableTLS = false;
+ private String tlsKeyPath;
+ private String tlsCertChainPath;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleProvider.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleProvider.java
new file mode 100644
index 0000000..22d69f6
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/AWSFirehoseReceiverModuleProvider.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import com.linecorp.armeria.common.HttpMethod;
+import java.util.Collections;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
+import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
+import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverModule;
+import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
+
+public class AWSFirehoseReceiverModuleProvider extends ModuleProvider {
+ public static final String NAME = "default";
+
+ private AWSFirehoseReceiverModuleConfig moduleConfig;
+ private HTTPServer httpServer;
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public Class<? extends ModuleDefine> module() {
+ return AWSFirehoseReceiverModule.class;
+ }
+
+ @Override
+ public ConfigCreator<AWSFirehoseReceiverModuleConfig> newConfigCreator() {
+ return new ConfigCreator<AWSFirehoseReceiverModuleConfig>() {
+ @Override
+ public Class<AWSFirehoseReceiverModuleConfig> type() {
+ return AWSFirehoseReceiverModuleConfig.class;
+ }
+
+ @Override
+ public void onInitialized(final AWSFirehoseReceiverModuleConfig initialized) {
+ moduleConfig = initialized;
+ }
+ };
+ }
+
+ @Override
+ public void prepare() throws ServiceNotProvidedException {
+ final HTTPServerConfig httpServerConfig = HTTPServerConfig.builder().host(moduleConfig.getHost())
+ .port(moduleConfig.getPort())
+ .contextPath(moduleConfig.getContextPath())
+ .maxThreads(moduleConfig.getMaxThreads())
+ .idleTimeOut(moduleConfig.getIdleTimeOut())
+ .acceptQueueSize(moduleConfig.getAcceptQueueSize())
+ .maxRequestHeaderSize(
+ moduleConfig.getMaxRequestHeaderSize())
+ .enableTLS(moduleConfig.isEnableTLS())
+ .tlsKeyPath(moduleConfig.getTlsKeyPath())
+ .tlsCertChainPath(moduleConfig.getTlsCertChainPath())
+ .build();
+ httpServer = new HTTPServer(httpServerConfig);
+ httpServer.initialize();
+ }
+
+ @Override
+ public void start() throws ServiceNotProvidedException, ModuleStartException {
+ final OpenTelemetryMetricRequestProcessor processor = getManager().find(OtelMetricReceiverModule.NAME)
+ .provider()
+ .getService(
+ OpenTelemetryMetricRequestProcessor.class);
+ httpServer.addHandler(
+ new FirehoseHTTPHandler(processor),
+ Collections.singletonList(HttpMethod.POST)
+ );
+ }
+
+ @Override
+ public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
+ httpServer.start();
+ }
+
+ @Override
+ public String[] requiredModules() {
+ return new String[] {
+ OtelMetricReceiverModule.NAME
+ };
+ }
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java
new file mode 100644
index 0000000..760630d
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseHTTPHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.common.HttpStatus;
+import com.linecorp.armeria.server.annotation.ConsumesJson;
+import com.linecorp.armeria.server.annotation.Post;
+import com.linecorp.armeria.server.annotation.ProducesJson;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import java.io.ByteArrayInputStream;
+import java.util.Base64;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
+
+@Slf4j
+@AllArgsConstructor
+public class FirehoseHTTPHandler {
+
+ private final OpenTelemetryMetricRequestProcessor openTelemetryMetricRequestProcessor;
+
+ @Post("/aws/firehose/metrics")
+ @ConsumesJson
+ @ProducesJson
+ public HttpResponse collectMetrics(final FirehoseReq firehoseReq) {
+ try {
+ for (RequestData record : firehoseReq.getRecords()) {
+ final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(
+ Base64.getDecoder().decode(record.getData()));
+ ExportMetricsServiceRequest request;
+ while ((request = ExportMetricsServiceRequest.parseDelimitedFrom(byteArrayInputStream)) != null) {
+ openTelemetryMetricRequestProcessor.processMetricsRequest(request);
+ }
+ }
+ } catch (InvalidProtocolBufferException e) {
+ log.warn("Only OpenTelemetry format is accepted", e);
+ return HttpResponse.ofJson(
+ HttpStatus.BAD_REQUEST,
+ new FirehoseRes(firehoseReq.getRequestId(), System.currentTimeMillis(),
+ "Only OpenTelemetry format is accepted"
+ )
+ );
+ } catch (Exception e) {
+ log.error("Server error", e);
+ return HttpResponse.ofJson(
+ HttpStatus.INTERNAL_SERVER_ERROR,
+ new FirehoseRes(firehoseReq.getRequestId(), System.currentTimeMillis(),
+ "Server error, please check the OAP log"
+ )
+ );
+ }
+ return HttpResponse.ofJson(
+ HttpStatus.OK,
+ new FirehoseRes(firehoseReq.getRequestId(), System.currentTimeMillis(), null)
+ );
+ }
+
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseReq.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseReq.java
new file mode 100644
index 0000000..39c699f
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseReq.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class FirehoseReq {
+ private String requestId;
+ private Long timestamp;
+ private List<RequestData> records;
+
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseRes.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseRes.java
new file mode 100644
index 0000000..d2e8841
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/FirehoseRes.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+@AllArgsConstructor
+public class FirehoseRes {
+ private String requestId;
+ private Long timestamp;
+ private String errorMessage;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/RequestData.java b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/RequestData.java
new file mode 100644
index 0000000..140337f
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/java/org/apache/skywalking/oap/server/receiver/aws/firehose/RequestData.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.aws.firehose;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class RequestData {
+ private String data;
+}
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
new file mode 100644
index 0000000..941d9de
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleDefine
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.aws.firehose.AWSFirehoseReceiverModule
diff --git a/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
new file mode 100644
index 0000000..47ac3ff
--- /dev/null
+++ b/oap-server/server-receiver-plugin/aws-firehose-receiver/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.library.module.ModuleProvider
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+
+org.apache.skywalking.oap.server.receiver.aws.firehose.AWSFirehoseReceiverModuleProvider
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/OtelMetricReceiverModule.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/OtelMetricReceiverModule.java
index 6b23d1a..782d759 100644
--- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/OtelMetricReceiverModule.java
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/OtelMetricReceiverModule.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.receiver.otel;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
public class OtelMetricReceiverModule extends ModuleDefine {
public static final String NAME = "receiver-otel";
@@ -29,6 +30,6 @@
@Override
public Class[] services() {
- return new Class[0];
+ return new Class[] {OpenTelemetryMetricRequestProcessor.class};
}
}
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/OtelMetricReceiverProvider.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/OtelMetricReceiverProvider.java
index baba160..beca266 100644
--- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/OtelMetricReceiverProvider.java
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/OtelMetricReceiverProvider.java
@@ -26,6 +26,7 @@
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.receiver.otel.oc.OCMetricHandler;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricHandler;
+import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
public class OtelMetricReceiverProvider extends ModuleProvider {
@@ -35,6 +36,8 @@
private OtelMetricReceiverConfig config;
+ private OpenTelemetryMetricRequestProcessor metricRequestProcessor;
+
@Override
public String name() {
return NAME;
@@ -62,10 +65,13 @@
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
+ metricRequestProcessor = new OpenTelemetryMetricRequestProcessor(
+ getManager(), config);
+ registerServiceImplementation(OpenTelemetryMetricRequestProcessor.class, metricRequestProcessor);
final List<String> enabledHandlers = config.getEnabledHandlers();
List<Handler> handlers = new ArrayList<>();
final OpenTelemetryMetricHandler openTelemetryMetricHandler = new OpenTelemetryMetricHandler(
- getManager(), config);
+ getManager(), metricRequestProcessor);
if (enabledHandlers.contains(openTelemetryMetricHandler.type())) {
handlers.add(openTelemetryMetricHandler);
}
@@ -78,6 +84,7 @@
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
+ metricRequestProcessor.start();
for (Handler h : handlers) {
h.active();
}
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricHandler.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricHandler.java
index 304bdb0..1edbe79 100644
--- a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricHandler.java
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricHandler.java
@@ -18,65 +18,26 @@
package org.apache.skywalking.oap.server.receiver.otel.otlp;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableMap;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc;
-import io.opentelemetry.proto.common.v1.KeyValue;
-import io.opentelemetry.proto.metrics.v1.Sum;
-import io.opentelemetry.proto.metrics.v1.SummaryDataPoint.ValueAtQuantile;
-import io.vavr.Function1;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Stream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
-import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
-import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
-import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
-import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
-import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
-import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
-import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
import org.apache.skywalking.oap.server.receiver.otel.Handler;
-import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
-import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
-
@Slf4j
@RequiredArgsConstructor
public class OpenTelemetryMetricHandler
extends MetricsServiceGrpc.MetricsServiceImplBase
implements Handler {
-
private final ModuleManager manager;
- private final OtelMetricReceiverConfig config;
-
- private static final Map<String, String> LABEL_MAPPINGS =
- ImmutableMap
- .<String, String>builder()
- .put("net.host.name", "node_identifier_host_name")
- .put("host.name", "node_identifier_host_name")
- .put("job", "job_name")
- .put("service.name", "job_name")
- .build();
- private List<PrometheusMetricConverter> converters;
+ private final OpenTelemetryMetricRequestProcessor metricRequestProcessor;
@Override
public String type() {
@@ -85,31 +46,9 @@
@Override
public void active() throws ModuleStartException {
- final List<String> enabledRules =
- Splitter.on(",")
- .omitEmptyStrings()
- .splitToList(config.getEnabledOtelRules());
- final List<Rule> rules;
- try {
- rules = Rules.loadRules("otel-rules", enabledRules);
- } catch (IOException e) {
- throw new ModuleStartException("Failed to load otel rules.", e);
- }
-
- if (rules.isEmpty()) {
- return;
- }
-
GRPCHandlerRegister grpcHandlerRegister = manager.find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
- final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
-
- converters = rules
- .stream()
- .map(r -> new PrometheusMetricConverter(r, meterSystem))
- .collect(toList());
-
grpcHandlerRegister.addHandler(this);
}
@@ -117,136 +56,9 @@
public void export(
final ExportMetricsServiceRequest requests,
final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
-
- requests.getResourceMetricsList().forEach(request -> {
- if (log.isDebugEnabled()) {
- log.debug("Resource attributes: {}", request.getResource().getAttributesList());
- }
-
- final Map<String, String> nodeLabels =
- request
- .getResource()
- .getAttributesList()
- .stream()
- .collect(toMap(
- it -> LABEL_MAPPINGS
- .getOrDefault(it.getKey(), it.getKey())
- .replaceAll("\\.", "_"),
- it -> it.getValue().getStringValue(),
- (v1, v2) -> v1));
-
- converters
- .forEach(convert -> convert.toMeter(
- request
- .getScopeMetricsList().stream()
- .flatMap(scopeMetrics -> scopeMetrics
- .getMetricsList().stream()
- .flatMap(metric -> adaptMetrics(nodeLabels, metric))
- .map(Function1.liftTry(Function.identity()))
- .flatMap(tryIt -> MetricConvert.log(tryIt,
- "Convert OTEL metric to prometheus metric")))));
- });
-
+ metricRequestProcessor.processMetricsRequest(requests);
responseObserver.onNext(ExportMetricsServiceResponse.getDefaultInstance());
responseObserver.onCompleted();
}
- private static Map<String, String> buildLabels(List<KeyValue> kvs) {
- return kvs
- .stream()
- .collect(toMap(
- KeyValue::getKey,
- it -> it.getValue().getStringValue()));
- }
-
- private static Map<String, String> mergeLabels(
- final Map<String, String> nodeLabels,
- final Map<String, String> pointLabels) {
-
- // data point labels should have higher precedence and override the one in node labels
-
- final Map<String, String> result = new HashMap<>(nodeLabels);
- result.putAll(pointLabels);
- return result;
- }
-
- private static Map<Double, Long> buildBuckets(
- final List<Long> bucketCounts,
- final List<Double> explicitBounds) {
-
- final Map<Double, Long> result = new HashMap<>();
- for (int i = 0; i < explicitBounds.size(); i++) {
- result.put(explicitBounds.get(i), bucketCounts.get(i));
- }
- result.put(Double.POSITIVE_INFINITY, bucketCounts.get(explicitBounds.size()));
- return result;
- }
-
- // Adapt the OpenTelemetry metrics to SkyWalking metrics
- private Stream<? extends Metric> adaptMetrics(
- final Map<String, String> nodeLabels,
- final io.opentelemetry.proto.metrics.v1.Metric metric) {
- if (metric.hasGauge()) {
- return metric.getGauge().getDataPointsList().stream()
- .map(point -> new Gauge(
- metric.getName(),
- mergeLabels(nodeLabels,
- buildLabels(point.getAttributesList())),
- point.hasAsDouble() ? point.getAsDouble()
- : point.getAsInt(),
- point.getTimeUnixNano() / 1000000));
- }
- if (metric.hasSum()) {
- final Sum sum = metric.getSum();
- if (sum
- .getAggregationTemporality() != AGGREGATION_TEMPORALITY_CUMULATIVE) {
- return Stream.empty();
- }
- if (sum.getIsMonotonic()) {
- return sum.getDataPointsList().stream()
- .map(point -> new Counter(
- metric.getName(),
- mergeLabels(nodeLabels,
- buildLabels(point.getAttributesList())),
- point.hasAsDouble() ? point.getAsDouble()
- : point.getAsInt(),
- point.getTimeUnixNano() / 1000000));
- } else {
- return sum.getDataPointsList().stream()
- .map(point -> new Gauge(
- metric.getName(),
- mergeLabels(nodeLabels,
- buildLabels(point.getAttributesList())),
- point.hasAsDouble() ? point.getAsDouble()
- : point.getAsInt(),
- point.getTimeUnixNano() / 1000000));
- }
- }
- if (metric.hasHistogram()) {
- return metric.getHistogram().getDataPointsList().stream()
- .map(point -> new Histogram(
- metric.getName(),
- mergeLabels(nodeLabels,
- buildLabels(point.getAttributesList())),
- point.getCount(),
- point.getSum(),
- buildBuckets(point.getBucketCountsList(),
- point.getExplicitBoundsList()),
- point.getTimeUnixNano() / 1000000));
- }
- if (metric.hasSummary()) {
- return metric.getSummary().getDataPointsList().stream()
- .map(point -> new Summary(
- metric.getName(),
- mergeLabels(nodeLabels,
- buildLabels(point.getAttributesList())),
- point.getCount(),
- point.getSum(),
- point.getQuantileValuesList().stream().collect(
- toMap(ValueAtQuantile::getQuantile,
- ValueAtQuantile::getValue)),
- point.getTimeUnixNano() / 1000000));
- }
- throw new UnsupportedOperationException("Unsupported type");
- }
}
diff --git a/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java
new file mode 100644
index 0000000..b1a3c9a
--- /dev/null
+++ b/oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.oap.server.receiver.otel.otlp;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
+import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.metrics.v1.Sum;
+import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
+import io.vavr.Function1;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule;
+import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.Service;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Counter;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric;
+import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Summary;
+import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
+
+import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+
+@RequiredArgsConstructor
+@Slf4j
+public class OpenTelemetryMetricRequestProcessor implements Service {
+
+ private final ModuleManager manager;
+
+ private final OtelMetricReceiverConfig config;
+
+ private static final Map<String, String> LABEL_MAPPINGS =
+ ImmutableMap
+ .<String, String>builder()
+ .put("net.host.name", "node_identifier_host_name")
+ .put("host.name", "node_identifier_host_name")
+ .put("job", "job_name")
+ .put("service.name", "job_name")
+ .build();
+ private List<PrometheusMetricConverter> converters;
+
+ public void processMetricsRequest(final ExportMetricsServiceRequest requests) {
+ requests.getResourceMetricsList().forEach(request -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Resource attributes: {}", request.getResource().getAttributesList());
+ }
+
+ final Map<String, String> nodeLabels =
+ request
+ .getResource()
+ .getAttributesList()
+ .stream()
+ .collect(toMap(
+ it -> LABEL_MAPPINGS
+ .getOrDefault(it.getKey(), it.getKey())
+ .replaceAll("\\.", "_"),
+ it -> it.getValue().getStringValue(),
+ (v1, v2) -> v1
+ ));
+
+ converters
+ .forEach(convert -> convert.toMeter(
+ request
+ .getScopeMetricsList().stream()
+ .flatMap(scopeMetrics -> scopeMetrics
+ .getMetricsList().stream()
+ .flatMap(metric -> adaptMetrics(nodeLabels, metric))
+ .map(Function1.liftTry(Function.identity()))
+ .flatMap(tryIt -> MetricConvert.log(
+ tryIt,
+ "Convert OTEL metric to prometheus metric"
+ )))));
+ });
+
+ }
+
+ public void start() throws ModuleStartException {
+ final List<String> enabledRules =
+ Splitter.on(",")
+ .omitEmptyStrings()
+ .splitToList(config.getEnabledOtelRules());
+ final List<Rule> rules;
+ try {
+ rules = Rules.loadRules("otel-rules", enabledRules);
+ } catch (IOException e) {
+ throw new ModuleStartException("Failed to load otel rules.", e);
+ }
+
+ if (rules.isEmpty()) {
+ return;
+ }
+ final MeterSystem meterSystem = manager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
+
+ converters = rules
+ .stream()
+ .map(r -> new PrometheusMetricConverter(r, meterSystem))
+ .collect(toList());
+ }
+
+ private static Map<String, String> buildLabels(List<KeyValue> kvs) {
+ return kvs
+ .stream()
+ .collect(toMap(
+ KeyValue::getKey,
+ it -> it.getValue().getStringValue()
+ ));
+ }
+
+ private static Map<String, String> mergeLabels(
+ final Map<String, String> nodeLabels,
+ final Map<String, String> pointLabels) {
+
+ // data point labels should have higher precedence and override the one in node labels
+
+ final Map<String, String> result = new HashMap<>(nodeLabels);
+ result.putAll(pointLabels);
+ return result;
+ }
+
+ private static Map<Double, Long> buildBuckets(
+ final List<Long> bucketCounts,
+ final List<Double> explicitBounds) {
+
+ final Map<Double, Long> result = new HashMap<>();
+ for (int i = 0; i < explicitBounds.size(); i++) {
+ result.put(explicitBounds.get(i), bucketCounts.get(i));
+ }
+ result.put(Double.POSITIVE_INFINITY, bucketCounts.get(explicitBounds.size()));
+ return result;
+ }
+
+ // Adapt the OpenTelemetry metrics to SkyWalking metrics
+ private Stream<? extends Metric> adaptMetrics(
+ final Map<String, String> nodeLabels,
+ final io.opentelemetry.proto.metrics.v1.Metric metric) {
+ if (metric.hasGauge()) {
+ return metric.getGauge().getDataPointsList().stream()
+ .map(point -> new Gauge(
+ metric.getName(),
+ mergeLabels(
+ nodeLabels,
+ buildLabels(point.getAttributesList())
+ ),
+ point.hasAsDouble() ? point.getAsDouble()
+ : point.getAsInt(),
+ point.getTimeUnixNano() / 1000000
+ ));
+ }
+ if (metric.hasSum()) {
+ final Sum sum = metric.getSum();
+ if (sum
+ .getAggregationTemporality() != AGGREGATION_TEMPORALITY_CUMULATIVE) {
+ return Stream.empty();
+ }
+ if (sum.getIsMonotonic()) {
+ return sum.getDataPointsList().stream()
+ .map(point -> new Counter(
+ metric.getName(),
+ mergeLabels(
+ nodeLabels,
+ buildLabels(point.getAttributesList())
+ ),
+ point.hasAsDouble() ? point.getAsDouble()
+ : point.getAsInt(),
+ point.getTimeUnixNano() / 1000000
+ ));
+ } else {
+ return sum.getDataPointsList().stream()
+ .map(point -> new Gauge(
+ metric.getName(),
+ mergeLabels(
+ nodeLabels,
+ buildLabels(point.getAttributesList())
+ ),
+ point.hasAsDouble() ? point.getAsDouble()
+ : point.getAsInt(),
+ point.getTimeUnixNano() / 1000000
+ ));
+ }
+ }
+ if (metric.hasHistogram()) {
+ return metric.getHistogram().getDataPointsList().stream()
+ .map(point -> new Histogram(
+ metric.getName(),
+ mergeLabels(
+ nodeLabels,
+ buildLabels(point.getAttributesList())
+ ),
+ point.getCount(),
+ point.getSum(),
+ buildBuckets(
+ point.getBucketCountsList(),
+ point.getExplicitBoundsList()
+ ),
+ point.getTimeUnixNano() / 1000000
+ ));
+ }
+ if (metric.hasSummary()) {
+ return metric.getSummary().getDataPointsList().stream()
+ .map(point -> new Summary(
+ metric.getName(),
+ mergeLabels(
+ nodeLabels,
+ buildLabels(point.getAttributesList())
+ ),
+ point.getCount(),
+ point.getSum(),
+ point.getQuantileValuesList().stream().collect(
+ toMap(
+ SummaryDataPoint.ValueAtQuantile::getQuantile,
+ SummaryDataPoint.ValueAtQuantile::getValue
+ )),
+ point.getTimeUnixNano() / 1000000
+ ));
+ }
+ throw new UnsupportedOperationException("Unsupported type");
+ }
+}
diff --git a/oap-server/server-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/pom.xml
index d176294..52f66c6 100644
--- a/oap-server/server-receiver-plugin/pom.xml
+++ b/oap-server/server-receiver-plugin/pom.xml
@@ -47,6 +47,7 @@
<module>skywalking-zabbix-receiver-plugin</module>
<module>skywalking-ebpf-receiver-plugin</module>
<module>skywalking-telegraf-receiver-plugin</module>
+ <module>aws-firehose-receiver</module>
</modules>
<dependencies>
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/UnzippingBytesRequestConverter.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/UnzippingBytesRequestConverter.java
deleted file mode 100644
index 412ccac..0000000
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/UnzippingBytesRequestConverter.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.receiver.zipkin.handler;
-
-import com.linecorp.armeria.common.AggregatedHttpRequest;
-import com.linecorp.armeria.common.HttpData;
-import com.linecorp.armeria.common.HttpHeaderNames;
-import com.linecorp.armeria.common.encoding.StreamDecoderFactory;
-import com.linecorp.armeria.server.ServiceRequestContext;
-
-final class UnzippingBytesRequestConverter {
-
- static HttpData convertRequest(ServiceRequestContext ctx, AggregatedHttpRequest request) {
- String encoding = request.headers().get(HttpHeaderNames.CONTENT_ENCODING);
- HttpData content = request.content();
- if (!content.isEmpty() && encoding != null && encoding.contains("gzip")) {
- content = StreamDecoderFactory.gzip().newDecoder(ctx.alloc()).decode(content);
- if (content.isEmpty()) {
- content.close();
- throw new IllegalArgumentException("Cannot unzip request content bytes");
- }
- }
- return content;
- }
-}
diff --git a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java
index 2acaa9d..4befc24 100644
--- a/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java
+++ b/oap-server/server-receiver-plugin/zipkin-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/zipkin/handler/ZipkinSpanHTTPHandler.java
@@ -22,7 +22,6 @@
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
-import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.annotation.ConsumesJson;
import com.linecorp.armeria.server.annotation.ConsumesProtobuf;
import com.linecorp.armeria.server.annotation.Post;
@@ -38,6 +37,7 @@
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
+
import static java.util.Objects.nonNull;
@Slf4j
@@ -62,45 +62,44 @@
}
@Post("/api/v2/spans")
- public HttpResponse collectV2Spans(ServiceRequestContext ctx, HttpRequest req) {
- return doCollectSpans(SpanBytesDecoder.JSON_V2, ctx, req);
+ public HttpResponse collectV2Spans(HttpRequest req) {
+ return doCollectSpans(SpanBytesDecoder.JSON_V2, req);
}
@Post("/api/v2/spans")
@ConsumesJson
- public HttpResponse collectV2JsonSpans(ServiceRequestContext ctx, HttpRequest req) {
- return doCollectSpans(SpanBytesDecoder.JSON_V2, ctx, req);
+ public HttpResponse collectV2JsonSpans(HttpRequest req) {
+ return doCollectSpans(SpanBytesDecoder.JSON_V2, req);
}
@Post("/api/v2/spans")
@ConsumesProtobuf
- public HttpResponse collectV2ProtobufSpans(ServiceRequestContext ctx, HttpRequest req) {
- return doCollectSpans(SpanBytesDecoder.PROTO3, ctx, req);
+ public HttpResponse collectV2ProtobufSpans(HttpRequest req) {
+ return doCollectSpans(SpanBytesDecoder.PROTO3, req);
}
@Post("/api/v1/spans")
- public HttpResponse collectV1Spans(ServiceRequestContext ctx, HttpRequest req) {
- return doCollectSpans(SpanBytesDecoder.JSON_V1, ctx, req);
+ public HttpResponse collectV1Spans(HttpRequest req) {
+ return doCollectSpans(SpanBytesDecoder.JSON_V1, req);
}
@Post("/api/v1/spans")
@ConsumesJson
- public HttpResponse collectV1JsonSpans(ServiceRequestContext ctx, HttpRequest req) {
- return doCollectSpans(SpanBytesDecoder.JSON_V1, ctx, req);
+ public HttpResponse collectV1JsonSpans(HttpRequest req) {
+ return doCollectSpans(SpanBytesDecoder.JSON_V1, req);
}
@Post("/api/v1/spans")
@ConsumesThrift
- public HttpResponse collectV1ThriftSpans(ServiceRequestContext ctx, HttpRequest req) {
- return doCollectSpans(SpanBytesDecoder.THRIFT, ctx, req);
+ public HttpResponse collectV1ThriftSpans(HttpRequest req) {
+ return doCollectSpans(SpanBytesDecoder.THRIFT, req);
}
HttpResponse doCollectSpans(final SpanBytesDecoder decoder,
- final ServiceRequestContext ctx,
final HttpRequest req) {
final HistogramMetrics.Timer timer = histogram.createTimer();
final HttpResponse response = HttpResponse.from(req.aggregate().thenApply(request -> {
- try (final HttpData httpData = UnzippingBytesRequestConverter.convertRequest(ctx, request)) {
+ try (final HttpData httpData = request.content()) {
final List<Span> spanList = decoder.decodeList(httpData.byteBuf().nioBuffer());
spanForward.send(spanList);
return HttpResponse.of(HttpStatus.OK);
diff --git a/oap-server/server-starter/pom.xml b/oap-server/server-starter/pom.xml
index 1a102af..322c768 100644
--- a/oap-server/server-starter/pom.xml
+++ b/oap-server/server-starter/pom.xml
@@ -171,6 +171,11 @@
<artifactId>skywalking-telegraf-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.skywalking</groupId>
+ <artifactId>aws-firehose-receiver</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- receiver module -->
<!-- fetcher module -->
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index ed97c2f..27df3c3 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -543,3 +543,17 @@
selector: ${SW_RECEIVER_TELEGRAF:default}
default:
activeFiles: ${SW_RECEIVER_TELEGRAF_ACTIVE_FILES:vm}
+
+aws-firehose:
+ selector: ${SW_RECEIVER_AWS_FIREHOSE:default}
+ default:
+ host: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_HOST:0.0.0.0}
+ port: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_PORT:12801}
+ contextPath: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_CONTEXT_PATH:/}
+ maxThreads: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_MAX_THREADS:200}
+ idleTimeOut: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_IDLE_TIME_OUT:30000}
+ acceptQueueSize: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_ACCEPT_QUEUE_SIZE:0}
+ maxRequestHeaderSize: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_MAX_REQUEST_HEADER_SIZE:8192}
+ enableTLS: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_ENABLE_TLS:false}
+ tlsKeyPath: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_TLS_KEY_PATH:}
+ tlsCertChainPath: ${SW_RECEIVER_AWS_FIREHOSE_HTTP_TLS_CERT_CHAIN_PATH:}