blob: accb888506abceb6c9fe660cfb399447774c42bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.raft.jraft.util;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import com.codahale.metrics.MetricRegistry;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.util.concurrent.MpscSingleThreadExecutor;
import org.jetbrains.annotations.Nullable;
import static java.lang.Runtime.getRuntime;
/**
* Helper methods for jraft.
*/
public final class Utils {
/** Logger shared by all helpers of this class. */
private static final IgniteLogger LOG = IgniteLogger.forClass(Utils.class);
/**
 * The configured number of available processors. The default is {@link Runtime#availableProcessors()}. This can be
 * overridden by setting the system property "jraft.available_processors".
 */
private static final int CPUS = SystemPropertyUtil.getInt(
"jraft.available_processors", getRuntime().availableProcessors());
/**
 * Default jraft closure executor pool minimum size; falls back to {@link #cpus()} when
 * "jraft.closure.threadpool.size.min" is not set.
 */
public static final int MIN_CLOSURE_EXECUTOR_POOL_SIZE = SystemPropertyUtil.getInt(
"jraft.closure.threadpool.size.min", cpus());
/**
 * Default jraft closure executor pool maximum size; falls back to {@code max(100, cpus() * 5)} when
 * "jraft.closure.threadpool.size.max" is not set.
 */
public static final int MAX_CLOSURE_EXECUTOR_POOL_SIZE = SystemPropertyUtil.getInt(
"jraft.closure.threadpool.size.max", Math.max(100, cpus() * 5));
/**
 * Default jraft append-entries executor(send) pool size; falls back to the larger of 16 and
 * {@code Ints.findNextPositivePowerOfTwo(cpus() * 2)} when "jraft.append.entries.threads.send" is not set.
 */
public static final int APPEND_ENTRIES_THREADS_POOL_SIZE = SystemPropertyUtil.getInt(
"jraft.append.entries.threads.send", Math.max(16, Ints.findNextPositivePowerOfTwo(cpus() * 2)));
/**
 * Default jraft max pending tasks of append-entries per thread, 32768 by default; can be overridden with
 * "jraft.max.append.entries.tasks.per.thread".
 */
public static final int MAX_APPEND_ENTRIES_TASKS_PER_THREAD = SystemPropertyUtil.getInt(
"jraft.max.append.entries.tasks.per.thread", 32768);
/**
 * Whether use {@link MpscSingleThreadExecutor}, true by default; can be overridden with
 * "jraft.use.mpsc.single.thread.executor".
 */
public static final boolean USE_MPSC_SINGLE_THREAD_EXECUTOR = SystemPropertyUtil.getBoolean(
"jraft.use.mpsc.single.thread.executor", true);
/** Accepted group id shape: one leading letter or digit, then any number of letters, digits, '-' or '_'. */
private static final Pattern GROUP_ID_PATTERN = Pattern.compile("^[0-9a-zA-Z][a-zA-Z0-9\\-_]*$");

/**
 * Validates a raft group id.
 *
 * @param groupId The group id to check.
 * @throws IllegalArgumentException If the id is blank or does not match the accepted pattern.
 */
public static void verifyGroupId(final String groupId) {
    if (StringUtils.isBlank(groupId)) {
        throw new IllegalArgumentException("Blank groupId");
    }

    final boolean wellFormed = GROUP_ID_PATTERN.matcher(groupId).matches();

    if (wellFormed) {
        return;
    }

    throw new IllegalArgumentException(
        "Invalid group id, it should be started with number or character 'a'-'z' or 'A'-'Z',"
            + "and followed with numbers, english alphabet, '-' or '_'. ");
}
/**
 * Wraps the given thread pool in a {@code ThreadPoolMetricSet} and registers it in the metric registry.
 *
 * @param name The name under which the metric set is registered.
 * @param reg The registry.
 * @param executor The executor whose metrics are exposed.
 */
public static void registerClosureExecutorMetrics(String name, final MetricRegistry reg,
    ThreadPoolExecutor executor) {
    final ThreadPoolMetricSet poolMetrics = new ThreadPoolMetricSet(executor);

    reg.register(name, poolMetrics);
}
/**
 * Submits the closure to the thread pool, invoking it with an OK status.
 *
 * @param executor The pool to run the closure in.
 * @param done The closure; may be {@code null}.
 * @return The future of the submitted task, or {@code null} when {@code done} is {@code null}.
 */
public static Future<?> runClosureInThread(ExecutorService executor, final Closure done) {
    return done == null ? null : runClosureInThread(executor, done, Status.OK());
}
/**
 * Submits a task to the thread pool. TODO asch confusion runInThread runInExecutor IGNITE-14382
 *
 * @param executor The pool to submit to.
 * @param runnable The task.
 * @return The future returned by {@link ExecutorService#submit(Runnable)}.
 */
public static Future<?> runInThread(final ExecutorService executor, final Runnable runnable) {
    final Future<?> pending = executor.submit(runnable);

    return pending;
}
/**
 * Hands a task to the given executor via {@link Executor#execute(Runnable)}. Unlike the
 * {@link ExecutorService} overloads, this variant returns nothing.
 *
 * @param executor The executor to run the task with.
 * @param runnable The task.
 */
public static void runInThread(final Executor executor, final Runnable runnable) {
executor.execute(runnable);
}
/**
 * Submits a callable to the thread pool.
 *
 * @param executor The pool to submit to.
 * @param runnable The callable to execute.
 * @param <V> Result type of the callable.
 * @return The future returned by {@link ExecutorService#submit(Callable)}.
 */
public static <V> Future<V> runInThread(final ExecutorService executor, final Callable<V> runnable) {
    final Future<V> pending = executor.submit(runnable);

    return pending;
}
/**
 * Submits the closure to the thread pool, invoking it with the given status. Anything thrown by the closure is
 * logged and not propagated.
 *
 * @param executor The pool to run the closure in.
 * @param done The closure; may be {@code null}.
 * @param status The status passed to the closure.
 * @return The future of the submitted task, or {@code null} when {@code done} is {@code null}.
 */
public static Future<?> runClosureInThread(final ExecutorService executor, final Closure done,
    final Status status) {
    if (done == null) {
        return null;
    }

    // Explicitly typed as Runnable so that the Runnable overload of runInThread is selected.
    final Runnable task = () -> {
        try {
            done.run(status);
        }
        catch (final Throwable t) {
            LOG.error("Fail to run done closure", t);
        }
    };

    return runInThread(executor, task);
}
/**
 * Executes the closure with the given status on the specified executor. Anything thrown by the closure is logged
 * and not propagated. Does nothing when {@code done} is {@code null}.
 *
 * @param executor The executor; asserted to be non-null.
 * @param done The closure; may be {@code null}.
 * @param status The status passed to the closure.
 */
public static void runClosureInExecutor(final Executor executor, final Closure done, final Status status) {
    assert executor != null;

    if (done != null) {
        final Runnable task = () -> {
            try {
                done.run(status);
            }
            catch (final Throwable t) {
                LOG.error("Fail to run done closure.", t);
            }
        };

        executor.execute(task);
    }
}
/**
 * Executes the closure with an OK status on the specified executor.
 *
 * @param executor The executor.
 * @param done The closure; may be {@code null}.
 */
public static void runClosureInExecutor(final Executor executor, final Closure done) {
    final Status ok = Status.OK();

    runClosureInExecutor(executor, done, ok);
}
/**
 * Closes the given resource, logging instead of propagating an {@link IOException}.
 *
 * @param closeable The resource to close; may be {@code null}.
 * @return {@code 0} when the resource is {@code null} or closed normally, otherwise the number of
 *      {@code RaftError.EIO}.
 */
public static int closeQuietly(final Closeable closeable) {
    int code = 0;

    if (closeable != null) {
        try {
            closeable.close();
        }
        catch (final IOException e) {
            LOG.error("Fail to close", e);

            code = RaftError.EIO.getNumber();
        }
    }

    return code;
}
/**
 * Get system CPUs count.
 *
 * @return The value of the {@code CPUS} constant, i.e. the configured number of available processors.
 */
public static int cpus() {
return CPUS;
}
/**
 * Extracts the id of the current java process from the runtime MX bean name.
 *
 * @param fallback The value to return when the id cannot be determined.
 * @return The numeric prefix of the bean name, or {@code fallback} if there is none.
 */
public static long getProcessId(final long fallback) {
    // The bean name looks like '<pid>@<hostname>', at least in SUN / Oracle JVMs. Other JVM implementations
    // may use a different format, which is why a fallback has to be provided.
    final String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
    final int at = runtimeName.indexOf('@');

    // at == -1: no '@' at all; at == 0: nothing in front of '@'.
    if (at >= 1) {
        try {
            return Long.parseLong(runtimeName.substring(0, at));
        }
        catch (final NumberFormatException ignored) {
            // The prefix is not a number, use the fallback below.
        }
    }

    return fallback;
}
/**
 * Default init and expand buffer size, it can be set by -Djraft.byte_buf.size=n, default 1024.
 */
public static final int RAFT_DATA_BUF_SIZE = SystemPropertyUtil.getInt("jraft.byte_buf.size", 1024);
/**
 * Default max {@link ByteBufferCollector} size per thread for recycle, it can be set by
 * -Djraft.max_collector_size_per_thread=n, default 256.
 */
public static final int MAX_COLLECTOR_SIZE_PER_THREAD = SystemPropertyUtil.getInt(
"jraft.max_collector_size_per_thread", 256);
/**
 * Expands the byte buffer by {@link #RAFT_DATA_BUF_SIZE} bytes, delegating to
 * {@link #expandByteBufferAtLeast(ByteBuffer, int)}.
 *
 * @param buf The buffer to expand.
 * @return The buffer produced by {@link #expandByteBufferAtLeast(ByteBuffer, int)}.
 */
public static ByteBuffer expandByteBuffer(final ByteBuffer buf) {
    final ByteBuffer expanded = expandByteBufferAtLeast(buf, RAFT_DATA_BUF_SIZE);

    return expanded;
}
/**
 * Allocates a byte buffer of the requested capacity via {@link ByteBuffer#allocate(int)}.
 *
 * @param size Capacity of the new buffer, in bytes.
 * @return The new buffer.
 */
public static ByteBuffer allocate(final int size) {
    final ByteBuffer fresh = ByteBuffer.allocate(size);

    return fresh;
}
/**
 * Allocates a byte buffer whose capacity is {@link #RAFT_DATA_BUF_SIZE}.
 *
 * @return The new buffer.
 */
public static ByteBuffer allocate() {
    final int defaultSize = RAFT_DATA_BUF_SIZE;

    return allocate(defaultSize);
}
/**
 * Copies the buffer into a new, larger one. The capacity grows by the larger of {@code minLength} and
 * {@link #RAFT_DATA_BUF_SIZE}.
 * <p>
 * Note that the passed buffer is flipped as part of the copy.
 *
 * @param buf The buffer to expand.
 * @param minLength Minimal number of bytes to add.
 * @return A new buffer holding the flipped content of {@code buf}.
 */
public static ByteBuffer expandByteBufferAtLeast(final ByteBuffer buf, final int minLength) {
    final int growth = Math.max(minLength, RAFT_DATA_BUF_SIZE);
    final ByteBuffer expanded = ByteBuffer.allocate(buf.capacity() + growth);

    // Switch the source to read mode so that everything written so far gets copied.
    buf.flip();
    expanded.put(buf);

    return expanded;
}
/**
 * Copies the buffer into a new, larger one. The capacity grows by {@code maxLength} when it lies within
 * {@code 1..RAFT_DATA_BUF_SIZE}, and by {@link #RAFT_DATA_BUF_SIZE} otherwise.
 * <p>
 * Note that the passed buffer is flipped as part of the copy.
 *
 * @param buf The buffer to expand.
 * @param maxLength Maximal number of bytes to add.
 * @return A new buffer holding the flipped content of {@code buf}.
 */
public static ByteBuffer expandByteBufferAtMost(final ByteBuffer buf, final int maxLength) {
    final boolean withinLimit = maxLength > 0 && maxLength <= RAFT_DATA_BUF_SIZE;
    final int growth = withinLimit ? maxLength : RAFT_DATA_BUF_SIZE;
    final ByteBuffer expanded = ByteBuffer.allocate(buf.capacity() + growth);

    // Switch the source to read mode so that everything written so far gets copied.
    buf.flip();
    expanded.put(buf);

    return expanded;
}
/**
 * The "any" IPv4 address, {@code 0.0.0.0}.
 */
public static final String IP_ANY = "0.0.0.0";
/**
 * Gets the current monotonic time in milliseconds, derived from {@link System#nanoTime()}.
 *
 * @return The {@link System#nanoTime()} reading converted to milliseconds.
 */
public static long monotonicMs() {
    final long nanos = System.nanoTime();

    return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
}
/**
 * Returns the current wall-clock time in milliseconds. It is not monotonic: clock synchronization may move it
 * forward or backward.
 *
 * @return The value of {@link System#currentTimeMillis()}.
 */
public static long nowMs() {
    final long wallClock = System.currentTimeMillis();

    return wallClock;
}
/**
 * Gets the current monotonic time in microseconds, derived from {@link System#nanoTime()}.
 *
 * @return The {@link System#nanoTime()} reading converted to microseconds.
 */
public static long monotonicUs() {
    final long nanos = System.nanoTime();

    return TimeUnit.MICROSECONDS.convert(nanos, TimeUnit.NANOSECONDS);
}
/**
 * Encodes the string with the UTF-8 charset.
 *
 * @param s The string to encode.
 * @return UTF-8 bytes of {@code s}.
 */
public static byte[] getBytes(final String s) {
    final byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);

