blob: cf97c9296ca1d67bf2c8e3f90e668dac8299698e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tephra.persist;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.tephra.metrics.MetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Common implementation of a transaction log, backed by file reader and writer based storage. Classes extending
 * this class, must also implement {@link TransactionLogWriter} and {@link TransactionLogReader}.
 *
 * <p>Appends are thread-safe: edits are buffered under the instance lock and handed to the underlying
 * {@link TransactionLogWriter} inside {@link #sync()}, while the (potentially slow) storage sync itself happens
 * outside the lock so that concurrent appenders can share a single sync ("group commit").
 */
public abstract class AbstractTransactionLog implements TransactionLog {
  /** Time limit, in milliseconds, of an append to the transaction log before we log it as "slow". */
  private static final long SLOW_APPEND_THRESHOLD = 1000L;
  private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);

  /** Source of monotonically increasing sequence numbers, one per appended edit. */
  private final AtomicLong logSequence = new AtomicLong();
  private final MetricsCollector metricsCollector;
  protected long timestamp;
  private volatile boolean initialized;
  private volatile boolean closed;
  /** Highest sequence number known to have been synced to durable storage. */
  private final AtomicLong syncedUpTo = new AtomicLong();
  /**
   * Highest sequence number handed off to the writer so far (not necessarily synced yet).
   * Guarded by {@code this}.  This covers entries written by *any* thread, so a thread whose pending
   * edits were batched out by a concurrent sync() still knows it must wait for a storage sync.
   */
  private long writtenUpTo;
  /** Edits accumulated since the last flush to the writer. Guarded by {@code this}. */
  private List<Entry> pendingWrites = Lists.newLinkedList();
  /** Lazily created by {@link #init()}; may be null until the first append. */
  private TransactionLogWriter writer;

  /**
   * @param timestamp        timestamp identifying this log instance
   * @param metricsCollector sink for write-ahead-log metrics
   */
  public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
    this.timestamp = timestamp;
    this.metricsCollector = metricsCollector;
  }

  /**
   * Initializes the log file, opening a file writer. Clients calling {@code init()} should ensure that they
   * also call {@link #close()}.
   *
   * @throws java.io.IOException If an error is encountered initializing the file writer.
   */
  public synchronized void init() throws IOException {
    if (initialized) {
      return;
    }
    this.writer = createWriter();
    this.initialized = true;
  }

  /**
   * Returns a log writer to be used for appending any new {@link TransactionEdit} objects.
   */
  protected abstract TransactionLogWriter createWriter() throws IOException;

  @Override
  public abstract String getName();

  @Override
  public long getTimestamp() {
    return timestamp;
  }

  @Override
  public void append(TransactionEdit edit) throws IOException {
    long startTime = System.nanoTime();
    synchronized (this) {
      ensureAvailable();
      Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
      // add to pending edits
      append(entry);
    }
    // wait for sync to complete
    sync();
    logSlowAppend(startTime);
  }

  @Override
  public void append(List<TransactionEdit> edits) throws IOException {
    long startTime = System.nanoTime();
    synchronized (this) {
      ensureAvailable();
      for (TransactionEdit edit : edits) {
        Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
        // add to pending edits
        append(entry);
      }
    }
    // wait for sync to complete
    sync();
    logSlowAppend(startTime);
  }

  /** Logs a notice when an append (including the sync) exceeded {@link #SLOW_APPEND_THRESHOLD}. */
  private void logSlowAppend(long startTimeNanos) {
    long durationMillis = (System.nanoTime() - startTimeNanos) / 1000000L;
    if (durationMillis > SLOW_APPEND_THRESHOLD) {
      LOG.info("Slow append to log {}, took {} msec.", getName(), durationMillis);
    }
  }

  /** Throws if the log has been closed; lazily initializes the writer on first use. */
  private void ensureAvailable() throws IOException {
    if (closed) {
      throw new IOException("Log " + getName() + " is already closed, cannot append!");
    }
    if (!initialized) {
      init();
    }
  }

  /*
   * Appends new writes to the pendingWrites. It is better to keep it in
   * our own queue rather than writing it to the HDFS output stream because
   * HDFSOutputStream.writeChunk is not lightweight at all.
   */
  private void append(Entry e) throws IOException {
    pendingWrites.add(e);
  }

  // Returns all currently pending writes. New writes
  // will accumulate in a new list.
  private List<Entry> getPendingWrites() {
    synchronized (this) {
      List<Entry> save = this.pendingWrites;
      this.pendingWrites = new LinkedList<>();
      return save;
    }
  }

  /**
   * Flushes any buffered edits to the writer and syncs the writer to durable storage.  On return, every edit
   * appended before this call is guaranteed to have been synced, even when the actual write to the log was
   * performed by a concurrent call to {@code sync()} (group commit).
   *
   * @throws IOException if writing or syncing the underlying log fails
   */
  private void sync() throws IOException {
    // writes out pending entries to the log
    TransactionLogWriter tmpWriter;
    long latestSeq;
    int entryCount = 0;
    synchronized (this) {
      if (closed) {
        return;
      }
      // copy to a local so the writer reference cannot change while we sync outside the lock
      tmpWriter = writer;
      List<Entry> currentPending = getPendingWrites();
      if (!currentPending.isEmpty()) {
        tmpWriter.commitMarker(currentPending.size());
      }
      // write out all accumulated entries to log.
      for (Entry e : currentPending) {
        tmpWriter.append(e);
        entryCount++;
        writtenUpTo = Math.max(writtenUpTo, e.getKey().get());
      }
      // Use the global high-water mark of everything handed to the writer, not just the entries this thread
      // wrote: a concurrent sync() may have taken our pending edits and appended them without having synced
      // yet, and we must not return before they are durable.  All writer appends happen under this lock, so a
      // sync performed below is guaranteed to cover them.
      latestSeq = writtenUpTo;
    }
    long lastSynced = syncedUpTo.get();
    // someone else might have already synced our edits, avoid double syncing
    if (lastSynced < latestSeq) {
      tmpWriter.sync();
      metricsCollector.histogram("wal.sync.size", entryCount);
      // only advance the marker; if another thread raced ahead of us, the CAS fails harmlessly
      syncedUpTo.compareAndSet(lastSynced, latestSeq);
    }
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }
    // perform a final sync if any outstanding writes
    if (!pendingWrites.isEmpty()) {
      sync();
    }
    // NOTE: writer is lazy-inited, so it can be null
    if (writer != null) {
      this.writer.close();
    }
    this.closed = true;
  }

  /** Returns true once {@link #close()} has completed. */
  public boolean isClosed() {
    return closed;
  }

  @Override
  public abstract TransactionLogReader getReader() throws IOException;

  /**
   * Represents an entry in the transaction log. Each entry consists of a key, generated from an incrementing sequence
   * number, and a value, the {@link TransactionEdit} being stored.
   */
  public static class Entry implements Writable {
    private LongWritable key;
    private TransactionEdit edit;

    // for Writable
    public Entry() {
      this.key = new LongWritable();
      this.edit = new TransactionEdit();
    }

    public Entry(LongWritable key, TransactionEdit edit) {
      this.key = key;
      this.edit = edit;
    }

    public LongWritable getKey() {
      return this.key;
    }

    public TransactionEdit getEdit() {
      return this.edit;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      this.key.write(out);
      this.edit.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.key.readFields(in);
      this.edit.readFields(in);
    }
  }

  /**
   * Legacy variant of {@link Entry} that serializes the {@code co.cask.tephra} edit class instead of the
   * Apache one.  NOTE(review): appears to be retained so logs written by pre-Apache (co.cask) Tephra versions
   * can still be deserialized — confirm before removing.
   */
  // package private for testing
  @Deprecated
  @VisibleForTesting
  static class CaskEntry implements Writable {
    private LongWritable key;
    private co.cask.tephra.persist.TransactionEdit edit;

    // for Writable
    public CaskEntry() {
      this.key = new LongWritable();
      this.edit = new co.cask.tephra.persist.TransactionEdit();
    }

    public CaskEntry(LongWritable key, co.cask.tephra.persist.TransactionEdit edit) {
      this.key = key;
      this.edit = edit;
    }

    public LongWritable getKey() {
      return this.key;
    }

    public co.cask.tephra.persist.TransactionEdit getEdit() {
      return this.edit;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      this.key.write(out);
      this.edit.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      this.key.readFields(in);
      this.edit.readFields(in);
    }
  }
}