blob: d2b6bda6fd2485d2ceb15b25f8b09d7659494132 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.exporter.provider.grpc;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataLabel;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.IntValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.LabeledValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.LongValueHolder;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.MetricsMetaInfo;
import org.apache.skywalking.oap.server.core.analysis.metrics.WithMetadata;
import org.apache.skywalking.oap.server.core.exporter.ExportData;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.exporter.MetricValuesExportService;
import org.apache.skywalking.oap.server.exporter.grpc.EventType;
import org.apache.skywalking.oap.server.exporter.grpc.ExportMetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.ExportResponse;
import org.apache.skywalking.oap.server.exporter.grpc.KeyValue;
import org.apache.skywalking.oap.server.exporter.grpc.MetricExportServiceGrpc;
import org.apache.skywalking.oap.server.exporter.grpc.MetricValue;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionMetric;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionReq;
import org.apache.skywalking.oap.server.exporter.grpc.SubscriptionsResp;
import org.apache.skywalking.oap.server.exporter.provider.ExporterSetting;
import org.apache.skywalking.oap.server.exporter.provider.MetricFormatter;
import org.apache.skywalking.oap.server.library.client.grpc.GRPCClient;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.GRPCStreamStatus;
/**
 * Exports metrics to a remote gRPC service.
 *
 * <p>{@link #export(ExportEvent)} filters incoming events against the subscription list fetched from the
 * remote side and pushes the accepted ones into an in-memory buffer. {@link #consume(List)} is the buffer
 * consumer callback: it streams the buffered data to the target and waits until the stream is reported done.
 *
 * <p>The stubs, the buffer, the subscription list and the lock are created in {@link #start()}, so
 * {@link #start()} has to be invoked before any other method that touches them.
 */
@Slf4j
public class GRPCMetricsExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<ExportData> {
    /**
     * The period of subscription list fetching is hardcoded as 30s (value is in milliseconds).
     */
    private static final long FETCH_SUBSCRIPTION_PERIOD = 30_000;

    private final ExporterSetting setting;
    /** Asynchronous (streaming) stub used by {@link #consume(List)}; created by {@code newStub}. */
    private MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
    /** Blocking stub used to fetch the subscription list. */
    private MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
    private DataCarrier<ExportData> exportBuffer;
    private ReentrantLock fetchListLock;
    /** Replaced as a whole on every successful fetch; never mutated in place by this class. */
    private volatile List<SubscriptionMetric> subscriptionList;
    private volatile long lastFetchTimestamp = 0;

    public GRPCMetricsExporter(ExporterSetting setting) {
        this.setting = setting;
    }

    /**
     * Connects to the configured gRPC target, creates both stubs and the export buffer, and registers this
     * instance as the buffer consumer.
     */
    @Override
    public void start() {
        GRPCClient client = new GRPCClient(setting.getGRPCTargetHost(), setting.getGRPCTargetPort());
        client.connect();
        ManagedChannel channel = client.getChannel();
        exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
        blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
        exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
        exportBuffer.consume(this, 1, 200);
        subscriptionList = new ArrayList<>();
        fetchListLock = new ReentrantLock();
    }

    /**
     * Buffers the event for export when it passes the subscription filter, then refreshes the subscription
     * list if it is due.
     *
     * <p>With an empty subscription list only {@code INCREMENT} events are buffered. Otherwise the event is
     * buffered once for every subscription whose metric name and event type both match. Events whose metrics
     * do not implement {@link WithMetadata} are ignored entirely.
     *
     * @param event the event carrying the metrics and its type
     */
    @Override
    public void export(ExportEvent event) {
        Metrics metrics = event.getMetrics();
        if (metrics instanceof WithMetadata) {
            MetricsMetaInfo meta = ((WithMetadata) metrics).getMeta();
            // Read the volatile field once so the emptiness check and the iteration below see the same list,
            // even if a concurrent fetch swaps the list in between.
            final List<SubscriptionMetric> subscriptions = subscriptionList;
            if (subscriptions.isEmpty() && ExportEvent.EventType.INCREMENT.equals(event.getType())) {
                exportBuffer.produce(new ExportData(meta, metrics, event.getType()));
            } else {
                subscriptions.forEach(subscriptionMetric -> {
                    if (subscriptionMetric.getMetricName().equals(meta.getMetricsName()) &&
                        eventTypeMatch(event.getType(), subscriptionMetric.getEventType())) {
                        exportBuffer.produce(new ExportData(meta, metrics, event.getType()));
                    }
                });
            }

            fetchSubscriptionList();
        }
    }

    @Override
    public boolean isEnabled() {
        return setting.isEnableGRPCMetrics();
    }

