blob: accb888506abceb6c9fe660cfb399447774c42bf [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.ignite.raft.jraft.util;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import com.codahale.metrics.MetricRegistry;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.jraft.Closure;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.util.concurrent.MpscSingleThreadExecutor;
import org.jetbrains.annotations.Nullable;
import static java.lang.Runtime.getRuntime;
* Helper methods for jraft.
public final class Utils {
private static final IgniteLogger LOG = IgniteLogger.forClass(Utils.class);
* The configured number of available processors. The default is {@link Runtime#availableProcessors()}. This can be
* overridden by setting the system property "jraft.available_processors".
private static final int CPUS = SystemPropertyUtil.getInt(
"jraft.available_processors", getRuntime().availableProcessors());
* Default jraft closure executor pool minimum size, CPUs by default.
public static final int MIN_CLOSURE_EXECUTOR_POOL_SIZE = SystemPropertyUtil.getInt(
"jraft.closure.threadpool.size.min", cpus());
* Default jraft closure executor pool maximum size.
public static final int MAX_CLOSURE_EXECUTOR_POOL_SIZE = SystemPropertyUtil.getInt(
"jraft.closure.threadpool.size.max", Math.max(100, cpus() * 5));
* Default jraft append-entries executor(send) pool size.
public static final int APPEND_ENTRIES_THREADS_POOL_SIZE = SystemPropertyUtil.getInt(
"jraft.append.entries.threads.send", Math.max(16, Ints.findNextPositivePowerOfTwo(cpus() * 2)));
* Default jraft max pending tasks of append-entries per thread, 65536 by default.
public static final int MAX_APPEND_ENTRIES_TASKS_PER_THREAD = SystemPropertyUtil.getInt(
"jraft.max.append.entries.tasks.per.thread", 32768);
* Whether use {@link MpscSingleThreadExecutor}, true by default.
public static final boolean USE_MPSC_SINGLE_THREAD_EXECUTOR = SystemPropertyUtil.getBoolean(
"jraft.use.mpsc.single.thread.executor", true);
private static final Pattern GROUP_ID_PATTER = Pattern.compile("^[0-9a-zA-Z][a-zA-Z0-9\\-_]*$");
public static void verifyGroupId(final String groupId) {
if (StringUtils.isBlank(groupId)) {
throw new IllegalArgumentException("Blank groupId");
if (!GROUP_ID_PATTER.matcher(groupId).matches()) {
throw new IllegalArgumentException(
"Invalid group id, it should be started with number or character 'a'-'z' or 'A'-'Z',"
+ "and followed with numbers, english alphabet, '-' or '_'. ");
* Register an executor into metric reg.
* @param name The name.
* @param reg The registry.
* @param executor The executor.
public static void registerClosureExecutorMetrics(String name, final MetricRegistry reg,
ThreadPoolExecutor executor) {
reg.register(name, new ThreadPoolMetricSet(executor));
* Run closure with OK status in thread pool.
public static Future<?> runClosureInThread(ExecutorService executor, final Closure done) {
if (done == null) {
return null;
return runClosureInThread(executor, done, Status.OK());
* Run a task in thread pool,returns the future object. TODO asch confusion runInThread runInExecutor IGNITE-14382
public static Future<?> runInThread(final ExecutorService executor, final Runnable runnable) {
return executor.submit(runnable);
* Run a task in thread pool,returns the future object.
public static void runInThread(final Executor executor, final Runnable runnable) {
* Run a callable in thread pool, returns the future object.
public static <V> Future<V> runInThread(final ExecutorService executor, final Callable<V> runnable) {
return executor.submit(runnable);
* Run closure with status in thread pool.
public static Future<?> runClosureInThread(final ExecutorService executor, final Closure done,
final Status status) {
if (done == null) {
return null;
return runInThread(executor, new Runnable() {
@Override public void run() {
try {;
catch (final Throwable t) {
LOG.error("Fail to run done closure", t);
* Run closure with status in specified executor
public static void runClosureInExecutor(final Executor executor, final Closure done, final Status status) {
assert executor != null;
if (done == null) {
executor.execute(() -> {
try {;
catch (final Throwable t) {
LOG.error("Fail to run done closure.", t);
* Run closure with status in specified executor
public static void runClosureInExecutor(final Executor executor, final Closure done) {
runClosureInExecutor(executor, done, Status.OK());
* Close a closeable.
public static int closeQuietly(final Closeable closeable) {
if (closeable == null) {
return 0;
try {
return 0;
catch (final IOException e) {
LOG.error("Fail to close", e);
return RaftError.EIO.getNumber();
* Get system CPUs count.
public static int cpus() {
return CPUS;
* Get java process id.
public static long getProcessId(final long fallback) {
// Note: may fail in some JVM implementations
// therefore fallback has to be provided
// something like '<pid>@<hostname>', at least in SUN / Oracle JVMs
final String jvmName = ManagementFactory.getRuntimeMXBean().getName();
final int index = jvmName.indexOf('@');
if (index < 1) {
// part before '@' empty (index = 0) / '@' not found (index = -1)
return fallback;
try {
return Long.parseLong(jvmName.substring(0, index));
catch (final NumberFormatException e) {
// ignore
return fallback;
* Default init and expand buffer size, it can be set by -Djraft.byte_buf.size=n, default 1024.
public static final int RAFT_DATA_BUF_SIZE = SystemPropertyUtil.getInt("jraft.byte_buf.size", 1024);
* Default max {@link ByteBufferCollector} size per thread for recycle, it can be set by
* -Djraft.max_collector_size_per_thread, default 256
public static final int MAX_COLLECTOR_SIZE_PER_THREAD = SystemPropertyUtil.getInt(
"jraft.max_collector_size_per_thread", 256);
* Expand byte buffer for 1024 bytes.
public static ByteBuffer expandByteBuffer(final ByteBuffer buf) {
return expandByteBufferAtLeast(buf, RAFT_DATA_BUF_SIZE);
* Allocate a byte buffer with size.
public static ByteBuffer allocate(final int size) {
return ByteBuffer.allocate(size);
* Allocate a byte buffer with {@link #RAFT_DATA_BUF_SIZE}
public static ByteBuffer allocate() {
return allocate(RAFT_DATA_BUF_SIZE);
* Expand byte buffer at least minLength.
public static ByteBuffer expandByteBufferAtLeast(final ByteBuffer buf, final int minLength) {
final int newCapacity = minLength > RAFT_DATA_BUF_SIZE ? minLength : RAFT_DATA_BUF_SIZE;
final ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() + newCapacity);
return newBuf;
* Expand byte buffer at most maxLength.
public static ByteBuffer expandByteBufferAtMost(final ByteBuffer buf, final int maxLength) {
final int newCapacity = maxLength > RAFT_DATA_BUF_SIZE || maxLength <= 0 ? RAFT_DATA_BUF_SIZE : maxLength;
final ByteBuffer newBuf = ByteBuffer.allocate(buf.capacity() + newCapacity);
return newBuf;
* ANY IP address
public static final String IP_ANY = "";
* Gets the current monotonic time in milliseconds.
public static long monotonicMs() {
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
* Returns the current time in milliseconds, it's not monotonic, would be forwarded/backward by clock synchronous.
public static long nowMs() {
return System.currentTimeMillis();
* Gets the current monotonic time in microseconds.
public static long monotonicUs() {
return TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
* Get string bytes in UTF-8 charset.
public static byte[] getBytes(final String s) {
return s.getBytes(StandardCharsets.UTF_8);
public static <T> T withLockObject(final T obj) {
return Requires.requireNonNull(obj, "obj");
public static boolean atomicMoveFile(final File source, final File target, final boolean sync) throws IOException {
// Move temp file to target path atomically.
// The code comes from
Requires.requireNonNull(source, "source");
Requires.requireNonNull(target, "target");
final Path sourcePath = source.toPath();
final Path targetPath = target.toPath();
boolean success;
try {
success = Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE) != null;
catch (final IOException e) {
// If it falls here that can mean many things. Either that the atomic move is not supported,
// or something wrong happened. Anyway, let's try to be over-diagnosing
if (e instanceof AtomicMoveNotSupportedException) {
LOG.warn("Atomic move not supported. falling back to non-atomic move, error: {}.", e.getMessage());
else {
LOG.warn("Unable to move atomically, falling back to non-atomic move, error: {}.", e.getMessage());
if (target.exists()) {"The target file {} was already existing.", targetPath);
try {
success = Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING) != null;
catch (final IOException e1) {
LOG.warn("Unable to move {} to {}. Attempting to delete {} and abandoning.", sourcePath, targetPath,
try {
catch (final IOException e2) {
LOG.warn("Unable to delete {}, good bye then!", sourcePath);
throw e2;
throw e1;
if (success && sync) {
File dir = target.getParentFile();
// fsync on target parent dir.
return success;
* Calls fsync on a file or directory.
* @param file file or directory
* @throws IOException if an I/O error occurs
public static void fsync(final File file) throws IOException {
final boolean isDir = file.isDirectory();
// can't fsync on windowns.
if (isDir && Platform.isWindows()) {
// LOG.warn("Unable to fsync directory {} on windows.", file);
try (final FileChannel fc =, isDir ? StandardOpenOption.READ
: StandardOpenOption.WRITE)) {
public static String getString(final byte[] bs, final int off, final int len) {
return new String(bs, off, len, StandardCharsets.UTF_8);
public static final String IPV6_START_MARK = "[";
public static final String IPV6_END_MARK = "]";
private static final int IPV6_ADDRESS_LENGTH = 16;
* check whether the ip address is IPv6.
* @param addr ip address
* @return boolean
public static boolean isIPv6(String addr) {
try {
return InetAddress.getByName(addr).getAddress().length == IPV6_ADDRESS_LENGTH;
catch (Exception e) {
throw new IllegalArgumentException(e);
* Parse peerId from string that generated by {@link #toString()} This method can support parameter string values
* are below:
* <pre>
* PeerId.parse("a:b") = new PeerId("a", "b", 0 , -1)
* PeerId.parse("a:b:c") = new PeerId("a", "b", "c", -1)
* PeerId.parse("a:b::d") = new PeerId("a", "b", 0, "d")
* PeerId.parse("a:b:c:d") = new PeerId("a", "b", "c", "d")
* </pre>
public static String[] parsePeerId(String s) {
if (s.startsWith(IPV6_START_MARK) && StringUtils.containsIgnoreCase(s, IPV6_END_MARK)) {
String ipv6Addr;
if (s.endsWith(IPV6_END_MARK)) {
ipv6Addr = s;
else {
ipv6Addr = s.substring(0, (s.indexOf(IPV6_END_MARK) + 1));
if (!isIPv6(ipv6Addr)) {
throw new IllegalArgumentException("The IPv6 address(\"" + ipv6Addr + "\") is incorrect.");
String tempString = s.substring((s.indexOf(ipv6Addr) + ipv6Addr.length()));
if (tempString.startsWith(":")) {
tempString = tempString.substring(1);
String[] tempArr = StringUtils.splitPreserveAllTokens(tempString, ':');
String[] result = new String[1 + tempArr.length];
result[0] = ipv6Addr;
System.arraycopy(tempArr, 0, result, 1, tempArr.length);
return result;
else {
return StringUtils.splitPreserveAllTokens(s, ':');
public static boolean mkdir(File file) {
if (file.exists() && file.isDirectory())
return true;
return file.mkdirs();
* Returns the size of a given collection or {@code 0} if the given collection is {@code null}.
* @param col collection to get the size of
* @return size of {@code col} or {@code 0} if {@code col} is {@code null}
public static int size(@Nullable Collection<?> col) {
return col == null ? 0 : col.size();