blob: bd50a87a370b1891aa249d17447d61ef319db3c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
/**
* IO related utility methods.
*/
public interface IOUtils {
static InterruptedIOException toInterruptedIOException(
String message, InterruptedException e) {
final InterruptedIOException iioe = new InterruptedIOException(message);
iioe.initCause(e);
return iioe;
}
static IOException asIOException(Throwable t) {
Objects.requireNonNull(t, "t == null");
return t instanceof IOException? (IOException)t : new IOException(t);
}
static IOException toIOException(ExecutionException e) {
final Throwable cause = e.getCause();
return cause != null? asIOException(cause): new IOException(e);
}
static <T> T getFromFuture(CompletableFuture<T> future, Supplier<Object> name) throws IOException {
try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw toInterruptedIOException(name.get() + " interrupted.", e);
} catch (ExecutionException e) {
throw toIOException(e);
} catch (CompletionException e) {
throw asIOException(JavaUtils.unwrapCompletionException(e));
}
}
static <T> T getFromFuture(CompletableFuture<T> future, Supplier<Object> name, TimeDuration timeout)
throws IOException {
try {
return future.get(timeout.getDuration(), timeout.getUnit());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw toInterruptedIOException(name.get() + " interrupted.", e);
} catch (ExecutionException e) {
throw toIOException(e);
} catch (CompletionException e) {
throw asIOException(JavaUtils.unwrapCompletionException(e));
} catch(TimeoutException e) {
throw new TimeoutIOException("Timeout " + timeout + ": " + name.get(), e);
}
}
static boolean shouldReconnect(Throwable e) {
return ReflectionUtils.isInstance(e,
SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
AlreadyClosedException.class);
}
static void readFully(InputStream in, int buffSize) throws IOException {
final byte [] buf = new byte[buffSize];
for(int bytesRead = in.read(buf); bytesRead >= 0; ) {
bytesRead = in.read(buf);
}
}
/**
* Reads len bytes in a loop.
*
* @param in InputStream to read from
* @param buf The buffer to fill
* @param off offset from the buffer
* @param len the length of bytes to read
* @throws IOException if it could not read requested number of bytes
* for any reason (including EOF)
*/
static void readFully(InputStream in, byte[] buf, int off, int len)
throws IOException {
for(int toRead = len; toRead > 0; ) {
final int ret = in.read(buf, off, toRead);
if (ret < 0) {
final int read = len - toRead;
throw new EOFException("Premature EOF: read length is " + len + " but encountered EOF at " + read);
}
toRead -= ret;
off += ret;
}
}
/**
* Write a ByteBuffer to a FileChannel at a given offset,
* handling short writes.
*
* @param fc The FileChannel to write to
* @param buf The input buffer
* @param offset The offset in the file to start writing at
* @throws IOException On I/O error
*/
static void writeFully(FileChannel fc, ByteBuffer buf, long offset)
throws IOException {
do {
offset += fc.write(buf, offset);
} while (buf.remaining() > 0);
}
static long preallocate(FileChannel fc, long size, ByteBuffer fill) throws IOException {
Preconditions.assertSame(0, fill.position(), "fill.position");
Preconditions.assertSame(fill.capacity(), fill.limit(), "fill.limit");
final int remaining = fill.remaining();
long allocated = 0;
for(; allocated < size; ) {
final long required = size - allocated;
final int n = remaining < required? remaining: Math.toIntExact(required);
final ByteBuffer buffer = fill.slice();
buffer.limit(n);
IOUtils.writeFully(fc, buffer, fc.size());
allocated += n;
}
return allocated;
}
/**
* Similar to readFully(). Skips bytes in a loop.
* @param in The InputStream to skip bytes from
* @param len number of bytes to skip.
* @throws IOException if it could not skip requested number of bytes
* for any reason (including EOF)
*/
static void skipFully(InputStream in, long len) throws IOException {
long amt = len;
while (amt > 0) {
long ret = in.skip(amt);
if (ret == 0) {
// skip may return 0 even if we're not at EOF. Luckily, we can
// use the read() method to figure out if we're at the end.
int b = in.read();
if (b == -1) {
throw new EOFException( "Premature EOF from inputStream after " +
"skipping " + (len - amt) + " byte(s).");
}
ret = 1;
}
amt -= ret;
}
}
/**
* Close the Closeable objects and <b>ignore</b> any {@link Throwable} or
* null pointers. Must only be used for cleanup in exception handlers.
*
* @param log the log to record problems to at debug level. Can be null.
* @param closeables the objects to close
*/
static void cleanup(Logger log, Closeable... closeables) {
for (Closeable c : closeables) {
if (c != null) {
try {
c.close();
} catch(Exception e) {
if (log != null && log.isDebugEnabled()) {
log.debug("Exception in closing " + c, e);
}
}
}
}
}
/** Serialize the given object to a byte array using {@link java.io.ObjectOutputStream#writeObject(Object)}. */
static byte[] object2Bytes(Object obj) {
return ProtoUtils.writeObject2ByteString(obj).toByteArray();
}
static <T> T bytes2Object(byte[] bytes, Class<T> clazz) {
return readObject(new ByteArrayInputStream(bytes), clazz);
}
static <T> T readObject(InputStream in, Class<T> clazz) {
Object obj = null;
try(ObjectInputStream oin = new ObjectInputStream(in)) {
obj = oin.readObject();
return clazz.cast(obj);
} catch (IOException | ClassNotFoundException e) {
throw new IllegalStateException("Failed to cast to " + clazz + ", object="
+ (obj instanceof Throwable? StringUtils.stringifyException((Throwable) obj): obj), e);
}
}
}