| /*- |
| * Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved. |
| * |
| * This file was distributed by Oracle as part of a version of Oracle Berkeley |
| * DB Java Edition made available at: |
| * |
| * http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html |
| * |
| * Please see the LICENSE file included in the top-level directory of the |
| * appropriate version of Oracle Berkeley DB Java Edition for a copy of the |
| * license and additional information. |
| */ |
| |
| package com.sleepycat.je.rep.elections; |
| |
| import java.io.BufferedReader; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.net.ConnectException; |
| import java.net.InetSocketAddress; |
| import java.net.SocketException; |
| import java.net.SocketTimeoutException; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorCompletionService; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.logging.Formatter; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import com.sleepycat.je.EnvironmentFailureException; |
| import com.sleepycat.je.dbi.EnvironmentImpl; |
| import com.sleepycat.je.rep.impl.RepImpl; |
| import com.sleepycat.je.rep.impl.TextProtocol.MessageExchange; |
| import com.sleepycat.je.rep.impl.TextProtocol.RequestMessage; |
| import com.sleepycat.je.rep.impl.node.RepNode; |
| import com.sleepycat.je.rep.net.DataChannel; |
| import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException; |
| import com.sleepycat.je.utilint.LoggerUtils; |
| |
| public class Utils { |
| |
| /** |
| * Cleans up the socket and its related streams after a request/response |
| * cycle. |
| * |
| * The DataChannel must be in blocking mode for this method. |
| * |
| * @param channel the channel to be closed |
| * @param in the request stream to be closed |
| * @param out the response stream to be closed |
| */ |
| static public void cleanup(Logger logger, |
| EnvironmentImpl envImpl, |
| Formatter formatter, |
| DataChannel channel, |
| BufferedReader in, |
| PrintWriter out) { |
| if (in != null) { |
| try { |
| in.close(); |
| } catch (IOException e) { |
| /* Ignore it, it's only cleanup. */ |
| } |
| } |
| if (out != null) { |
| out.close(); |
| } |
| if (channel != null) { |
| if (!channel.isBlocking()) { |
| throw new IllegalStateException( |
| "Unexpected non-blocking channel for clean up."); |
| } |
| try { |
| channel.close(); |
| } catch (IOException e) { |
| /* Log it and continue. */ |
| LoggerUtils.logMsg |
| (logger, envImpl, formatter, Level.FINE, |
| "Channel exception on close: " + e.getMessage()); |
| } |
| } |
| } |
| |
| /** |
| * @hidden |
| * Utility to broadcast a request to set of targets. |
| * |
| * @param targets of the broadcast |
| * @param requestMessage to be broadcast |
| * @param threadPool used to issue message in parallel |
| * |
| * @return the CompletionService representing the futures generated by the |
| * broadcast |
| */ |
| public static FutureTrackingCompService<MessageExchange> |
| broadcastMessage(Set<InetSocketAddress> targets, |
| String serviceName, |
| RequestMessage requestMessage, |
| ExecutorService threadPool) { |
| |
| final FutureTrackingCompService<MessageExchange> compService = |
| new FutureTrackingCompService<>(threadPool); |
| |
| for (InetSocketAddress socketAddress : targets) { |
| MessageExchange me = requestMessage.getProtocol() |
| .new MessageExchange(socketAddress, serviceName, requestMessage); |
| try { |
| compService.submit(me, me); |
| } catch (RejectedExecutionException ree) { |
| if (threadPool.isTerminated()) { |
| /* |
| * The thread pool has been shutdown asynchronously as |
| * part of a general elections shutdown. Discard submitted |
| * and running tasks. |
| */ |
| compService.cancelFutures(true); |
| return compService; |
| } |
| |
| /* |
| * Unexpected, rethrow so it can be reported at a higher |
| * level. |
| */ |
| throw ree; |
| } |
| } |
| return compService; |
| } |
| |
| /** |
| * Utility to wait for completion of futures in unit tests |
| * |
| * @param compService the futures to wait for |
| * @param logger used to report any error messages |
| */ |
| public static void checkFutures( |
| FutureTrackingCompService<MessageExchange> compService, |
| long futureTimeout, |
| TimeUnit unit, |
| Logger logger, |
| final RepImpl envImpl, |
| Formatter formatter) { |
| |
| new WithFutureExceptionHandler<MessageExchange> |
| (compService, futureTimeout, unit, logger, envImpl, formatter) { |
| |
| @Override |
| protected void processResponse (MessageExchange result) { |
| /* Do nothing, merely waiting for a response */ |
| } |
| |
| @Override |
| protected boolean isShutdown() { |
| return (envImpl != null) && !envImpl.isValid(); |
| } |
| }.execute(); |
| } |
| |
| /** |
| * @hidden |
| * |
| * A utility wrapper to handle all exceptions from futures in a consistent |
| * way. The above method illustrates its intended usage pattern |
| */ |
| public static abstract |
| class WithFutureExceptionHandler<T extends MessageExchange> { |
| |
| private final FutureTrackingCompService<T> completionService; |
| private final long completionTimeout; |
| private final TimeUnit unit; |
| |
| private final Logger logger; |
| private final RepImpl envImpl; |
| private final Formatter formatter; |
| |
| /** |
| * Generate a handler for processing future results. |
| * |
| * @param compService the CompletionService representing the futures |
| * |
| * @param completionTimeout the timeout indicating how long to wait for |
| * all the results. If running the tasks involves I/O, especially |
| * network I/O, the timeout should be sufficient to ensure that it |
| * allows for the associated latency. The timeout assumes that all the |
| * tasks are run in parallel, so it represents the max estimated task |
| * completion time associated with the tasks in the set. |
| * |
| * @param unit the units associated with the above timeout |
| */ |
| public WithFutureExceptionHandler( |
| FutureTrackingCompService<T> compService, |
| long completionTimeout, |
| TimeUnit unit, |
| Logger logger, |
| RepImpl envImpl, |
| Formatter formatter) { |
| |
| super(); |
| |
| this.completionService = compService; |
| this.completionTimeout = completionTimeout; |
| this.unit = unit; |
| this.logger = logger; |
| this.envImpl = envImpl; |
| this.formatter = formatter; |
| } |
| |
| /** |
| * The method represents the result processing code being wrapped upon |
| * a success response message being received. |
| * |
| * @param result the non null result, with a non null response message |
| */ |
| protected abstract void processResponse(T result); |
| |
| /** |
| * The counterpart to processResponse. It's invoked when there was no |
| * response to a message. The exception, if present, details the reason |
| * for the failure; some protocols may choose not to require a |
| * response, that is, both the response and result.exception could be |
| * null. The default method simply logs the event. |
| * |
| * Note that Timeouts don't come down this path; they result in tasks |
| * being cancelled with the timeout being logged. |
| * |
| * @param result the non null result, with a null response message |
| */ |
| void processNullResponse(T result) { |
| LoggerUtils.logMsg(logger, envImpl, |
| formatter, Level.FINE, |
| "No response from: " + result.target + |
| " request" + result.getRequestMessage() + |
| " reason: " + result.exception); |
| return; |
| } |
| |
| /** |
| * Determines whether the initiating operation was shutdown. |
| * |
| * @return true if the futures should not be processed but should be |
| * cancelled instead. |
| */ |
| protected abstract boolean isShutdown(); |
| |
| /** |
| * Processes futures in the order in which they complete, as |
| * determined by the completion service, to minimize unnecessary |
| * waiting. |
| */ |
| public final void execute() { |
| final long limitTimeMs = |
| System.currentTimeMillis() + unit.toMillis(completionTimeout); |
| try { |
| for (int count = completionService.getFutures().size(); |
| count > 0; count--) { |
| final long pollTimeoutMs = |
| limitTimeMs - System.currentTimeMillis(); |
| if (pollTimeoutMs <= 0) { |
| /* Timed out. */ |
| LoggerUtils. |
| logMsg(logger, envImpl, formatter, Level.INFO, |
| "Election messages timed out after " + |
| unit.toMillis(completionTimeout) + "ms."); |
| return; |
| } |
| |
| /* Wait for the next task that is ready. */ |
| final Future<T> f = |
| completionService.poll(pollTimeoutMs, |
| TimeUnit.MILLISECONDS); |
| |
| if (f == null) { |
| /* Timed out. */ |
| LoggerUtils. |
| logMsg(logger, envImpl, formatter, Level.INFO, |
| "Election messages timed out after " + |
| unit.toMillis(completionTimeout) + "ms."); |
| return; |
| } |
| |
| if (isShutdown()) { |
| LoggerUtils. |
| logMsg(logger, envImpl, formatter, Level.INFO, |
| "Election messages terminated." + |
| " Environment being shutdown." ); |
| /* Simply exit. */ |
| return; |
| } |
| |
| if (f.isCancelled()) { |
| continue; |
| } |
| |
| final long futureTimeoutMs = |
| limitTimeMs - System.currentTimeMillis(); |
| if (futureTimeoutMs <= 0) { |
| return; |
| } |
| |
| assert(f.isDone()); |
| executeInternal(f); |
| } |
| |
| } catch (InterruptedException e) { |
| |
| if (envImpl != null) { |
| final RepNode rn = envImpl.getRepNode(); |
| if ((rn != null) && rn.isShutdown()) { |
| |
| /* |
| * Interrupt for shutdown, it's likely part of a |
| * "hard" stoppable thread shutdown, ignore it. |
| */ |
| LoggerUtils. |
| logMsg(logger, envImpl, formatter, Level.INFO, |
| "Election operation interrupted." + |
| " Environment being shutdown." ); |
| /* Simply exit. */ |
| return; |
| } |
| } |
| |
| throw EnvironmentFailureException.unexpectedException(e); |
| } finally { |
| /* |
| * Clean up all tasks, in case we exited before they were all |
| * done. |
| */ |
| completionService.cancelFutures(true); |
| } |
| } |
| |
| /** |
| * The method represents the future process code being wrapped. It |
| * processes generic future-specific exceptions, where the impact of |
| * the fault can be localized to the future. |
| * |
| * @param future the "done" future |
| */ |
| private final void executeInternal(Future<T> future) |
| throws InterruptedException { |
| |
| try { |
| /* We don't expect to wait, since the future is "done". */ |
| assert future.isDone(); |
| T result = future.get(); |
| if (result.getResponseMessage() == null) { |
| processNullResponse(result); |
| } else { |
| processResponse(result); |
| } |
| } catch (CancellationException ce) { |
| /* Ignore. */ |
| } catch (ExecutionException e) { |
| /* Get the true cause, unwrap the intermediate wrappers */ |
| Exception cause = (Exception)e.getCause(); |
| while (cause instanceof RuntimeException) { |
| Throwable t = ((RuntimeException)cause).getCause(); |
| if ((t != null) && (t instanceof Exception)) { |
| cause = (Exception)t; |
| } else { |
| break; |
| } |
| } |
| if ((cause instanceof ConnectException) || |
| (cause instanceof SocketException) || |
| (cause instanceof SocketTimeoutException) || |
| (cause instanceof ServiceConnectFailedException)){ |
| // Network exceptions are expected, log it and keep moving |
| LoggerUtils.logMsg(logger, envImpl, formatter, Level.FINE, |
| "Election connection failure " + |
| cause.getMessage()); |
| return; |
| } |
| /* Unanticipated exception, higher level will handle it */ |
| throw EnvironmentFailureException.unexpectedException(e); |
| } |
| } |
| } |
| |
| /** |
| * @hidden |
| * |
| * A subclass of ExecutorCompletionService that tracks submitted tasks, so |
| * that futures associated with tasks can be cancelled in a modular way, |
| * without the need for maintaining distinct state about the futures that |
| * were created. |
| */ |
| public static class FutureTrackingCompService<V> extends |
| ExecutorCompletionService<V> { |
| |
| /* The list of futures resulting from submitted tasks. */ |
| private final List<Future<V>> futures = new LinkedList<>(); |
| |
| public FutureTrackingCompService(Executor executor) { |
| super(executor); |
| } |
| |
| public List<Future<V>> getFutures() { |
| return futures; |
| } |
| |
| /** |
| * Wrapper around submit() method to track futures resulting from |
| * submitted tasks. |
| */ |
| @Override |
| public Future<V> submit(Runnable task, V result) { |
| final Future<V> f = super.submit(task, result); |
| futures.add(f); |
| return f; |
| } |
| |
| /** |
| * Wrapper around submit() method to track futures resulting from |
| * submitted tasks. |
| */ |
| @Override |
| public Future<V> submit(Callable<V> task) { |
| final Future<V> f = super.submit(task); |
| futures.add(f); |
| return f; |
| } |
| |
| public void cancelFutures(boolean mayInterruptIfRunning) { |
| for (Future<V> f : futures) { |
| if (!f.isCancelled()) { |
| f.cancel(mayInterruptIfRunning); |
| } |
| } |
| } |
| } |
| } |