| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package org.apache.skywalking.oap.server.receiver.ebpf.provider.handler; |
| |
| import com.google.common.base.Joiner; |
| import io.grpc.stub.StreamObserver; |
| import io.vavr.Tuple; |
| import io.vavr.Tuple2; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.skywalking.apm.network.common.v3.Commands; |
| import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFOffCPUProfiling; |
| import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFOnCPUProfiling; |
| import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingServiceGrpc; |
| import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingStackMetadata; |
| import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingTaskMetadata; |
| import org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingTaskQuery; |
| import org.apache.skywalking.oap.server.core.CoreModule; |
| import org.apache.skywalking.oap.server.core.command.CommandService; |
| import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingStackType; |
| import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType; |
| import org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus; |
| import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask; |
| import org.apache.skywalking.oap.server.core.query.type.Process; |
| import org.apache.skywalking.oap.server.core.source.EBPFProfilingData; |
| import org.apache.skywalking.oap.server.core.source.EBPFProcessProfilingSchedule; |
| import org.apache.skywalking.oap.server.core.source.SourceReceiver; |
| import org.apache.skywalking.oap.server.core.storage.StorageModule; |
| import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO; |
| import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO; |
| import org.apache.skywalking.oap.server.library.module.ModuleManager; |
| import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler; |
| import org.apache.skywalking.oap.server.library.util.CollectionUtils; |
| import org.apache.skywalking.oap.server.network.trace.component.command.EBPFProfilingTaskCommand; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| |
| /** |
| * Handle the eBPF Profiling data request from the eBPF Agent side. |
| */ |
| @Slf4j |
| public class EBPFProfilingServiceHandler extends EBPFProfilingServiceGrpc.EBPFProfilingServiceImplBase implements GRPCHandler { |
| public static final List<EBPFProfilingStackType> COMMON_STACK_TYPE_ORDER = Arrays.asList( |
| EBPFProfilingStackType.KERNEL_SPACE, EBPFProfilingStackType.USER_SPACE); |
| |
| private IEBPFProfilingTaskDAO taskDAO; |
| private IMetadataQueryDAO metadataQueryDAO; |
| private final SourceReceiver sourceReceiver; |
| private final CommandService commandService; |
| |
| public EBPFProfilingServiceHandler(ModuleManager moduleManager) { |
| this.metadataQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetadataQueryDAO.class); |
| this.taskDAO = moduleManager.find(StorageModule.NAME).provider().getService(IEBPFProfilingTaskDAO.class); |
| this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); |
| this.commandService = moduleManager.find(CoreModule.NAME).provider().getService(CommandService.class); |
| } |
| |
| @Override |
| public void queryTasks(EBPFProfilingTaskQuery request, StreamObserver<Commands> responseObserver) { |
| String agentId = request.getRoverInstanceId(); |
| final long latestUpdateTime = request.getLatestUpdateTime(); |
| try { |
| // find exists process from agent |
| final List<Process> processes = metadataQueryDAO.listProcesses(agentId); |
| if (CollectionUtils.isEmpty(processes)) { |
| responseObserver.onNext(Commands.newBuilder().build()); |
| responseObserver.onCompleted(); |
| return; |
| } |
| |
| // fetch tasks from process id list |
| final List<String> serviceIdList = processes.stream().map(Process::getServiceId).distinct().collect(Collectors.toList()); |
| final List<EBPFProfilingTask> tasks = taskDAO.queryTasksByServices(serviceIdList, 0, latestUpdateTime); |
| |
| final Commands.Builder builder = Commands.newBuilder(); |
| tasks.stream().collect(Collectors.toMap(EBPFProfilingTask::getTaskId, Function.identity(), EBPFProfilingTask::combine)) |
| .values().stream().flatMap(t -> this.buildProfilingCommands(t, processes).stream()) |
| .map(EBPFProfilingTaskCommand::serialize).forEach(builder::addCommands); |
| responseObserver.onNext(builder.build()); |
| responseObserver.onCompleted(); |
| return; |
| } catch (Exception e) { |
| log.warn("query ebpf process profiling task failure", e); |
| } |
| responseObserver.onNext(Commands.newBuilder().build()); |
| responseObserver.onCompleted(); |
| } |
| |
| private List<EBPFProfilingTaskCommand> buildProfilingCommands(EBPFProfilingTask task, List<Process> processes) { |
| if (EBPFProfilingTargetType.NETWORK.equals(task.getTargetType())) { |
| final List<String> processIdList = processes.stream().filter(p -> Objects.equals(p.getInstanceId(), task.getServiceInstanceId())).map(Process::getId).collect(Collectors.toList()); |
| if (CollectionUtils.isEmpty(processIdList)) { |
| return Collections.emptyList(); |
| } |
| return Collections.singletonList(commandService.newEBPFProfilingTaskCommand(task, processIdList)); |
| } |
| final ArrayList<EBPFProfilingTaskCommand> commands = new ArrayList<>(processes.size()); |
| for (Process process : processes) { |
| // The service id must match between process and task and must could profiling |
| if (!Objects.equals(process.getServiceId(), task.getServiceId()) |
| || !ProfilingSupportStatus.SUPPORT_EBPF_PROFILING.name().equals(process.getProfilingSupportStatus())) { |
| continue; |
| } |
| |
| // If the task doesn't require a label or the process match all labels in task |
| if (CollectionUtils.isEmpty(task.getProcessLabels()) |
| || process.getLabels().containsAll(task.getProcessLabels())) { |
| commands.add(commandService.newEBPFProfilingTaskCommand(task, Collections.singletonList(process.getId()))); |
| } |
| } |
| return commands; |
| } |
| |
| @Override |
| public StreamObserver<org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingData> collectProfilingData(StreamObserver<Commands> responseObserver) { |
| return new StreamObserver<org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingData>() { |
| private volatile boolean isFirst = true; |
| private EBPFProfilingTaskMetadata task; |
| private String scheduleId; |
| |
| @Override |
| public void onNext(org.apache.skywalking.apm.network.ebpf.profiling.v3.EBPFProfilingData ebpfProfilingData) { |
| if (isFirst || ebpfProfilingData.hasTask()) { |
| task = ebpfProfilingData.getTask(); |
| |
| // update schedule metadata |
| final EBPFProcessProfilingSchedule schedule = new EBPFProcessProfilingSchedule(); |
| schedule.setProcessId(task.getProcessId()); |
| schedule.setTaskId(task.getTaskId()); |
| schedule.setStartTime(task.getProfilingStartTime()); |
| schedule.setCurrentTime(task.getCurrentTime()); |
| sourceReceiver.receive(schedule); |
| |
| scheduleId = schedule.getEntityId(); |
| } |
| isFirst = false; |
| |
| // add profiling data |
| final EBPFProfilingData data = new EBPFProfilingData(); |
| data.setScheduleId(scheduleId); |
| data.setTaskId(task.getTaskId()); |
| data.setUploadTime(task.getCurrentTime()); |
| switch (ebpfProfilingData.getProfilingCase()) { |
| case ONCPU: |
| try { |
| processOnCPUProfiling(data, ebpfProfilingData.getOnCPU()); |
| } catch (IOException e) { |
| log.warn("process ON_CPU profiling data failure", e); |
| } |
| break; |
| case OFFCPU: |
| try { |
| processOffCPUProfiling(data, ebpfProfilingData.getOffCPU()); |
| } catch (IOException e) { |
| log.warn("process OFF_CPU profiling data failure", e); |
| } |
| break; |
| default: |
| throw new IllegalArgumentException("the profiling data not set"); |
| } |
| |
| sourceReceiver.receive(data); |
| } |
| |
| @Override |
| public void onError(Throwable throwable) { |
| log.error("Error in receiving ebpf profiling data", throwable); |
| } |
| |
| @Override |
| public void onCompleted() { |
| responseObserver.onNext(Commands.newBuilder().build()); |
| responseObserver.onCompleted(); |
| } |
| }; |
| } |
| |
| private void processOnCPUProfiling(EBPFProfilingData data, EBPFOnCPUProfiling onCPU) throws IOException { |
| Tuple2<String, List<EBPFProfilingStackMetadata>> order = orderMetadataAndSetToData(onCPU.getStacksList(), COMMON_STACK_TYPE_ORDER); |
| data.setStackIdList(order._1); |
| data.setTargetType(EBPFProfilingTargetType.ON_CPU); |
| data.setDataBinary(EBPFOnCPUProfiling.newBuilder() |
| .addAllStacks(order._2) |
| .setDumpCount(onCPU.getDumpCount()) |
| .build().toByteArray()); |
| } |
| |
| private void processOffCPUProfiling(EBPFProfilingData data, EBPFOffCPUProfiling offCPUProfiling) throws IOException { |
| Tuple2<String, List<EBPFProfilingStackMetadata>> order = orderMetadataAndSetToData(offCPUProfiling.getStacksList(), COMMON_STACK_TYPE_ORDER); |
| data.setStackIdList(order._1); |
| data.setTargetType(EBPFProfilingTargetType.OFF_CPU); |
| data.setDataBinary(EBPFOffCPUProfiling.newBuilder() |
| .addAllStacks(order._2) |
| .setSwitchCount(offCPUProfiling.getSwitchCount()) |
| .setDuration(offCPUProfiling.getDuration()) |
| .build().toByteArray()); |
| } |
| |
| private Tuple2<String, List<EBPFProfilingStackMetadata>> orderMetadataAndSetToData(List<EBPFProfilingStackMetadata> original, |
| List<EBPFProfilingStackType> order) { |
| final HashMap<EBPFProfilingStackType, EBPFProfilingStackMetadata> tmp = new HashMap<>(); |
| original.forEach(e -> tmp.put(EBPFProfilingStackType.valueOf(e.getStackType()), e)); |
| |
| final List<Integer> stackIdList = new ArrayList<>(); |
| final ArrayList<EBPFProfilingStackMetadata> result = new ArrayList<>(); |
| for (EBPFProfilingStackType orderStack : order) { |
| final EBPFProfilingStackMetadata stack = tmp.get(orderStack); |
| if (stack != null) { |
| result.add(stack); |
| stackIdList.add(stack.getStackId()); |
| } |
| } |
| return Tuple.of(Joiner.on("_").join(stackIdList), result); |
| } |
| } |