blob: d092f2064ce4cff86b4b86de48a6c44f3fed91e2 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.bookie;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.SafeRunnable;
/**
* EntryMemTableWithParallelFlusher.
*/
@Slf4j
class EntryMemTableWithParallelFlusher extends EntryMemTable {
final OrderedExecutor flushExecutor;
public EntryMemTableWithParallelFlusher(final ServerConfiguration conf, final CheckpointSource source,
final StatsLogger statsLogger) {
super(conf, source, statsLogger);
this.flushExecutor = OrderedExecutor.newBuilder().numThreads(conf.getNumOfMemtableFlushThreads())
.name("MemtableFlushThreads").build();
}
/**
* Functionally this overridden flushSnapshot does the same as
* EntryMemTable's flushSnapshot, but it uses flushExecutor
* (OrderedExecutor) to process an entry through flusher.
*
* <p>SubMaps of the snapshot corresponding to the entries of the ledgers are
* created and submitted to the flushExecutor with ledgerId as the
* orderingKey to flush process the entries of a ledger.
*/
@Override
long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
AtomicLong flushedSize = new AtomicLong();
if (this.snapshot.compareTo(checkpoint) < 0) {
synchronized (this) {
EntrySkipList keyValues = this.snapshot;
Phaser pendingNumOfLedgerFlushes = new Phaser(1);
AtomicReference<Exception> exceptionWhileFlushingParallelly = new AtomicReference<Exception>();
if (keyValues.compareTo(checkpoint) < 0) {
Map.Entry<EntryKey, EntryKeyValue> thisLedgerFirstMapEntry = keyValues.firstEntry();
EntryKeyValue thisLedgerFirstEntry;
long thisLedgerId;
while (thisLedgerFirstMapEntry != null) {
thisLedgerFirstEntry = thisLedgerFirstMapEntry.getValue();
thisLedgerId = thisLedgerFirstEntry.getLedgerId();
EntryKey thisLedgerCeilingKeyMarker = new EntryKey(thisLedgerId, Long.MAX_VALUE - 1);
/*
* Gets a view of the portion of this map that
* corresponds to entries of this ledger.
*/
ConcurrentNavigableMap<EntryKey, EntryKeyValue> thisLedgerEntries = keyValues
.subMap(thisLedgerFirstEntry, thisLedgerCeilingKeyMarker);
pendingNumOfLedgerFlushes.register();
flushExecutor.executeOrdered(thisLedgerId, new SafeRunnable() {
@Override
public void safeRun() {
try {
long ledger;
boolean ledgerDeleted = false;
for (EntryKey key : thisLedgerEntries.keySet()) {
EntryKeyValue kv = (EntryKeyValue) key;
flushedSize.addAndGet(kv.getLength());
ledger = kv.getLedgerId();
if (!ledgerDeleted) {
try {
flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
} catch (NoLedgerException exception) {
ledgerDeleted = true;
}
}
}
pendingNumOfLedgerFlushes.arriveAndDeregister();
} catch (Exception exc) {
log.error("Got Exception while trying to flush process entryies: ", exc);
exceptionWhileFlushingParallelly.set(exc);
/*
* if we get any unexpected exception while
* trying to flush process entries of a
* ledger, then terminate the
* pendingNumOfLedgerFlushes phaser.
*/
pendingNumOfLedgerFlushes.forceTermination();
}
}
});
thisLedgerFirstMapEntry = keyValues.ceilingEntry(thisLedgerCeilingKeyMarker);
}
boolean phaserTerminatedAbruptly = false;
try {
/*
* while flush processing entries of a ledger if it
* failed because of any unexpected exception then
* pendingNumOfLedgerFlushes phaser would be force
* terminated and because of that arriveAndAwaitAdvance
* would be a negative value.
*/
phaserTerminatedAbruptly = (pendingNumOfLedgerFlushes.arriveAndAwaitAdvance() < 0);
} catch (IllegalStateException ise) {
log.error("Got IllegalStateException while awaiting on Phaser", ise);
throw new IOException("Got IllegalStateException while awaiting on Phaser", ise);
}
if (phaserTerminatedAbruptly) {
log.error("Phaser is terminated while awaiting flushExecutor to complete the entry flushes",
exceptionWhileFlushingParallelly.get());
throw new IOException("Failed to complete the flushSnapshotByParallelizing",
exceptionWhileFlushingParallelly.get());
}
memTableStats.getFlushBytesCounter().add(flushedSize.get());
clearSnapshot(keyValues);
}
}
}
skipListSemaphore.release(flushedSize.intValue());
return flushedSize.longValue();
}
@Override
public void close() throws Exception {
flushExecutor.shutdown();
}
}