    return utf8;
}
/**
 * Returns the result of {@code Requires.requireNonNull(obj, "obj")} for the given object.
 *
 * @param obj The object to check.
 * @param <T> Type of the object.
 * @return Whatever {@code Requires.requireNonNull} returns for {@code obj}.
 */
public static <T> T withLockObject(final T obj) {
    final T checked = Requires.requireNonNull(obj, "obj");

    return checked;
}
/**
 * Moves {@code source} to {@code target}, atomically if possible.
 * <p>
 * An atomic move is attempted first. If it fails with an {@link IOException}, a non-atomic move replacing an
 * existing target is attempted. If that fails as well, the source file is deleted and the failure of the second
 * move is rethrown (or the failure of the deletion, if deleting fails too).
 *
 * @param source The file to move.
 * @param target The destination.
 * @param sync Whether to fsync the parent directory of {@code target} after a successful move.
 * @return {@code true} if the file was moved.
 * @throws IOException If neither move succeeded, or the parent directory could not be synced.
 */
@SuppressWarnings("ConstantConditions")
public static boolean atomicMoveFile(final File source, final File target, final boolean sync) throws IOException {
    // Move temp file to target path atomically.
    // The code comes from
    // https://github.com/jenkinsci/jenkins/blob/master/core/src/main/java/hudson/util/AtomicFileWriter.java#L187
    Requires.requireNonNull(source, "source");
    Requires.requireNonNull(target, "target");
    final Path sourcePath = source.toPath();
    final Path targetPath = target.toPath();
    boolean success;
    try {
        success = Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE) != null;
    }
    catch (final IOException e) {
        // If it falls here that can mean many things. Either that the atomic move is not supported,
        // or something wrong happened. Anyway, let's try to be over-diagnosing
        if (e instanceof AtomicMoveNotSupportedException) {
            LOG.warn("Atomic move not supported. falling back to non-atomic move, error: {}.", e.getMessage());
        }
        else {
            LOG.warn("Unable to move atomically, falling back to non-atomic move, error: {}.", e.getMessage());
        }
        if (target.exists()) {
            LOG.info("The target file {} was already existing.", targetPath);
        }
        try {
            success = Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING) != null;
        }
        catch (final IOException e1) {
            e1.addSuppressed(e);
            LOG.warn("Unable to move {} to {}. Attempting to delete {} and abandoning.", sourcePath, targetPath,
                sourcePath);
            try {
                Files.deleteIfExists(sourcePath);
            }
            catch (final IOException e2) {
                e2.addSuppressed(e1);
                LOG.warn("Unable to delete {}, good bye then!", sourcePath);
                throw e2;
            }
            throw e1;
        }
    }
    if (success && sync) {
        // A target without a parent path component (e.g. new File("data")) yields null from getParentFile(),
        // which used to fail with a NullPointerException in fsync after the move had already happened.
        // Resolving the absolute path first gives the actual containing directory.
        final File dir = target.getAbsoluteFile().getParentFile();
        // Still null for a file system root, in which case there is nothing to sync.
        if (dir != null) {
            // fsync on target parent dir.
            fsync(dir);
        }
    }
    return success;
}
/**
 * Forces the content and metadata of a file or directory to storage. Directories are skipped on Windows.
 *
 * @param file file or directory
 * @throws IOException if an I/O error occurs
 */
