blob: 2a371938eb45e43d0528b810c0979be172a8bcfe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.remote;
import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.RemoteHandleWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Server-side implementation of the streaming remote RPC. It is the common service through which OAP servers receive
* messages from each other. The next worker name carried by each message is used to look up the remote handle worker,
* which provides both the stream data class used to deserialize the message and the worker that processes it.
*/
public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteServiceHandler.class);
private final ModuleDefineHolder moduleDefineHolder;
private IWorkerInstanceGetter workerInstanceGetter;
private CounterMetrics remoteInCounter;
private CounterMetrics remoteInErrorCounter;
private CounterMetrics remoteInTargetNotFoundCounter;
private HistogramMetrics remoteInHistogram;
public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
remoteInCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createCounter(
"remote_in_count",
"The number(server side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
remoteInErrorCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createCounter(
"remote_in_error_count",
"The error number(server side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
remoteInTargetNotFoundCounter = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createCounter(
"remote_in_target_not_found_count",
"The error number(server side) of inside remote handler target worker not found. May be caused by unmatched OAL scrips.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
remoteInHistogram = moduleDefineHolder.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createHistogramMetric(
"remote_in_latency",
"The latency(server side) of inside remote inside aggregate rpc.",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
}
/**
* gRPC handler of {@link RemoteServiceGrpc}. Continue the distributed aggregation at the current OAP node.
*/
@Override
public StreamObserver<RemoteMessage> call(StreamObserver<Empty> responseObserver) {
if (Objects.isNull(workerInstanceGetter)) {
synchronized (RemoteServiceHandler.class) {
if (Objects.isNull(workerInstanceGetter)) {
workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME)
.provider()
.getService(IWorkerInstanceGetter.class);
}
}
}
return new StreamObserver<RemoteMessage>() {
@Override
public void onNext(RemoteMessage message) {
remoteInCounter.inc();
HistogramMetrics.Timer timer = remoteInHistogram.createTimer();
try {
String nextWorkerName = message.getNextWorkerName();
RemoteData remoteData = message.getRemoteData();
try {
RemoteHandleWorker handleWorker = workerInstanceGetter.get(nextWorkerName);
if (handleWorker != null) {
AbstractWorker nextWorker = handleWorker.getWorker();
StreamData streamData = handleWorker.getStreamDataClass().newInstance();
streamData.deserialize(remoteData);
nextWorker.in(streamData);
} else {
remoteInTargetNotFoundCounter.inc();
LOGGER.warn(
"Work name [{}] not found. Check OAL script, make sure they are same in the whole cluster.",
nextWorkerName
);
}
} catch (Throwable t) {
remoteInErrorCounter.inc();
LOGGER.error(t.getMessage(), t);
}
} finally {
timer.finish();
}
}
@Override
public void onError(Throwable throwable) {
LOGGER.error(throwable.getMessage(), throwable);
}
@Override
public void onCompleted() {
responseObserver.onNext(Empty.newBuilder().build());
responseObserver.onCompleted();
}
};
}
}