| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.tserver.log; |
| |
| import static java.util.Collections.singletonList; |
| |
| import java.io.IOException; |
| import java.nio.channels.ClosedChannelException; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.SynchronousQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.accumulo.core.client.Durability; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.dataImpl.KeyExtent; |
| import org.apache.accumulo.core.protobuf.ProtobufUtil; |
| import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; |
| import org.apache.accumulo.core.util.Halt; |
| import org.apache.accumulo.core.util.threads.ThreadPools; |
| import org.apache.accumulo.fate.util.Retry; |
| import org.apache.accumulo.fate.util.Retry.RetryFactory; |
| import org.apache.accumulo.server.fs.VolumeManager; |
| import org.apache.accumulo.server.replication.StatusUtil; |
| import org.apache.accumulo.server.replication.proto.Replication.Status; |
| import org.apache.accumulo.server.util.ReplicationTableUtil; |
| import org.apache.accumulo.tserver.TabletMutations; |
| import org.apache.accumulo.tserver.TabletServer; |
| import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation; |
| import org.apache.accumulo.tserver.log.DfsLogger.ServerResources; |
| import org.apache.accumulo.tserver.tablet.CommitSession; |
| import org.apache.hadoop.fs.Path; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Central logging facility for the TServerInfo. |
| * |
| * Forwards in-memory updates to remote logs, carefully writing the same data to every log, while |
| * maintaining the maximum thread parallelism for greater performance. As new logs are used and |
| * minor compactions are performed, the metadata table is kept up-to-date. |
| * |
| */ |
| public class TabletServerLogger { |
| |
| private static final Logger log = LoggerFactory.getLogger(TabletServerLogger.class); |
| |
| private final AtomicLong logSizeEstimate = new AtomicLong(); |
| private final long maxSize; |
| private final long maxAge; |
| |
| private final TabletServer tserver; |
| |
| // The current logger |
| private DfsLogger currentLog = null; |
| private final SynchronousQueue<Object> nextLog = new SynchronousQueue<>(); |
| private ThreadPoolExecutor nextLogMaker; |
| |
| // The current generation of logs. |
| // Because multiple threads can be using a log at one time, a log |
| // failure is likely to affect multiple threads, who will all attempt to |
| // create a new log. This will cause many unnecessary updates to the |
| // metadata table. |
| // We'll use this generational counter to determine if another thread has |
| // already fetched a new log. |
| private final AtomicInteger logId = new AtomicInteger(); |
| |
| // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to |
| // change them |
| private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock(); |
| |
| private final AtomicLong syncCounter; |
| private final AtomicLong flushCounter; |
| |
| private long createTime = 0; |
| |
| private final RetryFactory createRetryFactory; |
| private Retry createRetry = null; |
| |
| private final RetryFactory writeRetryFactory; |
| |
| private abstract static class TestCallWithWriteLock { |
| abstract boolean test(); |
| |
| abstract void withWriteLock() throws IOException; |
| } |
| |
| /** |
| * Pattern taken from the documentation for ReentrantReadWriteLock |
| * |
| * @param rwlock |
| * lock to use |
| * @param code |
| * a test/work pair |
| */ |
| private static void testLockAndRun(final ReadWriteLock rwlock, TestCallWithWriteLock code) |
| throws IOException { |
| // Get a read lock |
| rwlock.readLock().lock(); |
| try { |
| // does some condition exist that needs the write lock? |
| if (code.test()) { |
| // Yes, let go of the readLock |
| rwlock.readLock().unlock(); |
| // Grab the write lock |
| rwlock.writeLock().lock(); |
| try { |
| // double-check the condition, since we let go of the lock |
| if (code.test()) { |
| // perform the work with with write lock held |
| code.withWriteLock(); |
| } |
| } finally { |
| // regain the readLock |
| rwlock.readLock().lock(); |
| // unlock the write lock |
| rwlock.writeLock().unlock(); |
| } |
| } |
| } finally { |
| // always let go of the lock |
| rwlock.readLock().unlock(); |
| } |
| } |
| |
| public TabletServerLogger(TabletServer tserver, long maxSize, AtomicLong syncCounter, |
| AtomicLong flushCounter, RetryFactory createRetryFactory, RetryFactory writeRetryFactory, |
| long maxAge) { |
| this.tserver = tserver; |
| this.maxSize = maxSize; |
| this.syncCounter = syncCounter; |
| this.flushCounter = flushCounter; |
| this.createRetryFactory = createRetryFactory; |
| this.createRetry = null; |
| this.writeRetryFactory = writeRetryFactory; |
| this.maxAge = maxAge; |
| } |
| |
| private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException { |
| final AtomicReference<DfsLogger> result = new AtomicReference<>(); |
| testLockAndRun(logIdLock, new TestCallWithWriteLock() { |
| @Override |
| boolean test() { |
| result.set(currentLog); |
| if (currentLog != null) |
| logIdOut.set(logId.get()); |
| return currentLog == null; |
| } |
| |
| @Override |
| void withWriteLock() { |
| createLogger(); |
| result.set(currentLog); |
| if (currentLog != null) |
| logIdOut.set(logId.get()); |
| else |
| logIdOut.set(-1); |
| } |
| }); |
| return result.get(); |
| } |
| |
| /** |
| * Get the current WAL file |
| * |
| * @return The name of the current log, or null if there is no current log. |
| */ |
| public String getLogFile() { |
| logIdLock.readLock().lock(); |
| try { |
| if (currentLog == null) { |
| return null; |
| } |
| return currentLog.getFileName(); |
| } finally { |
| logIdLock.readLock().unlock(); |
| } |
| } |
| |
| private synchronized void createLogger() { |
| if (!logIdLock.isWriteLockedByCurrentThread()) { |
| throw new IllegalStateException("createLoggers should be called with write lock held!"); |
| } |
| |
| if (currentLog != null) { |
| throw new IllegalStateException("createLoggers should not be called when current log is set"); |
| } |
| |
| try { |
| startLogMaker(); |
| Object next = nextLog.take(); |
| if (next instanceof Exception) { |
| throw (Exception) next; |
| } |
| if (next instanceof DfsLogger) { |
| currentLog = (DfsLogger) next; |
| logId.incrementAndGet(); |
| log.info("Using next log {}", currentLog.getFileName()); |
| |
| // When we successfully create a WAL, make sure to reset the Retry. |
| if (createRetry != null) { |
| createRetry = null; |
| } |
| |
| this.createTime = System.currentTimeMillis(); |
| return; |
| } else { |
| throw new RuntimeException("Error: unexpected type seen: " + next); |
| } |
| } catch (Exception t) { |
| if (createRetry == null) { |
| createRetry = createRetryFactory.createRetry(); |
| } |
| |
| // We have more retries or we exceeded the maximum number of accepted failures |
| if (createRetry.canRetry()) { |
| // Use the createRetry and record the time in which we did so |
| createRetry.useRetry(); |
| |
| try { |
| // Backoff |
| createRetry.waitForNextAttempt(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| } else { |
| log.error("Repeatedly failed to create WAL. Going to exit tabletserver.", t); |
| // We didn't have retries or we failed too many times. |
| Halt.halt("Experienced too many errors creating WALs, giving up", 1); |
| } |
| |
| // The exception will trigger the log creation to be re-attempted. |
| throw new RuntimeException(t); |
| } |
| } |
| |
| private synchronized void startLogMaker() { |
| if (nextLogMaker != null) { |
| return; |
| } |
| nextLogMaker = ThreadPools.createFixedThreadPool(1, "WALog creator", false); |
| nextLogMaker.submit(new Runnable() { |
| @Override |
| public void run() { |
| final ServerResources conf = tserver.getServerConfig(); |
| final VolumeManager fs = conf.getVolumeManager(); |
| while (!nextLogMaker.isShutdown()) { |
| log.debug("Creating next WAL"); |
| DfsLogger alog = null; |
| |
| try { |
| alog = new DfsLogger(tserver.getContext(), conf, syncCounter, flushCounter); |
| alog.open(tserver.getClientAddressString()); |
| } catch (Exception t) { |
| log.error("Failed to open WAL", t); |
| // the log is not advertised in ZK yet, so we can just delete it if it exists |
| if (alog != null) { |
| try { |
| alog.close(); |
| } catch (Exception e) { |
| log.error("Failed to close WAL after it failed to open", e); |
| } |
| |
| try { |
| Path path = alog.getPath(); |
| if (fs.exists(path)) { |
| fs.delete(path); |
| } |
| } catch (Exception e) { |
| log.warn("Failed to delete a WAL that failed to open", e); |
| } |
| } |
| |
| try { |
| nextLog.offer(t, 12, TimeUnit.HOURS); |
| } catch (InterruptedException ex) { |
| // ignore |
| } |
| |
| continue; |
| } |
| |
| String fileName = alog.getFileName(); |
| log.debug("Created next WAL {}", fileName); |
| |
| try { |
| tserver.addNewLogMarker(alog); |
| } catch (Exception t) { |
| log.error("Failed to add new WAL marker for " + fileName, t); |
| |
| try { |
| // Intentionally not deleting walog because it may have been advertised in ZK. See |
| // #949 |
| alog.close(); |
| } catch (Exception e) { |
| log.error("Failed to close WAL after it failed to open", e); |
| } |
| |
| // it's possible the log was advertised in ZK even though we got an |
| // exception. If there's a chance the WAL marker may have been created, |
| // this will ensure it's closed. Either the close will be written and |
| // the GC will clean it up, or the tserver is about to die due to sesson |
| // expiration and the GC will also clean it up. |
| try { |
| tserver.walogClosed(alog); |
| } catch (Exception e) { |
| log.error("Failed to close WAL that failed to open: " + fileName, e); |
| } |
| |
| try { |
| nextLog.offer(t, 12, TimeUnit.HOURS); |
| } catch (InterruptedException ex) { |
| // ignore |
| } |
| |
| continue; |
| } |
| |
| try { |
| while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) { |
| log.info("Our WAL was not used for 12 hours: {}", fileName); |
| } |
| } catch (InterruptedException e) { |
| // ignore - server is shutting down |
| } |
| } |
| } |
| }); |
| } |
| |
| private synchronized void close() throws IOException { |
| if (!logIdLock.isWriteLockedByCurrentThread()) { |
| throw new IllegalStateException("close should be called with write lock held!"); |
| } |
| try { |
| if (currentLog != null) { |
| try { |
| currentLog.close(); |
| } catch (DfsLogger.LogClosedException ex) { |
| // ignore |
| } catch (Exception ex) { |
| log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex); |
| } finally { |
| this.tserver.walogClosed(currentLog); |
| currentLog = null; |
| logSizeEstimate.set(0); |
| } |
| } |
| } catch (Exception t) { |
| throw new IOException(t); |
| } |
| } |
| |
| interface Writer { |
| LoggerOperation write(DfsLogger logger) throws Exception; |
| } |
| |
| private void write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer, |
| Retry writeRetry) throws IOException { |
| // Work very hard not to lock this during calls to the outside world |
| int currentLogId = logId.get(); |
| |
| boolean success = false; |
| while (!success) { |
| try { |
| // get a reference to the loggers that no other thread can touch |
| AtomicInteger currentId = new AtomicInteger(-1); |
| DfsLogger copy = initializeLoggers(currentId); |
| currentLogId = currentId.get(); |
| |
| // add the logger to the log set for the memory in the tablet, |
| // update the metadata table if we've never used this tablet |
| |
| if (currentLogId == logId.get()) { |
| for (CommitSession commitSession : sessions) { |
| if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) { |
| try { |
| // Scribble out a tablet definition and then write to the metadata table |
| write(singletonList(commitSession), false, |
| logger -> logger.defineTablet(commitSession), writeRetry); |
| } finally { |
| commitSession.finishUpdatingLogsUsed(); |
| } |
| |
| // Need to release |
| KeyExtent extent = commitSession.getExtent(); |
| if (ReplicationConfigurationUtil.isEnabled(extent, |
| tserver.getTableConfiguration(extent))) { |
| Status status = StatusUtil.openWithUnknownLength(System.currentTimeMillis()); |
| log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " |
| + copy.getFileName()); |
| // Got some new WALs, note this in the metadata table |
| ReplicationTableUtil.updateFiles(tserver.getContext(), commitSession.getExtent(), |
| copy.getFileName(), status); |
| } |
| } |
| } |
| } |
| |
| // Make sure that the logs haven't changed out from underneath our copy |
| if (currentLogId == logId.get()) { |
| |
| // write the mutation to the logs |
| LoggerOperation lop = writer.write(copy); |
| lop.await(); |
| |
| // double-check: did the log set change? |
| success = (currentLogId == logId.get()); |
| } |
| } catch (DfsLogger.LogClosedException | ClosedChannelException ex) { |
| writeRetry.logRetry(log, "Logs closed while writing", ex); |
| } catch (Exception t) { |
| writeRetry.logRetry(log, "Failed to write to WAL", t); |
| |
| try { |
| // Backoff |
| writeRetry.waitForNextAttempt(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| } finally { |
| writeRetry.useRetry(); |
| } |
| // Some sort of write failure occurred. Grab the write lock and reset the logs. |
| // But since multiple threads will attempt it, only attempt the reset when |
| // the logs haven't changed. |
| final int finalCurrent = currentLogId; |
| if (!success) { |
| testLockAndRun(logIdLock, new TestCallWithWriteLock() { |
| |
| @Override |
| boolean test() { |
| return finalCurrent == logId.get(); |
| } |
| |
| @Override |
| void withWriteLock() throws IOException { |
| close(); |
| } |
| }); |
| } |
| } |
| // if the log gets too big or too old, reset it .. grab the write lock first |
| logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead |
| testLockAndRun(logIdLock, new TestCallWithWriteLock() { |
| @Override |
| boolean test() { |
| return (logSizeEstimate.get() > maxSize) |
| || ((System.currentTimeMillis() - createTime) > maxAge); |
| } |
| |
| @Override |
| void withWriteLock() throws IOException { |
| close(); |
| } |
| }); |
| } |
| |
| /** |
| * Log a single mutation. This method expects mutations that have a durability other than NONE. |
| */ |
| public void log(final CommitSession commitSession, final Mutation m, final Durability durability) |
| throws IOException { |
| if (durability == Durability.DEFAULT || durability == Durability.NONE) { |
| throw new IllegalArgumentException("Unexpected durability " + durability); |
| } |
| write(singletonList(commitSession), false, logger -> logger.log(commitSession, m, durability), |
| writeRetryFactory.createRetry()); |
| logSizeEstimate.addAndGet(m.numBytes()); |
| } |
| |
| /** |
| * Log mutations. This method expects mutations that have a durability other than NONE. |
| */ |
| public void logManyTablets(Map<CommitSession,TabletMutations> loggables) throws IOException { |
| if (loggables.isEmpty()) |
| return; |
| |
| write(loggables.keySet(), false, logger -> logger.logManyTablets(loggables.values()), |
| writeRetryFactory.createRetry()); |
| for (TabletMutations entry : loggables.values()) { |
| if (entry.getMutations().size() < 1) { |
| throw new IllegalArgumentException("logManyTablets: logging empty mutation list"); |
| } |
| for (Mutation m : entry.getMutations()) { |
| logSizeEstimate.addAndGet(m.numBytes()); |
| } |
| } |
| } |
| |
| public void minorCompactionFinished(final CommitSession commitSession, final long walogSeq, |
| final Durability durability) throws IOException { |
| write(singletonList(commitSession), true, |
| logger -> logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), durability), |
| writeRetryFactory.createRetry()); |
| } |
| |
| public long minorCompactionStarted(final CommitSession commitSession, final long seq, |
| final String fullyQualifiedFileName, final Durability durability) throws IOException { |
| write( |
| singletonList(commitSession), false, logger -> logger.minorCompactionStarted(seq, |
| commitSession.getLogId(), fullyQualifiedFileName, durability), |
| writeRetryFactory.createRetry()); |
| return seq; |
| } |
| |
| public void recover(VolumeManager fs, KeyExtent extent, List<Path> logs, Set<String> tabletFiles, |
| MutationReceiver mr) throws IOException { |
| try { |
| SortedLogRecovery recovery = new SortedLogRecovery(fs); |
| recovery.recover(extent, logs, tabletFiles, mr); |
| } catch (Exception e) { |
| throw new IOException(e); |
| } |
| } |
| } |