public static void fsync(final File file) throws IOException {
    final boolean directory = file.isDirectory();

    // A directory can't be fsync-ed on Windows.
    if (directory && Platform.isWindows()) {
        return;
    }

    // Directories are opened for reading, regular files for writing.
    final StandardOpenOption mode = directory ? StandardOpenOption.READ : StandardOpenOption.WRITE;

    try (FileChannel channel = FileChannel.open(file.toPath(), mode)) {
        channel.force(true);
    }
}
/**
 * Decodes a slice of a byte array as a UTF-8 string.
 *
 * @param bs The bytes.
 * @param off Index of the first byte to decode.
 * @param len Number of bytes to decode.
 * @return The decoded string.
 */
public static String getString(final byte[] bs, final int off, final int len) {
    final String decoded = new String(bs, off, len, StandardCharsets.UTF_8);

    return decoded;
}
/** Opening bracket of an IPv6 literal. */
public static final String IPV6_START_MARK = "[";

/** Closing bracket of an IPv6 literal. */
public static final String IPV6_END_MARK = "]";

/** Length of a raw IPv6 address, in bytes. */
private static final int IPV6_ADDRESS_LENGTH = 16;

/**
 * Checks whether the given address is an IPv6 one, i.e. whether {@link InetAddress#getByName(String)} yields a
 * 16-byte raw address for it.
 *
 * @param addr ip address
 * @return {@code true} for an IPv6 address
 * @throws IllegalArgumentException if the address cannot be resolved
 */
