blob: 090cc1250081894aee2280d6fa7048c19dd8f056 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.synthetic.delay;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.Duration;
/**
 * Utility functions used in {@link org.apache.beam.sdk.io.synthetic} to hold the calling thread
 * for a requested duration by burning CPU, sleeping, or a mix of both.
 */
public class SyntheticDelay {

  // cpu delay implementation:

  /** Selects the low 16 bits of a hash; the CPU burner only compares these bits. */
  private static final long MASK = (1L << 16) - 1L;

  /** Value whose masked bits the CPU burner searches for. */
  private static final long HASH = 0x243F6A8885A308D3L;

  /** First candidate input tried by every hash search. */
  private static final long INIT_PLAINTEXT = 50000L;

  /**
   * Implements a mechanism to delay a thread in various fashions.
   *
   * <ul>
   *   <li>{@code CPU}: Burn CPU while waiting.
   *   <li>{@code SLEEP}: Sleep uninterruptibly while waiting.
   *   <li>{@code MIXED}: Switch between burning CPU and sleeping every millisecond to emulate a
   *       desired CPU utilization specified by {@code cpuUtilizationInMixedDelay}.
   * </ul>
   *
   * @param delay how long to delay; a zero or negative duration results in no spinning and a
   *     zero-length sleep
   * @param cpuUtilizationInMixedDelay only read in {@code MIXED} mode: for each millisecond the
   *     thread spins when {@code rnd.nextDouble()} is below this value and sleeps otherwise
   * @param delayType which delay strategy to apply
   * @param rnd source of randomness; only used in {@code MIXED} mode
   * @return Millis requested from sleep calls, does not include time spent spinning. Never
   *     negative.
   * @throws IllegalArgumentException if {@code delayType} is not one of the handled types
   */
  public static long delay(
      Duration delay,
      double cpuUtilizationInMixedDelay,
      SyntheticOptions.DelayType delayType,
      Random rnd) {
    switch (delayType) {
      case CPU:
        cpuDelay(delay.getMillis());
        return 0;
      case SLEEP:
        // Report the clamped value that was actually slept, never a negative count.
        return sleepDelay(delay.getMillis());
      case MIXED:
        // Mixed mode: for each millisecond of delay randomly choose to spin or sleep.
        // This is enforced at millisecond granularity since that is the minimum duration that
        // Thread.sleep() can sleep. Millisecond is also the unit of processing delay.
        final long totalMillis = delay.getMillis();
        long sleepMillis = 0;
        for (long i = 0; i < totalMillis; i++) {
          if (rnd.nextDouble() < cpuUtilizationInMixedDelay) {
            cpuDelay(1L);
          } else {
            sleepMillis += sleepDelay(1L);
          }
        }
        return sleepMillis;
      default:
        throw new IllegalArgumentException("Unknown delay type " + delayType);
    }
  }

  /**
   * Sleeps uninterruptibly for {@code delayMillis}, treating negative values as zero.
   *
   * @return the non-negative number of milliseconds that was passed to the sleep call
   */
  private static long sleepDelay(long delayMillis) {
    final long sleepMillis = Math.max(0L, delayMillis);
    Uninterruptibles.sleepUninterruptibly(sleepMillis, TimeUnit.MILLISECONDS);
    return sleepMillis;
  }

  /** Keep cpu busy for {@code delayMillis} by calculating lots of hashes. */
  private static void cpuDelay(long delayMillis) {
    // Note that the delay is enforced in terms of elapsed time as reported by the stopwatch.
    // That implies this thread may not keep CPU busy if it gets preempted by other threads.
    // There is more of chance of this occurring in a streaming pipeline as there could be lots
    // of threads running this. The stopwatch only runs while a hash search is in progress and
    // accumulates across iterations.
    //
    // toMicros saturates instead of overflowing for very large delays.
    final long cpuMicros = TimeUnit.MILLISECONDS.toMicros(delayMillis);
    final long maskedTarget = HASH & MASK;
    final Stopwatch timer = Stopwatch.createUnstarted();
    while (timer.elapsed(TimeUnit.MICROSECONDS) < cpuMicros) {
      // Find a long which hashes to HASH in lowest MASK bits.
      // Values chosen to roughly take 1ms on typical workstation.
      timer.start();
      long p = INIT_PLAINTEXT;
      while ((Hashing.murmur3_128().hashLong(p).asLong() & MASK) != maskedTarget) {
        p++;
      }
      timer.stop();
    }
  }
}