    /**
     * Read the subscription list.
     *
     * <p>Does nothing unless more than {@link #FETCH_SUBSCRIPTION_PERIOD} milliseconds have passed since the
     * last attempt. The timestamp is updated before the remote call, so a failed call is not retried until
     * the next period. Failures are logged and the previous list is kept.
     */
    public void fetchSubscriptionList() {
        final long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - lastFetchTimestamp > FETCH_SUBSCRIPTION_PERIOD) {
            fetchListLock.lock();
            try {
                // Re-check under the lock: another thread may have fetched while this one was waiting.
                if (currentTimeMillis - lastFetchTimestamp > FETCH_SUBSCRIPTION_PERIOD) {
                    lastFetchTimestamp = currentTimeMillis;
                    SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
                                                                 .subscription(SubscriptionReq.newBuilder().build());
                    subscriptionList = subscription.getMetricsList();
                    log.debug("Get exporter subscription list, {}", subscriptionList);
                }
            } catch (Throwable e) {
                log.error("Getting exporter subscription list fails.", e);
            } finally {
                fetchListLock.unlock();
            }
        }
    }

    /**
     * Streams the given buffered data to the gRPC target and waits until the stream is reported done, then
     * refreshes the subscription list if it is due.
     *
     * <p>Rows that cannot be converted (unsupported metrics type or no entity name) are skipped.
     *
     * @param data the batch handed over by the export buffer; a null or empty batch sends nothing
     */
    @Override
    public void consume(List<ExportData> data) {
        if (CollectionUtils.isNotEmpty(data)) {
            GRPCStreamStatus status = new GRPCStreamStatus();
            StreamObserver<ExportMetricValue> streamObserver =
                exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
                                       .export(new StreamObserver<ExportResponse>() {
                                           @Override
                                           public void onNext(ExportResponse response) {
                                               // The response carries nothing this exporter needs.
                                           }

                                           @Override
                                           public void onError(Throwable throwable) {
                                               log.error("Export metrics to {}:{} fails.",
                                                         setting.getGRPCTargetHost(),
                                                         setting.getGRPCTargetPort(), throwable
                                               );
                                               status.done();
                                           }

                                           @Override
                                           public void onCompleted() {
                                               status.done();
                                           }
                                       });

            AtomicInteger exportNum = new AtomicInteger();
            data.forEach(row -> {
                ExportMetricValue exportValue = toExportMetricValue(row);
                if (exportValue != null) {
                    streamObserver.onNext(exportValue);
                    exportNum.getAndIncrement();
                }
            });
            streamObserver.onCompleted();

            awaitPeerConfirmation(status, exportNum.get());
        }

        fetchSubscriptionList();
    }

    @Override
    public void onError(List<ExportData> data, Throwable t) {
        log.error(t.getMessage(), t);
    }

    /**
     * Converts one buffered row into the gRPC message.
     *
     * @return the message, or {@code null} when the metrics type is not a long, int or labeled value holder,
     * or when no entity name can be resolved for the row
     */
    private ExportMetricValue toExportMetricValue(ExportData row) {
        ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
        Metrics metrics = row.getMetrics();
        if (metrics instanceof LongValueHolder) {
            long value = ((LongValueHolder) metrics).getValue();
            builder.addMetricValues(MetricValue.newBuilder().setLongValue(value));
        } else if (metrics instanceof IntValueHolder) {
            long value = ((IntValueHolder) metrics).getValue();
            builder.addMetricValues(MetricValue.newBuilder().setLongValue(value));
        } else if (metrics instanceof LabeledValueHolder) {
            DataTable values = ((LabeledValueHolder) metrics).getValue();
            // One metric value per table key; the key is expanded into label name/value pairs.
            values.keys().forEach(key -> {
                MetricValue.Builder valueBuilder = MetricValue.newBuilder();
                valueBuilder.setLongValue(values.get(key));
                DataLabel labels = new DataLabel();
                labels.put(key);
                labels.forEach((labelName, labelValue) -> {
                    KeyValue.Builder kvBuilder = KeyValue.newBuilder();
                    kvBuilder.setKey(labelName);
                    kvBuilder.setValue(labelValue);
                    valueBuilder.addLabels(kvBuilder);
                });
                builder.addMetricValues(valueBuilder);
            });
        } else {
            return null;
        }

        MetricsMetaInfo meta = row.getMeta();
        builder.setMetricName(meta.getMetricsName());
        builder.setEventType(
            ExportEvent.EventType.INCREMENT.equals(row.getEventType()) ? EventType.INCREMENT : EventType.TOTAL);
        String entityName = getEntityName(meta);
        if (entityName == null) {
            return null;
        }
        builder.setEntityName(entityName);
        builder.setEntityId(meta.getId());
        builder.setTimeBucket(metrics.getTimeBucket());
        return builder.build();
    }

    /**
     * Blocks until {@code status} is done, polling every 100ms and, once more than 2000ms have been spent,
     * every 2000ms with a warning per poll.
     *
     * <p>An interrupt does not abort the wait, but the interrupt status is restored before returning so the
     * caller can still observe it.
     */
    private void awaitPeerConfirmation(GRPCStreamStatus status, int exportNum) {
        long sleepTime = 0;
        long cycle = 100L;
        boolean interrupted = false;
        try {
            //For memory safe of oap, we must wait for the peer confirmation.
            while (!status.isDone()) {
                try {
                    sleepTime += cycle;
                    Thread.sleep(cycle);
                } catch (InterruptedException e) {
                    // Throwing InterruptedException cleared the flag; remember it and keep waiting.
                    interrupted = true;
                }

                if (sleepTime > 2000L) {
                    log.warn(
                        "Export {} metrics to {}:{}, wait {} milliseconds.", exportNum,
                        setting.getGRPCTargetHost(),
                        setting.getGRPCTargetPort(), sleepTime
                    );
                    cycle = 2000L;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        log.debug(
            "Exported {} metrics to {}:{} in {} milliseconds.", exportNum, setting.getGRPCTargetHost(),
            setting.getGRPCTargetPort(), sleepTime
        );
    }

    /**
     * @return true when the event type and the subscribed type are both INCREMENT or both TOTAL
     */
    private boolean eventTypeMatch(ExportEvent.EventType eventType, EventType subscriptionType) {
        return (ExportEvent.EventType.INCREMENT.equals(eventType) && EventType.INCREMENT.equals(subscriptionType))
            || (ExportEvent.EventType.TOTAL.equals(eventType) && EventType.TOTAL.equals(subscriptionType));
    }
}