public static boolean isIPv6(String addr) {
    try {
        final byte[] raw = InetAddress.getByName(addr).getAddress();

        return raw.length == IPV6_ADDRESS_LENGTH;
    }
    catch (Exception e) {
        throw new IllegalArgumentException(e);
    }
}
/**
 * Splits the string form of a peer id into its ':'-separated parts using
 * {@code StringUtils.splitPreserveAllTokens}.
 * <p>
 * A string that starts with '[' and contains ']' is treated as starting with a bracketed IPv6 address: the
 * address, brackets included, becomes the first element of the result, and only the remainder is split on ':'.
 * For instance, {@code "[::1]:8080"} is expected to yield {@code {"[::1]", "8080"}}.
 *
 * @param s The string to parse.
 * @return Parts of the peer id.
 * @throws IllegalArgumentException If the bracketed part is not an IPv6 address.
 */
public static String[] parsePeerId(String s) {
    final boolean bracketed = s.startsWith(IPV6_START_MARK) && StringUtils.containsIgnoreCase(s, IPV6_END_MARK);

    if (!bracketed) {
        return StringUtils.splitPreserveAllTokens(s, ':');
    }

    // Either the whole string is the address, or the address ends at the first closing bracket.
    final String host = s.endsWith(IPV6_END_MARK) ? s : s.substring(0, s.indexOf(IPV6_END_MARK) + 1);

    if (!isIPv6(host)) {
        throw new IllegalArgumentException("The IPv6 address(\"" + host + "\") is incorrect.");
    }

    // The address is always a prefix of the input, so the remainder starts right behind it.
    String rest = s.substring(host.length());

    if (rest.startsWith(":")) {
        rest = rest.substring(1);
    }

    final String[] tail = StringUtils.splitPreserveAllTokens(rest, ':');
    final String[] parts = new String[tail.length + 1];

    parts[0] = host;
    System.arraycopy(tail, 0, parts, 1, tail.length);

    return parts;
}
/**
 * Makes sure the given directory exists, creating it along with missing parents when necessary.
 *
 * @param file The directory.
 * @return {@code true} if the directory exists when the method returns, {@code false} otherwise (for instance
 *      when the path denotes a regular file).
 */
public static boolean mkdir(File file) {
    if (file.isDirectory()) {
        return true;
    }

    // mkdirs() returns false if the directory was created by somebody else after the check above,
    // so look at the outcome once more before reporting a failure.
    return file.mkdirs() || file.isDirectory();
}
/**
 * Null-safe variant of {@link Collection#size()}.
 *
 * @param col collection to get the size of, may be {@code null}
 * @return {@code 0} if {@code col} is {@code null}, the size of {@code col} otherwise
 */
public static int size(@Nullable Collection<?> col) {
    if (col == null) {
        return 0;
    }

    return col.size();
}
}