| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog.auditor; |
| |
| import com.google.common.base.Objects; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.util.concurrent.SettableFuture; |
| import com.twitter.distributedlog.BookKeeperClient; |
| import com.twitter.distributedlog.BookKeeperClientBuilder; |
| import com.twitter.distributedlog.DistributedLogConfiguration; |
| import com.twitter.distributedlog.DistributedLogManager; |
| import com.twitter.distributedlog.LogSegmentMetadata; |
| import com.twitter.distributedlog.impl.BKNamespaceDriver; |
| import com.twitter.distributedlog.namespace.DistributedLogNamespace; |
| import com.twitter.distributedlog.ZooKeeperClient; |
| import com.twitter.distributedlog.ZooKeeperClientBuilder; |
| import com.twitter.distributedlog.exceptions.DLInterruptedException; |
| import com.twitter.distributedlog.exceptions.ZKException; |
| import com.twitter.distributedlog.impl.metadata.BKDLConfig; |
| import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; |
| import com.twitter.distributedlog.namespace.NamespaceDriver; |
| import com.twitter.distributedlog.util.DLUtils; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.client.BookKeeperAccessor; |
| import org.apache.bookkeeper.client.LedgerHandle; |
| import org.apache.bookkeeper.meta.LedgerManager; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; |
| import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; |
| import org.apache.bookkeeper.zookeeper.RetryPolicy; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.zookeeper.AsyncCallback; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import static com.google.common.base.Charsets.UTF_8; |
| |
| /** |
| * DL Auditor will audit DL namespace, e.g. find leaked ledger, report disk usage by streams. |
| */ |
| public class DLAuditor { |
| |
| private static final Logger logger = LoggerFactory.getLogger(DLAuditor.class); |
| |
| private final DistributedLogConfiguration conf; |
| |
| public DLAuditor(DistributedLogConfiguration conf) { |
| this.conf = conf; |
| } |
| |
| private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) { |
| NamespaceDriver driver = namespace.getNamespaceDriver(); |
| assert(driver instanceof BKNamespaceDriver); |
| return ((BKNamespaceDriver) driver).getWriterZKC(); |
| } |
| |
| private BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) { |
| NamespaceDriver driver = namespace.getNamespaceDriver(); |
| assert(driver instanceof BKNamespaceDriver); |
| return ((BKNamespaceDriver) driver).getReaderBKC(); |
| } |
| |
| private String validateAndGetZKServers(List<URI> uris) { |
| URI firstURI = uris.get(0); |
| String zkServers = BKNamespaceDriver.getZKServersFromDLUri(firstURI); |
| for (URI uri : uris) { |
| if (!zkServers.equalsIgnoreCase(BKNamespaceDriver.getZKServersFromDLUri(uri))) { |
| throw new IllegalArgumentException("Uris don't belong to same zookeeper cluster"); |
| } |
| } |
| return zkServers; |
| } |
| |
| private BKDLConfig resolveBKDLConfig(ZooKeeperClient zkc, List<URI> uris) throws IOException { |
| URI firstURI = uris.get(0); |
| BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, firstURI); |
| for (URI uri : uris) { |
| BKDLConfig anotherConfig = BKDLConfig.resolveDLConfig(zkc, uri); |
| if (!(Objects.equal(bkdlConfig.getBkLedgersPath(), anotherConfig.getBkLedgersPath()) |
| && Objects.equal(bkdlConfig.getBkZkServersForWriter(), anotherConfig.getBkZkServersForWriter()))) { |
| throw new IllegalArgumentException("Uris don't use same bookkeeper cluster"); |
| } |
| } |
| return bkdlConfig; |
| } |
| |
| public Pair<Set<Long>, Set<Long>> collectLedgers(List<URI> uris, List<List<String>> allocationPaths) |
| throws IOException { |
| Preconditions.checkArgument(uris.size() > 0, "No uri provided to audit"); |
| |
| String zkServers = validateAndGetZKServers(uris); |
| RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( |
| conf.getZKRetryBackoffStartMillis(), |
| conf.getZKRetryBackoffMaxMillis(), |
| Integer.MAX_VALUE); |
| ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder() |
| .name("DLAuditor-ZK") |
| .zkServers(zkServers) |
| .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) |
| .retryPolicy(retryPolicy) |
| .zkAclId(conf.getZkAclId()) |
| .build(); |
| ExecutorService executorService = Executors.newCachedThreadPool(); |
| try { |
| BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris); |
| logger.info("Resolved bookkeeper config : {}", bkdlConfig); |
| |
| BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder() |
| .name("DLAuditor-BK") |
| .dlConfig(conf) |
| .zkServers(bkdlConfig.getBkZkServersForWriter()) |
| .ledgersPath(bkdlConfig.getBkLedgersPath()) |
| .build(); |
| try { |
| Set<Long> bkLedgers = collectLedgersFromBK(bkc, executorService); |
| Set<Long> dlLedgers = collectLedgersFromDL(uris, allocationPaths); |
| return Pair.of(bkLedgers, dlLedgers); |
| } finally { |
| bkc.close(); |
| } |
| } finally { |
| zkc.close(); |
| executorService.shutdown(); |
| } |
| } |
| |
| /** |
| * Find leak ledgers phase 1: collect ledgers set. |
| */ |
| private Set<Long> collectLedgersFromBK(BookKeeperClient bkc, |
| final ExecutorService executorService) |
| throws IOException { |
| LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get()); |
| |
| final Set<Long> ledgers = new HashSet<Long>(); |
| final SettableFuture<Void> doneFuture = SettableFuture.create(); |
| |
| BookkeeperInternalCallbacks.Processor<Long> collector = |
| new BookkeeperInternalCallbacks.Processor<Long>() { |
| @Override |
| public void process(Long lid, |
| final AsyncCallback.VoidCallback cb) { |
| synchronized (ledgers) { |
| ledgers.add(lid); |
| if (0 == ledgers.size() % 1000) { |
| logger.info("Collected {} ledgers", ledgers.size()); |
| } |
| } |
| executorService.submit(new Runnable() { |
| @Override |
| public void run() { |
| cb.processResult(BKException.Code.OK, null, null); |
| } |
| }); |
| |
| } |
| }; |
| AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback() { |
| @Override |
| public void processResult(int rc, String path, Object ctx) { |
| if (BKException.Code.OK == rc) { |
| doneFuture.set(null); |
| } else { |
| doneFuture.setException(BKException.create(rc)); |
| } |
| } |
| }; |
| lm.asyncProcessLedgers(collector, finalCb, null, BKException.Code.OK, |
| BKException.Code.ZKException); |
| try { |
| doneFuture.get(); |
| logger.info("Collected total {} ledgers", ledgers.size()); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new DLInterruptedException("Interrupted on collecting ledgers : ", e); |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof IOException) { |
| throw (IOException)(e.getCause()); |
| } else { |
| throw new IOException("Failed to collect ledgers : ", e.getCause()); |
| } |
| } |
| return ledgers; |
| } |
| |
| /** |
| * Find leak ledgers phase 2: collect ledgers from uris. |
| */ |
| private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths) |
| throws IOException { |
| final Set<Long> ledgers = new TreeSet<Long>(); |
| List<DistributedLogNamespace> namespaces = |
| new ArrayList<DistributedLogNamespace>(uris.size()); |
| try { |
| for (URI uri : uris) { |
| namespaces.add( |
| DistributedLogNamespaceBuilder.newBuilder() |
| .conf(conf) |
| .uri(uri) |
| .build()); |
| } |
| final CountDownLatch doneLatch = new CountDownLatch(uris.size()); |
| final AtomicInteger numFailures = new AtomicInteger(0); |
| ExecutorService executor = Executors.newFixedThreadPool(uris.size()); |
| try { |
| int i = 0; |
| for (final DistributedLogNamespace namespace : namespaces) { |
| final DistributedLogNamespace dlNamespace = namespace; |
| final URI uri = uris.get(i); |
| final List<String> aps = allocationPaths.get(i); |
| i++; |
| executor.submit(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| logger.info("Collecting ledgers from {} : {}", uri, aps); |
| collectLedgersFromAllocator(uri, namespace, aps, ledgers); |
| synchronized (ledgers) { |
| logger.info("Collected {} ledgers from allocators for {} : {} ", |
| new Object[]{ledgers.size(), uri, ledgers}); |
| } |
| collectLedgersFromDL(uri, namespace, ledgers); |
| } catch (IOException e) { |
| numFailures.incrementAndGet(); |
| logger.info("Error to collect ledgers from DL : ", e); |
| } |
| doneLatch.countDown(); |
| } |
| }); |
| } |
| try { |
| doneLatch.await(); |
| if (numFailures.get() > 0) { |
| throw new IOException(numFailures.get() + " errors to collect ledgers from DL"); |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| logger.warn("Interrupted on collecting ledgers from DL : ", e); |
| throw new DLInterruptedException("Interrupted on collecting ledgers from DL : ", e); |
| } |
| } finally { |
| executor.shutdown(); |
| } |
| } finally { |
| for (DistributedLogNamespace namespace : namespaces) { |
| namespace.close(); |
| } |
| } |
| return ledgers; |
| } |
| |
| private void collectLedgersFromAllocator(final URI uri, |
| final DistributedLogNamespace namespace, |
| final List<String> allocationPaths, |
| final Set<Long> ledgers) throws IOException { |
| final LinkedBlockingQueue<String> poolQueue = |
| new LinkedBlockingQueue<String>(); |
| for (String allocationPath : allocationPaths) { |
| String rootPath = uri.getPath() + "/" + allocationPath; |
| try { |
| List<String> pools = getZooKeeperClient(namespace).get().getChildren(rootPath, false); |
| for (String pool : pools) { |
| poolQueue.add(rootPath + "/" + pool); |
| } |
| } catch (KeeperException e) { |
| throw new ZKException("Failed to get list of pools from " + rootPath, e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new DLInterruptedException("Interrupted on getting list of pools from " + rootPath, e); |
| } |
| } |
| |
| |
| logger.info("Collecting ledgers from allocators for {} : {}", uri, poolQueue); |
| |
| executeAction(poolQueue, 10, new Action<String>() { |
| @Override |
| public void execute(String poolPath) throws IOException { |
| try { |
| collectLedgersFromPool(poolPath); |
| } catch (InterruptedException e) { |
| throw new DLInterruptedException("Interrupted on collecting ledgers from allocation pool " + poolPath, e); |
| } catch (KeeperException e) { |
| throw new ZKException("Failed to collect ledgers from allocation pool " + poolPath, e.code()); |
| } |
| } |
| |
| private void collectLedgersFromPool(String poolPath) |
| throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException { |
| List<String> allocators = getZooKeeperClient(namespace).get() |
| .getChildren(poolPath, false); |
| for (String allocator : allocators) { |
| String allocatorPath = poolPath + "/" + allocator; |
| byte[] data = getZooKeeperClient(namespace).get().getData(allocatorPath, false, new Stat()); |
| if (null != data && data.length > 0) { |
| try { |
| long ledgerId = DLUtils.bytes2LogSegmentId(data); |
| synchronized (ledgers) { |
| ledgers.add(ledgerId); |
| } |
| } catch (NumberFormatException nfe) { |
| logger.warn("Invalid ledger found in allocator path {} : ", allocatorPath, nfe); |
| } |
| } |
| } |
| } |
| }); |
| |
| logger.info("Collected ledgers from allocators for {}.", uri); |
| } |
| |
| private void collectLedgersFromDL(final URI uri, |
| final DistributedLogNamespace namespace, |
| final Set<Long> ledgers) throws IOException { |
| logger.info("Enumerating {} to collect streams.", uri); |
| Iterator<String> streams = namespace.getLogs(); |
| final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>(); |
| while (streams.hasNext()) { |
| streamQueue.add(streams.next()); |
| } |
| |
| logger.info("Collected {} streams from uri {} : {}", |
| new Object[] { streamQueue.size(), uri, streams }); |
| |
| executeAction(streamQueue, 10, new Action<String>() { |
| @Override |
| public void execute(String stream) throws IOException { |
| collectLedgersFromStream(namespace, stream, ledgers); |
| } |
| }); |
| } |
| |
| private List<Long> collectLedgersFromStream(DistributedLogNamespace namespace, |
| String stream, |
| Set<Long> ledgers) |
| throws IOException { |
| DistributedLogManager dlm = namespace.openLog(stream); |
| try { |
| List<LogSegmentMetadata> segments = dlm.getLogSegments(); |
| List<Long> sLedgers = new ArrayList<Long>(); |
| for (LogSegmentMetadata segment : segments) { |
| synchronized (ledgers) { |
| ledgers.add(segment.getLogSegmentId()); |
| } |
| sLedgers.add(segment.getLogSegmentId()); |
| } |
| return sLedgers; |
| } finally { |
| dlm.close(); |
| } |
| } |
| |
| /** |
| * Calculating stream space usage from given <i>uri</i>. |
| * |
| * @param uri dl uri |
| * @throws IOException |
| */ |
| public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws IOException { |
| logger.info("Collecting stream space usage for {}.", uri); |
| DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() |
| .conf(conf) |
| .uri(uri) |
| .build(); |
| try { |
| return calculateStreamSpaceUsage(uri, namespace); |
| } finally { |
| namespace.close(); |
| } |
| } |
| |
| private Map<String, Long> calculateStreamSpaceUsage( |
| final URI uri, final DistributedLogNamespace namespace) |
| throws IOException { |
| Iterator<String> streams = namespace.getLogs(); |
| final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>(); |
| while (streams.hasNext()) { |
| streamQueue.add(streams.next()); |
| } |
| |
| final Map<String, Long> streamSpaceUsageMap = |
| new ConcurrentSkipListMap<String, Long>(); |
| final AtomicInteger numStreamsCollected = new AtomicInteger(0); |
| |
| executeAction(streamQueue, 10, new Action<String>() { |
| @Override |
| public void execute(String stream) throws IOException { |
| streamSpaceUsageMap.put(stream, |
| calculateStreamSpaceUsage(namespace, stream)); |
| if (numStreamsCollected.incrementAndGet() % 1000 == 0) { |
| logger.info("Calculated {} streams from uri {}.", numStreamsCollected.get(), uri); |
| } |
| } |
| }); |
| |
| return streamSpaceUsageMap; |
| } |
| |
| private long calculateStreamSpaceUsage(final DistributedLogNamespace namespace, |
| final String stream) throws IOException { |
| DistributedLogManager dlm = namespace.openLog(stream); |
| long totalBytes = 0; |
| try { |
| List<LogSegmentMetadata> segments = dlm.getLogSegments(); |
| for (LogSegmentMetadata segment : segments) { |
| try { |
| LedgerHandle lh = getBookKeeperClient(namespace).get().openLedgerNoRecovery(segment.getLogSegmentId(), |
| BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); |
| totalBytes += lh.getLength(); |
| lh.close(); |
| } catch (BKException e) { |
| logger.error("Failed to open ledger {} : ", segment.getLogSegmentId(), e); |
| throw new IOException("Failed to open ledger " + segment.getLogSegmentId(), e); |
| } catch (InterruptedException e) { |
| logger.warn("Interrupted on opening ledger {} : ", segment.getLogSegmentId(), e); |
| Thread.currentThread().interrupt(); |
| throw new DLInterruptedException("Interrupted on opening ledger " + segment.getLogSegmentId(), e); |
| } |
| } |
| } finally { |
| dlm.close(); |
| } |
| return totalBytes; |
| } |
| |
| public long calculateLedgerSpaceUsage(URI uri) throws IOException { |
| List<URI> uris = Lists.newArrayList(uri); |
| String zkServers = validateAndGetZKServers(uris); |
| RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy( |
| conf.getZKRetryBackoffStartMillis(), |
| conf.getZKRetryBackoffMaxMillis(), |
| Integer.MAX_VALUE); |
| ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder() |
| .name("DLAuditor-ZK") |
| .zkServers(zkServers) |
| .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) |
| .retryPolicy(retryPolicy) |
| .zkAclId(conf.getZkAclId()) |
| .build(); |
| ExecutorService executorService = Executors.newCachedThreadPool(); |
| try { |
| BKDLConfig bkdlConfig = resolveBKDLConfig(zkc, uris); |
| logger.info("Resolved bookkeeper config : {}", bkdlConfig); |
| |
| BookKeeperClient bkc = BookKeeperClientBuilder.newBuilder() |
| .name("DLAuditor-BK") |
| .dlConfig(conf) |
| .zkServers(bkdlConfig.getBkZkServersForWriter()) |
| .ledgersPath(bkdlConfig.getBkLedgersPath()) |
| .build(); |
| try { |
| return calculateLedgerSpaceUsage(bkc, executorService); |
| } finally { |
| bkc.close(); |
| } |
| } finally { |
| zkc.close(); |
| executorService.shutdown(); |
| } |
| } |
| |
| private long calculateLedgerSpaceUsage(BookKeeperClient bkc, |
| final ExecutorService executorService) |
| throws IOException { |
| final AtomicLong totalBytes = new AtomicLong(0); |
| final AtomicLong totalEntries = new AtomicLong(0); |
| final AtomicLong numLedgers = new AtomicLong(0); |
| |
| LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get()); |
| |
| final SettableFuture<Void> doneFuture = SettableFuture.create(); |
| final BookKeeper bk = bkc.get(); |
| |
| BookkeeperInternalCallbacks.Processor<Long> collector = |
| new BookkeeperInternalCallbacks.Processor<Long>() { |
| @Override |
| public void process(final Long lid, |
| final AsyncCallback.VoidCallback cb) { |
| numLedgers.incrementAndGet(); |
| executorService.submit(new Runnable() { |
| @Override |
| public void run() { |
| bk.asyncOpenLedgerNoRecovery(lid, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8), |
| new org.apache.bookkeeper.client.AsyncCallback.OpenCallback() { |
| @Override |
| public void openComplete(int rc, LedgerHandle lh, Object ctx) { |
| final int cbRc; |
| if (BKException.Code.OK == rc) { |
| totalBytes.addAndGet(lh.getLength()); |
| totalEntries.addAndGet(lh.getLastAddConfirmed() + 1); |
| cbRc = rc; |
| } else { |
| cbRc = BKException.Code.ZKException; |
| } |
| executorService.submit(new Runnable() { |
| @Override |
| public void run() { |
| cb.processResult(cbRc, null, null); |
| } |
| }); |
| } |
| }, null); |
| } |
| }); |
| } |
| }; |
| AsyncCallback.VoidCallback finalCb = new AsyncCallback.VoidCallback() { |
| @Override |
| public void processResult(int rc, String path, Object ctx) { |
| if (BKException.Code.OK == rc) { |
| doneFuture.set(null); |
| } else { |
| doneFuture.setException(BKException.create(rc)); |
| } |
| } |
| }; |
| lm.asyncProcessLedgers(collector, finalCb, null, BKException.Code.OK, BKException.Code.ZKException); |
| try { |
| doneFuture.get(); |
| logger.info("calculated {} ledgers\n\ttotal bytes = {}\n\ttotal entries = {}", |
| new Object[] { numLedgers.get(), totalBytes.get(), totalEntries.get() }); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new DLInterruptedException("Interrupted on calculating ledger space : ", e); |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof IOException) { |
| throw (IOException)(e.getCause()); |
| } else { |
| throw new IOException("Failed to calculate ledger space : ", e.getCause()); |
| } |
| } |
| return totalBytes.get(); |
| } |
| |
| public void close() { |
| // no-op |
| } |
| |
| static interface Action<T> { |
| void execute(T item) throws IOException ; |
| } |
| |
| static <T> void executeAction(final LinkedBlockingQueue<T> queue, |
| final int numThreads, |
| final Action<T> action) throws IOException { |
| final CountDownLatch failureLatch = new CountDownLatch(1); |
| final CountDownLatch doneLatch = new CountDownLatch(queue.size()); |
| final AtomicInteger numFailures = new AtomicInteger(0); |
| final AtomicInteger completedThreads = new AtomicInteger(0); |
| |
| ExecutorService executorService = Executors.newFixedThreadPool(numThreads); |
| try { |
| for (int i = 0 ; i < numThreads; i++) { |
| executorService.submit(new Runnable() { |
| @Override |
| public void run() { |
| while (true) { |
| T item = queue.poll(); |
| if (null == item) { |
| break; |
| } |
| try { |
| action.execute(item); |
| } catch (IOException ioe) { |
| logger.error("Failed to execute action on item '{}'", item, ioe); |
| numFailures.incrementAndGet(); |
| failureLatch.countDown(); |
| break; |
| } |
| doneLatch.countDown(); |
| } |
| if (numFailures.get() == 0 && completedThreads.incrementAndGet() == numThreads) { |
| failureLatch.countDown(); |
| } |
| } |
| }); |
| } |
| try { |
| failureLatch.await(); |
| if (numFailures.get() > 0) { |
| throw new IOException("Encountered " + numFailures.get() + " failures on executing action."); |
| } |
| doneLatch.await(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| logger.warn("Interrupted on executing action", ie); |
| throw new DLInterruptedException("Interrupted on executing action", ie); |
| } |
| } finally { |
| executorService.shutdown(); |
| } |
| } |
| |
| } |