blob: c2a46e9c265e5ae18d3bc81237e7a77274e61780 [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je.rep.elections;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.TextProtocol.MessageExchange;
import com.sleepycat.je.rep.impl.TextProtocol.RequestMessage;
import com.sleepycat.je.rep.impl.node.RepNode;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
import com.sleepycat.je.utilint.LoggerUtils;
public class Utils {
/**
 * Releases the resources used by one request/response cycle: first the
 * request reader, then the response writer and finally the channel itself.
 *
 * The DataChannel must be in blocking mode for this method.
 *
 * @param logger the logger used to report a failure to close the channel
 * @param envImpl the environment handed to LoggerUtils along with the logger
 * @param formatter the formatter handed to LoggerUtils along with the logger
 * @param channel the channel to be closed, ignored if null
 * @param in the request stream to be closed, ignored if null
 * @param out the response stream to be closed, ignored if null
 *
 * @throws IllegalStateException if the channel is non-null and is not in
 * blocking mode; the streams have already been closed at that point but the
 * channel itself is not closed
 */
public static void cleanup(Logger logger,
                           EnvironmentImpl envImpl,
                           Formatter formatter,
                           DataChannel channel,
                           BufferedReader in,
                           PrintWriter out) {

    if (in != null) {
        try {
            in.close();
        } catch (IOException ignored) {
            /* Best effort: a failure to close the reader is not reported. */
        }
    }

    if (out != null) {
        out.close();
    }

    if (channel == null) {
        /* No channel was supplied, only the streams needed closing. */
        return;
    }

    if (!channel.isBlocking()) {
        throw new IllegalStateException(
            "Unexpected non-blocking channel for clean up.");
    }

    try {
        channel.close();
    } catch (IOException closeFailure) {
        /* Record the problem at a low level and carry on. */
        final String msg =
            "Channel exception on close: " + closeFailure.getMessage();
        LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE, msg);
    }
}
/**
 * @hidden
 * Utility to broadcast a request to a set of targets. One MessageExchange is
 * created per target and submitted to the thread pool through a
 * FutureTrackingCompService, which remembers the resulting futures.
 *
 * @param targets of the broadcast
 * @param serviceName the service name passed to each MessageExchange
 * @param requestMessage to be broadcast
 * @param threadPool used to issue message in parallel
 *
 * @return the CompletionService representing the futures generated by the
 * broadcast. If the thread pool was shut down while the broadcast was in
 * progress, the futures submitted so far have already been cancelled.
 *
 * @throws RejectedExecutionException if the thread pool rejected a task for
 * a reason other than having been shut down
 */
public static FutureTrackingCompService<MessageExchange>
    broadcastMessage(Set<InetSocketAddress> targets,
                     String serviceName,
                     RequestMessage requestMessage,
                     ExecutorService threadPool) {

    final FutureTrackingCompService<MessageExchange> compService =
        new FutureTrackingCompService<>(threadPool);

    for (InetSocketAddress socketAddress : targets) {
        final MessageExchange me = requestMessage.getProtocol()
            .new MessageExchange(socketAddress, serviceName, requestMessage);
        try {
            /* The exchange is both the task and the result of its future. */
            compService.submit(me, me);
        } catch (RejectedExecutionException ree) {
            /*
             * A pool rejects new tasks as soon as it has been shut down,
             * which can be well before it is terminated (all tasks done),
             * so isShutdown() is the state to test here.
             */
            if (threadPool.isShutdown()) {
                /*
                 * The thread pool has been shutdown asynchronously as
                 * part of a general elections shutdown. Discard submitted
                 * and running tasks.
                 */
                compService.cancelFutures(true);
                return compService;
            }
            /*
             * Unexpected, rethrow so it can be reported at a higher
             * level.
             */
            throw ree;
        }
    }
    return compService;
}
/**
 * Utility to wait for completion of futures in unit tests. Responses are
 * not examined: the method simply drives a WithFutureExceptionHandler over
 * the futures so that they are waited for, and their exceptions handled, in
 * the standard way.
 *
 * @param compService the futures to wait for
 * @param futureTimeout how long to wait for all the futures
 * @param unit the units associated with futureTimeout
 * @param logger used to report any error messages
 * @param envImpl the environment used for logging and to detect shutdown:
 * when it is non-null and no longer valid the wait is abandoned. May be null.
 * @param formatter the formatter used when logging
 */
public static void checkFutures(
    FutureTrackingCompService<MessageExchange> compService,
    long futureTimeout,
    TimeUnit unit,
    Logger logger,
    final RepImpl envImpl,
    Formatter formatter) {

    final WithFutureExceptionHandler<MessageExchange> waiter =
        new WithFutureExceptionHandler<MessageExchange>
        (compService, futureTimeout, unit, logger, envImpl, formatter) {

        @Override
        protected void processResponse(MessageExchange result) {
            /* Nothing to do, the caller only wants the wait itself. */
        }

        @Override
        protected boolean isShutdown() {
            if (envImpl == null) {
                /* Without an environment there is nothing to check. */
                return false;
            }
            return !envImpl.isValid();
        }
    };

    waiter.execute();
}
/**
 * @hidden
 *
 * A utility wrapper to handle all exceptions from futures in a consistent
 * way. Subclasses supply processResponse() and isShutdown(); callers then
 * invoke execute() once to drain the completion service.
 */
public static abstract
    class WithFutureExceptionHandler<T extends MessageExchange> {

    /* The completion service whose futures are processed by execute(). */
    private final FutureTrackingCompService<T> completionService;

    /* The overall time budget for execute(), expressed in "unit". */
    private final long completionTimeout;
    private final TimeUnit unit;

    /* Logging context handed to LoggerUtils. */
    private final Logger logger;
    private final RepImpl envImpl;
    private final Formatter formatter;

    /**
     * Generate a handler for processing future results.
     *
     * @param compService the CompletionService representing the futures
     *
     * @param completionTimeout the timeout indicating how long to wait for
     * all the results. If running the tasks involves I/O, especially
     * network I/O, the timeout should be sufficient to ensure that it
     * allows for the associated latency. The timeout assumes that all the
     * tasks are run in parallel, so it represents the max estimated task
     * completion time associated with the tasks in the set.
     *
     * @param unit the units associated with the above timeout
     *
     * @param logger the logger used to report timeouts, shutdown and
     * connection failures
     *
     * @param envImpl the environment used for logging and, on interrupt,
     * to determine whether the node is shutting down; may be null
     *
     * @param formatter the formatter used when logging
     */
    public WithFutureExceptionHandler(
        FutureTrackingCompService<T> compService,
        long completionTimeout,
        TimeUnit unit,
        Logger logger,
        RepImpl envImpl,
        Formatter formatter) {

        this.completionService = compService;
        this.completionTimeout = completionTimeout;
        this.unit = unit;
        this.logger = logger;
        this.envImpl = envImpl;
        this.formatter = formatter;
    }

    /**
     * The method represents the result processing code being wrapped upon
     * a success response message being received.
     *
     * @param result the non null result, with a non null response message
     */
    protected abstract void processResponse(T result);

    /**
     * The counterpart to processResponse. It's invoked when there was no
     * response to a message. The exception, if present, details the reason
     * for the failure; some protocols may choose not to require a
     * response, that is, both the response and result.exception could be
     * null. The default method simply logs the event.
     *
     * Note that Timeouts don't come down this path; they result in tasks
     * being cancelled with the timeout being logged.
     *
     * @param result the non null result, with a null response message
     */
    void processNullResponse(T result) {
        LoggerUtils.logMsg(logger, envImpl,
                           formatter, Level.FINE,
                           "No response from: " + result.target +
                           " request" + result.getRequestMessage() +
                           " reason: " + result.exception);
    }

    /**
     * Determines whether the initiating operation was shutdown.
     *
     * @return true if the futures should not be processed but should be
     * cancelled instead.
     */
    protected abstract boolean isShutdown();

    /**
     * Processes futures in the order in which they complete, as
     * determined by the completion service, to minimize unnecessary
     * waiting.
     *
     * Processing stops early when the overall timeout expires, when
     * isShutdown() reports true, or when the thread is interrupted. In
     * every case, including normal completion, all tracked futures are
     * cancelled before the method returns.
     *
     * @throws EnvironmentFailureException if the thread is interrupted
     * while the node is not shutting down, or if a future failed with an
     * exception that is not one of the anticipated network failures
     */
    public final void execute() {
        /* Absolute deadline shared by all the futures. */
        final long limitTimeMs =
            System.currentTimeMillis() + unit.toMillis(completionTimeout);
        try {
            for (int count = completionService.getFutures().size();
                 count > 0; count--) {

                final long pollTimeoutMs =
                    limitTimeMs - System.currentTimeMillis();
                if (pollTimeoutMs <= 0) {
                    logTimeout();
                    return;
                }

                /* Wait for the next task that is ready. */
                final Future<T> f =
                    completionService.poll(pollTimeoutMs,
                                           TimeUnit.MILLISECONDS);
                if (f == null) {
                    logTimeout();
                    return;
                }

                if (isShutdown()) {
                    LoggerUtils.
                        logMsg(logger, envImpl, formatter, Level.INFO,
                               "Election messages terminated." +
                               " Environment being shutdown." );
                    /* Simply exit. */
                    return;
                }

                if (f.isCancelled()) {
                    continue;
                }

                /*
                 * The deadline may have passed while the checks above
                 * were being made; if so stop without processing this
                 * future.
                 */
                final long futureTimeoutMs =
                    limitTimeMs - System.currentTimeMillis();
                if (futureTimeoutMs <= 0) {
                    return;
                }

                assert f.isDone();
                executeInternal(f);
            }
        } catch (InterruptedException e) {
            if (envImpl != null) {
                final RepNode rn = envImpl.getRepNode();
                if ((rn != null) && rn.isShutdown()) {
                    /*
                     * Interrupt for shutdown, it's likely part of a
                     * "hard" stoppable thread shutdown, ignore it.
                     */
                    LoggerUtils.
                        logMsg(logger, envImpl, formatter, Level.INFO,
                               "Election operation interrupted." +
                               " Environment being shutdown." );
                    /* Simply exit. */
                    return;
                }
            }
            throw EnvironmentFailureException.unexpectedException(e);
        } finally {
            /*
             * Clean up all tasks, in case we exited before they were all
             * done.
             */
            completionService.cancelFutures(true);
        }
    }

    /**
     * Logs that the overall completion timeout expired before all the
     * futures were processed.
     */
    private void logTimeout() {
        LoggerUtils.
            logMsg(logger, envImpl, formatter, Level.INFO,
                   "Election messages timed out after " +
                   unit.toMillis(completionTimeout) + "ms.");
    }

    /**
     * The method represents the future process code being wrapped. It
     * processes generic future-specific exceptions, where the impact of
     * the fault can be localized to the future.
     *
     * @param future the "done" future
     *
     * @throws InterruptedException if interrupted while obtaining the
     * result of the future
     */
    private void executeInternal(Future<T> future)
        throws InterruptedException {

        try {
            /* We don't expect to wait, since the future is "done". */
            assert future.isDone();
            final T result = future.get();
            if (result.getResponseMessage() == null) {
                processNullResponse(result);
            } else {
                processResponse(result);
            }
        } catch (CancellationException ce) {
            /* Ignore. */
        } catch (ExecutionException e) {
            /*
             * Get the true cause, unwrap the intermediate wrappers. The
             * cause is held as a Throwable: a task can fail with an Error
             * and casting it to Exception would replace the real failure
             * with a ClassCastException.
             */
            Throwable cause = e.getCause();
            while (cause instanceof RuntimeException) {
                final Throwable t = cause.getCause();
                if (t instanceof Exception) {
                    cause = t;
                } else {
                    break;
                }
            }
            if ((cause instanceof ConnectException) ||
                (cause instanceof SocketException) ||
                (cause instanceof SocketTimeoutException) ||
                (cause instanceof ServiceConnectFailedException)) {
                /* Network exceptions are expected, log it and keep moving */
                LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE,
                                   "Election connection failure " +
                                   cause.getMessage());
                return;
            }
            /* Unanticipated exception, higher level will handle it */
            throw EnvironmentFailureException.unexpectedException(e);
        }
    }
}
/**
 * @hidden
 *
 * An ExecutorCompletionService that remembers the future of every task
 * submitted through it. This lets a caller cancel all outstanding work with
 * a single call, without keeping its own record of the futures that were
 * created.
 */
