| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.hadoop.ozone.freon; |
| |
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.security.UserGroupInformation;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Slf4jReporter;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.util.GlobalTracer;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Option;
import picocli.CommandLine.ParentCommand;

import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
| |
| /** |
| * Base class for simplified performance tests. |
| */ |
| public class BaseFreonGenerator { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(BaseFreonGenerator.class); |
| |
| private static final int CHECK_INTERVAL_MILLIS = 1000; |
| |
| private static final String DIGEST_ALGORITHM = "MD5"; |
| |
| private static final Pattern ENV_VARIABLE_IN_PATTERN = |
| Pattern.compile("__(.+?)__"); |
| |
| @ParentCommand |
| private Freon freonCommand; |
| |
| @Option(names = {"-n", "--number-of-tests"}, |
| description = "Number of the generated objects.", |
| defaultValue = "1000") |
| private long testNo = 1000; |
| |
| @Option(names = {"-t", "--threads", "--thread"}, |
| description = "Number of threads used to execute", |
| defaultValue = "10") |
| private int threadNo; |
| |
| @Option(names = {"-f", "--fail-at-end"}, |
| description = "If turned on, all the tasks will be executed even if " |
| + "there are failures.") |
| private boolean failAtEnd; |
| |
| @Option(names = {"-p", "--prefix"}, |
| description = "Unique identifier of the test execution. Usually used as" |
| + " a prefix of the generated object names. If empty, a random name" |
| + " will be generated", |
| defaultValue = "") |
| private String prefix = ""; |
| |
| private MetricRegistry metrics = new MetricRegistry(); |
| |
| private AtomicLong successCounter; |
| private AtomicLong failureCounter; |
| private AtomicLong attemptCounter; |
| |
| private long startTime; |
| |
| private PathSchema pathSchema; |
| private String spanName; |
| private ExecutorService executor; |
| private ProgressBar progressBar; |
| |
| /** |
| * The main logic to execute a test generator. |
| * |
| * @param provider creates the new steps to execute. |
| */ |
| public void runTests(TaskProvider provider) { |
| setup(provider); |
| startTaskRunners(provider); |
| waitForCompletion(); |
| shutdown(); |
| reportAnyFailure(); |
| } |
| |
| /** |
| * Performs {@code provider}-specific initialization. |
| */ |
| private void setup(TaskProvider provider) { |
| //provider is usually a lambda, print out only the owner class name: |
| spanName = provider.getClass().getSimpleName().split("\\$")[0]; |
| } |
| |
| /** |
| * Launches {@code threadNo} task runners in executor. Each one executes test |
| * tasks in a loop until completion or failure. |
| */ |
| private void startTaskRunners(TaskProvider provider) { |
| for (int i = 0; i < threadNo; i++) { |
| executor.execute(() -> taskLoop(provider)); |
| } |
| } |
| |
| /** |
| * Runs test tasks in a loop until completion or failure. This is executed |
| * concurrently in {@code executor}. |
| */ |
| private void taskLoop(TaskProvider provider) { |
| while (true) { |
| long counter = attemptCounter.getAndIncrement(); |
| |
| //in case of an other failed test, we shouldn't execute more tasks. |
| if (counter >= testNo || (!failAtEnd && failureCounter.get() > 0)) { |
| break; |
| } |
| |
| tryNextTask(provider, counter); |
| } |
| |
| taskLoopCompleted(); |
| } |
| |
| /** |
| * Provides a way to clean up per-thread resources. |
| */ |
| protected void taskLoopCompleted() { |
| // no-op |
| } |
| |
| /** |
| * Runs a single test task (eg. key creation). |
| * @param taskId unique ID of the task |
| */ |
| private void tryNextTask(TaskProvider provider, long taskId) { |
| Span span = GlobalTracer.get().buildSpan(spanName).start(); |
| try (Scope scope = GlobalTracer.get().activateSpan(span)) { |
| provider.executeNextTask(taskId); |
| successCounter.incrementAndGet(); |
| } catch (Exception e) { |
| span.setTag("failure", true); |
| failureCounter.incrementAndGet(); |
| LOG.error("Error on executing task {}", taskId, e); |
| } finally { |
| span.finish(); |
| } |
| } |
| |
| /** |
| * Waits until the requested number of tests are executed, or until any |
| * failure in early failure mode (the default). This is run in the main |
| * thread. |
| */ |
| private void waitForCompletion() { |
| while (successCounter.get() + failureCounter.get() < testNo && ( |
| failureCounter.get() == 0 || failAtEnd)) { |
| try { |
| Thread.sleep(CHECK_INTERVAL_MILLIS); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| private void shutdown() { |
| if (failureCounter.get() > 0 && !failAtEnd) { |
| progressBar.terminate(); |
| } else { |
| progressBar.shutdown(); |
| } |
| executor.shutdown(); |
| try { |
| executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| } |
| } |
| |
| /** |
| * @throws RuntimeException if any tests failed |
| */ |
| private void reportAnyFailure() { |
| if (failureCounter.get() > 0) { |
| throw new RuntimeException("One ore more freon test is failed."); |
| } |
| } |
| |
| /** |
| * Initialize internal counters, and variables. Call it before runTests. |
| */ |
| public void init() { |
| |
| freonCommand.startHttpServer(); |
| |
| successCounter = new AtomicLong(0); |
| failureCounter = new AtomicLong(0); |
| attemptCounter = new AtomicLong(0); |
| |
| if (prefix.length() == 0) { |
| prefix = RandomStringUtils.randomAlphanumeric(10).toLowerCase(); |
| } else { |
| //replace environment variables to support multi-node execution |
| prefix = resolvePrefix(prefix); |
| } |
| LOG.info("Executing test with prefix {}", prefix); |
| |
| pathSchema = new PathSchema(prefix); |
| |
| Runtime.getRuntime().addShutdownHook( |
| new Thread(() -> { |
| try { |
| freonCommand.stopHttpServer(); |
| } catch (Exception ex) { |
| LOG.error("HTTP server can't be stopped.", ex); |
| } |
| printReport(); |
| })); |
| |
| executor = Executors.newFixedThreadPool(threadNo); |
| |
| progressBar = new ProgressBar(System.out, testNo, successCounter::get, |
| freonCommand.isInteractive()); |
| progressBar.start(); |
| |
| startTime = System.currentTimeMillis(); |
| } |
| |
| /** |
| * Resolve environment variables in the prefixes. |
| */ |
| public String resolvePrefix(String inputPrefix) { |
| Matcher m = ENV_VARIABLE_IN_PATTERN.matcher(inputPrefix); |
| StringBuffer sb = new StringBuffer(); |
| while (m.find()) { |
| String environment = System.getenv(m.group(1)); |
| m.appendReplacement(sb, environment != null ? environment : ""); |
| } |
| m.appendTail(sb); |
| return sb.toString(); |
| } |
| |
| /** |
| * Print out reports from the executed tests. |
| */ |
| public void printReport() { |
| ScheduledReporter reporter = freonCommand.isInteractive() |
| ? ConsoleReporter.forRegistry(metrics).build() |
| : Slf4jReporter.forRegistry(metrics).build(); |
| reporter.report(); |
| |
| List<String> messages = new LinkedList<>(); |
| messages.add("Total execution time (sec): " + |
| Math.round((System.currentTimeMillis() - startTime) / 1000.0)); |
| messages.add("Failures: " + failureCounter.get()); |
| messages.add("Successful executions: " + successCounter.get()); |
| |
| Consumer<String> print = freonCommand.isInteractive() |
| ? System.out::println |
| : LOG::info; |
| messages.forEach(print); |
| } |
| |
| /** |
| * Print out reports with the given message. |
| */ |
| public void print(String msg){ |
| Consumer<String> print = freonCommand.isInteractive() |
| ? System.out::println |
| : LOG::info; |
| print.accept(msg); |
| } |
| |
| /** |
| * Create the OM RPC client to use it for testing. |
| */ |
| public OzoneManagerProtocolClientSideTranslatorPB createOmClient( |
| OzoneConfiguration conf, String omServiceID) throws IOException { |
| UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); |
| RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class, |
| ProtobufRpcEngine.class); |
| String clientId = ClientId.randomId().toString(); |
| |
| if (omServiceID == null) { |
| |
| //if only one serviceId is configured, use that |
| final String[] configuredServiceIds = |
| conf.getTrimmedStrings(OZONE_OM_SERVICE_IDS_KEY); |
| if (configuredServiceIds.length == 1) { |
| omServiceID = configuredServiceIds[0]; |
| } |
| } |
| |
| OmTransport transport = OmTransportFactory.create(conf, ugi, omServiceID); |
| return new OzoneManagerProtocolClientSideTranslatorPB(transport, clientId); |
| } |
| |
| public StorageContainerLocationProtocol createStorageContainerLocationClient( |
| OzoneConfiguration ozoneConf) { |
| SCMContainerLocationFailoverProxyProvider proxyProvider = |
| new SCMContainerLocationFailoverProxyProvider(ozoneConf); |
| StorageContainerLocationProtocol client = |
| TracingUtil.createProxy( |
| new StorageContainerLocationProtocolClientSideTranslatorPB( |
| proxyProvider), |
| StorageContainerLocationProtocol.class, ozoneConf); |
| return client; |
| } |
| |
| public static Pipeline findPipelineForTest(String pipelineId, |
| StorageContainerLocationProtocol client, Logger log) throws IOException { |
| List<Pipeline> pipelines = client.listPipelines(); |
| Pipeline pipeline; |
| if (pipelineId != null && pipelineId.length() > 0) { |
| pipeline = pipelines.stream() |
| .filter(p -> p.getId().toString().equals(pipelineId)) |
| .findFirst() |
| .orElseThrow(() -> new IllegalArgumentException( |
| "Pipeline ID is defined, but there is no such pipeline: " |
| + pipelineId)); |
| } else { |
| pipeline = pipelines.stream() |
| .filter(p -> p.getFactor() == HddsProtos.ReplicationFactor.THREE) |
| .findFirst() |
| .orElseThrow(() -> new IllegalArgumentException( |
| "Pipeline ID is NOT defined, and no pipeline " + |
| "has been found with factor=THREE")); |
| log.info("Using pipeline {}", pipeline.getId()); |
| } |
| return pipeline; |
| } |
| |
| /** |
| * Generate a key/file name based on the prefix and counter. |
| */ |
| public String generateObjectName(long counter) { |
| return pathSchema.getPath(counter); |
| } |
| |
| /** |
| * Create missing target volume/bucket. |
| */ |
| public void ensureVolumeAndBucketExist(OzoneClient rpcClient, |
| String volumeName, String bucketName) throws IOException { |
| |
| OzoneVolume volume; |
| ensureVolumeExists(rpcClient, volumeName); |
| volume = rpcClient.getObjectStore().getVolume(volumeName); |
| |
| try { |
| volume.getBucket(bucketName); |
| } catch (OMException ex) { |
| if (ex.getResult() == ResultCodes.BUCKET_NOT_FOUND) { |
| volume.createBucket(bucketName); |
| } else { |
| throw ex; |
| } |
| } |
| |
| } |
| |
| /** |
| * Create missing target volume. |
| */ |
| public void ensureVolumeExists( |
| OzoneClient rpcClient, |
| String volumeName) throws IOException { |
| try { |
| rpcClient.getObjectStore().getVolume(volumeName); |
| } catch (OMException ex) { |
| if (ex.getResult() == ResultCodes.VOLUME_NOT_FOUND) { |
| rpcClient.getObjectStore().createVolume(volumeName); |
| } else { |
| throw ex; |
| } |
| } |
| } |
| |
| /** |
| * Calculate checksum of a byte array. |
| */ |
| public static byte[] getDigest(byte[] content) { |
| DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM); |
| dig.getMessageDigest().reset(); |
| return dig.digest(content); |
| } |
| |
| /** |
| * Calculate checksum of an Input stream. |
| */ |
| public static byte[] getDigest(InputStream stream) throws IOException { |
| DigestUtils dig = new DigestUtils(DIGEST_ALGORITHM); |
| dig.getMessageDigest().reset(); |
| return dig.digest(stream); |
| } |
| |
| public String getPrefix() { |
| return prefix; |
| } |
| |
| public MetricRegistry getMetrics() { |
| return metrics; |
| } |
| |
| public OzoneConfiguration createOzoneConfiguration() { |
| return freonCommand.createOzoneConfiguration(); |
| } |
| |
| /** |
| * Simple contract to execute a new step during a freon test. |
| */ |
| @FunctionalInterface |
| public interface TaskProvider { |
| void executeNextTask(long step) throws Exception; |
| } |
| |
| public AtomicLong getAttemptCounter() { |
| return attemptCounter; |
| } |
| |
| public int getThreadNo() { |
| return threadNo; |
| } |
| |
| public void setThreadNo(int threadNo) { |
| this.threadNo = threadNo; |
| } |
| |
| protected OzoneClient createOzoneClient(String omServiceID, |
| OzoneConfiguration conf) throws Exception { |
| if (omServiceID != null) { |
| return OzoneClientFactory.getRpcClient(omServiceID, conf); |
| } else { |
| return OzoneClientFactory.getRpcClient(conf); |
| } |
| } |
| |
| public void setTestNo(long testNo) { |
| this.testNo = testNo; |
| } |
| |
| public long getTestNo() { |
| return testNo; |
| } |
| } |