| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package org.apache.skywalking.oap.server.core.storage; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.skywalking.oap.server.core.CoreModuleConfig; |
| import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; |
| import org.apache.skywalking.oap.server.core.analysis.worker.PersistenceWorker; |
| import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor; |
| import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; |
| import org.apache.skywalking.oap.server.library.module.ModuleManager; |
| import org.apache.skywalking.oap.server.library.util.CollectionUtils; |
| import org.apache.skywalking.oap.server.library.util.RunnableWithExceptionProtection; |
| import org.apache.skywalking.oap.server.telemetry.TelemetryModule; |
| import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; |
| import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; |
| import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; |
| import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; |
| |
| @Slf4j |
| public enum PersistenceTimer { |
| INSTANCE; |
| @VisibleForTesting |
| boolean isStarted = false; |
| private CounterMetrics errorCounter; |
| private HistogramMetrics prepareLatency; |
| private HistogramMetrics executeLatency; |
| private HistogramMetrics allLatency; |
| private ExecutorService prepareExecutorService; |
| |
| PersistenceTimer() { |
| } |
| |
| public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) { |
| log.info("persistence timer start"); |
| IBatchDAO batchDAO = |
| moduleManager.find(StorageModule.NAME).provider().getService(IBatchDAO.class); |
| |
| MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) |
| .provider() |
| .getService(MetricsCreator.class); |
| errorCounter = metricsCreator.createCounter( |
| "persistence_timer_bulk_error_count", |
| "Error execution of the prepare stage in persistence timer", |
| MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE |
| ); |
| prepareLatency = metricsCreator.createHistogramMetric( |
| "persistence_timer_bulk_prepare_latency", |
| "Latency of the prepare stage in persistence timer", |
| MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE |
| ); |
| executeLatency = metricsCreator.createHistogramMetric( |
| "persistence_timer_bulk_execute_latency", |
| "Latency of the execute stage in persistence timer", |
| MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE |
| ); |
| allLatency = metricsCreator.createHistogramMetric( |
| "persistence_timer_bulk_all_latency", "Latency of the all stage in persistence timer", |
| MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE |
| ); |
| |
| prepareExecutorService = Executors.newFixedThreadPool(moduleConfig.getPrepareThreads()); |
| if (!isStarted) { |
| Executors.newSingleThreadScheduledExecutor() |
| .scheduleWithFixedDelay( |
| new RunnableWithExceptionProtection( |
| () -> extractDataAndSave(batchDAO).join(), |
| t -> log.error("Extract data and save failure.", t) |
| ), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS |
| ); |
| |
| this.isStarted = true; |
| } |
| } |
| |
| private CompletableFuture<Void> extractDataAndSave(IBatchDAO batchDAO) { |
| if (log.isDebugEnabled()) { |
| log.debug("Extract data and save"); |
| } |
| |
| long startTime = System.currentTimeMillis(); |
| |
| HistogramMetrics.Timer allTimer = allLatency.createTimer(); |
| List<PersistenceWorker<? extends StorageData>> workers = new ArrayList<>(); |
| workers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers()); |
| workers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers()); |
| |
| final CompletableFuture<Void> future = |
| CompletableFuture.allOf(workers.stream().map(worker -> { |
| return CompletableFuture.runAsync(() -> { |
| List<PrepareRequest> innerPrepareRequests; |
| // Prepare stage |
| try (HistogramMetrics.Timer ignored = prepareLatency.createTimer()) { |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "extract {} worker data and save", |
| worker.getClass().getName() |
| ); |
| } |
| |
| innerPrepareRequests = worker.buildBatchRequests(); |
| |
| worker.endOfRound(); |
| } |
| |
| if (CollectionUtils.isEmpty(innerPrepareRequests)) { |
| return; |
| } |
| |
| // Execution stage |
| HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer(); |
| batchDAO.flush(innerPrepareRequests) |
| .whenComplete(($1, $2) -> executeLatencyTimer.close()); |
| }, prepareExecutorService); |
| }).toArray(CompletableFuture[]::new)); |
| |
| future.whenComplete((unused, throwable) -> { |
| batchDAO.endOfFlush(); |
| allTimer.close(); |
| if (log.isDebugEnabled()) { |
| log.debug( |
| "Batch persistence duration: {} ms", |
| System.currentTimeMillis() - startTime |
| ); |
| } |
| if (throwable != null) { |
| errorCounter.inc(); |
| log.error(throwable.getMessage(), throwable); |
| } |
| }); |
| return future; |
| } |
| } |