public static class FutureTrackingCompService<V> extends
    ExecutorCompletionService<V> {

    /* Futures of all submitted tasks, in submission order. */
    private final List<Future<V>> futures = new LinkedList<>();

    /**
     * @param executor the executor that runs the submitted tasks
     */
    public FutureTrackingCompService(Executor executor) {
        super(executor);
    }

    /**
     * @return the list holding the futures of the tasks submitted so far;
     * this is the internal list itself, not a copy
     */
    public List<Future<V>> getFutures() {
        return futures;
    }

    /**
     * Submits the task via the superclass and records the resulting
     * future.
     */
    @Override
    public Future<V> submit(Runnable task, V result) {
        return track(super.submit(task, result));
    }

    /**
     * Submits the task via the superclass and records the resulting
     * future.
     */
    @Override
    public Future<V> submit(Callable<V> task) {
        return track(super.submit(task));
    }

    /**
     * Cancels every recorded future that has not already been cancelled.
     *
     * @param mayInterruptIfRunning passed through to Future.cancel()
     */
    public void cancelFutures(boolean mayInterruptIfRunning) {
        for (Future<V> tracked : futures) {
            if (tracked.isCancelled()) {
                continue;
            }
            tracked.cancel(mayInterruptIfRunning);
        }
    }

    /* Records a newly created future and hands it back to the caller. */
    private Future<V> track(Future<V> submitted) {
        futures.add(submitted);
        return submitted;
    }
}
}