| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import java.io.ByteArrayInputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Log record is the basic element in a log. |
| * |
| * <p>A log is a sequence of log records. Each log record is a sequence of bytes. |
| * Log records are written sequentially into a stream, and will be assigned with |
| * an unique system generated sequence number {@link DLSN} (distributedlog sequence |
| * number). Besides {@link DLSN}, application can assign its own sequence number |
| * while constructing log records. The application defined sequence number is called |
| * <code>TransactionID</code> (<i>txid</i>). Either {@link DLSN} or <code>TransactionId</code> |
| * could be used to position readers to start from specific log records. |
| * |
| * <h3>User Record</h3> |
| * |
| * <p>User records are the records written by applications and read by applications. They |
| * are constructed via {@link #LogRecord(long, byte[])} by applications and appended to |
| * logs by writers. And they would be deserialized from bytes by the readers and return |
| * to applications. |
| * |
| * <h3>Control Record</h3> |
| * |
| * <p>Control records are special records that written by distributedlog. They are invisible |
| * to applications. They could be treated as <i>commit requests</i> as what people could find |
| * in distributed consensus algorithms, since they are usually written by distributedlog to |
| * commit application written records. <i>Commit</i> means making application written records |
| * visible to readers to achieve consistent views among them. |
| * |
| * <p>They are named as 'Control Records' for controlling visibility of application written records. |
| * |
| * <p>The transaction id of 'Control Records' are assigned by distributedlog by inheriting from last |
| * written user records. So we could indicate what user records that a control record is committing |
| * by looking at its transaction id. |
| * |
| * <h4>EndOfStream Record</h4> |
| * |
| * <p><code>EoS</code>(EndOfStream) is a special control record that would be written by a writer |
| * to seal a log. After a <i>EoS</i> record is written to a log, no writers could append any record |
| * after that and readers will get {@link com.twitter.distributedlog.exceptions.EndOfStreamException} |
| * when they reach EoS. |
| * |
| * <p>TransactionID of EoS is <code>Long.MAX_VALUE</code>. |
| * |
| * <h3>Serialization & Deserialization</h3> |
| * |
| * <p>Data type in brackets. Interpretation should be on the basis of data types and not individual |
| * bytes to honor Endianness. |
| * |
| * <pre> |
| * LogRecord structure: |
| * ------------------- |
| * Bytes 0 - 7 : Metadata (Long) |
| * Bytes 8 - 15 : TxId (Long) |
| * Bytes 16 - 19 : Payload length (Integer) |
| * Bytes 20 - 20+payload.length-1 : Payload (Byte[]) |
| * |
| * Metadata: 8 Bytes (Long) |
| * -------- |
| * |
| * 0x 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
| * |_____________| |_____| |
| * | | |
| * position flags |
| * |
| * Flags: 2 Bytes (least significant) |
| * ----- |
| * Bit 0 : If set, control record, else record with payload. |
| * Bit 1 : If set, end of stream. |
| * Bits 2 - 15 : Unused |
| * </pre> |
| * |
| * <h3>Sequence Numbers</h3> |
| * |
| * <p>A record is associated with three types of sequence numbers. They are generated |
| * and used for different purposes. Check {@link LogRecordWithDLSN} for more details. |
| * |
| * @see LogRecordWithDLSN |
| */ |
| public class LogRecord { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(LogRecord.class); |
| |
| // Allow 4K overhead for metadata within the max transmission size |
| public static final int MAX_LOGRECORD_SIZE = 1024 * 1024 - 8 * 1024; //1MB - 8KB |
| // Allow 4K overhead for transmission overhead |
| public static final int MAX_LOGRECORDSET_SIZE = 1024 * 1024 - 4 * 1024; //1MB - 4KB |
| |
| private static final int INPUTSTREAM_MARK_LIMIT = 16; |
| |
| static final long LOGRECORD_METADATA_FLAGS_MASK = 0xffffL; |
| static final long LOGRECORD_METADATA_FLAGS_UMASK = 0xffffffffffff0000L; |
| static final long LOGRECORD_METADATA_POSITION_MASK = 0x0000ffffffff0000L; |
| static final long LOGRECORD_METADATA_POSITION_UMASK = 0xffff00000000ffffL; |
| static final int LOGRECORD_METADATA_POSITION_SHIFT = 16; |
| static final long LOGRECORD_METADATA_UNUSED_MASK = 0xffff000000000000L; |
| |
| // TODO: Replace with EnumSet |
| static final long LOGRECORD_FLAGS_CONTROL_MESSAGE = 0x1; |
| static final long LOGRECORD_FLAGS_END_OF_STREAM = 0x2; |
| static final long LOGRECORD_FLAGS_RECORD_SET = 0x4; |
| |
| private long metadata; |
| private long txid; |
| private byte[] payload; |
| |
| /** |
| * Construct an uninitialized log record. |
| * |
| * <p>NOTE: only deserializer should call this constructor. |
| */ |
| protected LogRecord() { |
| this.txid = 0; |
| this.metadata = 0; |
| } |
| |
| /** |
| * Construct a log record with <i>TransactionId</i> and payload. |
| * |
| * <p>Usually writer would construct the log record for writing. |
| * |
| * @param txid |
| * application defined transaction id. |
| * @param payload |
| * record data |
| */ |
| public LogRecord(long txid, byte[] payload) { |
| this.txid = txid; |
| this.payload = payload; |
| this.metadata = 0; |
| } |
| |
| // |
| // Accessors |
| // |
| |
| /** |
| * Return application defined transaction id. |
| * |
| * @return transacton id. |
| */ |
| public long getTransactionId() { |
| return txid; |
| } |
| |
| /** |
| * Set application defined transaction id. |
| * |
| * @param txid application defined transaction id. |
| */ |
| protected void setTransactionId(long txid) { |
| this.txid = txid; |
| } |
| |
| /** |
| * Return the payload of this log record. |
| * |
| * @return payload of this log record. |
| */ |
| public byte[] getPayload() { |
| return payload; |
| } |
| |
| /** |
| * Set payload for this log record. |
| * |
| * @param payload payload of this log record |
| */ |
| void setPayload(byte[] payload) { |
| this.payload = payload; |
| } |
| |
| /** |
| * Return the payload as an {@link InputStream}. |
| * |
| * @return payload as input stream |
| */ |
| public InputStream getPayLoadInputStream() { |
| return new ByteArrayInputStream(payload); |
| } |
| |
| // |
| // Metadata & Flags |
| // |
| |
| protected void setMetadata(long metadata) { |
| this.metadata = metadata; |
| } |
| |
| protected long getMetadata() { |
| return this.metadata; |
| } |
| |
| /** |
| * Set the position in the log segment. |
| * |
| * @see #getPositionWithinLogSegment() |
| * @param positionWithinLogSegment position in the log segment. |
| */ |
| void setPositionWithinLogSegment(int positionWithinLogSegment) { |
| assert(positionWithinLogSegment >= 0); |
| metadata = (metadata & LOGRECORD_METADATA_POSITION_UMASK) |
| | (((long) positionWithinLogSegment) << LOGRECORD_METADATA_POSITION_SHIFT); |
| } |
| |
| /** |
| * The position in the log segment means how many records (inclusive) added to the log segment so far. |
| * |
| * @return position of the record in the log segment. |
| */ |
| public int getPositionWithinLogSegment() { |
| long ret = (metadata & LOGRECORD_METADATA_POSITION_MASK) >> LOGRECORD_METADATA_POSITION_SHIFT; |
| if (ret < 0 || ret > Integer.MAX_VALUE) { |
| throw new IllegalArgumentException |
| (ret + " position should never exceed max integer value"); |
| } |
| return (int) ret; |
| } |
| |
| /** |
| * Get the last position of this record in the log segment. |
| * |
| * <p>If the record isn't record set, it would be same as {@link #getPositionWithinLogSegment()}, |
| * otherwise, it would be {@link #getPositionWithinLogSegment()} + numRecords - 1. If the record set |
| * version is unknown, it would be same as {@link #getPositionWithinLogSegment()}. |
| * |
| * @return last position of this record in the log segment. |
| */ |
| int getLastPositionWithinLogSegment() { |
| if (isRecordSet()) { |
| try { |
| return getPositionWithinLogSegment() + LogRecordSet.numRecords(this) - 1; |
| } catch (IOException e) { |
| // if it is unrecognized record set, we will return the position of this record set. |
| return getPositionWithinLogSegment(); |
| } |
| } else { |
| return getPositionWithinLogSegment(); |
| } |
| } |
| |
| /** |
| * Set the record to represent a set of records. |
| * |
| * <p>The bytes in this record is the serialized format of {@link LogRecordSet}. |
| */ |
| public void setRecordSet() { |
| metadata = metadata | LOGRECORD_FLAGS_RECORD_SET; |
| } |
| |
| /** |
| * Check if the record represents a set of records. |
| * |
| * @return true if the record represents a set of records, otherwise false. |
| * @see #setRecordSet() |
| */ |
| public boolean isRecordSet() { |
| return isRecordSet(metadata); |
| } |
| |
| public static boolean isRecordSet(long metadata) { |
| return ((metadata & LOGRECORD_FLAGS_RECORD_SET) != 0); |
| } |
| |
| @VisibleForTesting |
| public void setControl() { |
| metadata = metadata | LOGRECORD_FLAGS_CONTROL_MESSAGE; |
| } |
| |
| /** |
| * Check if the record is a control record. |
| * |
| * @return true if the record is a control record, otherwise false. |
| */ |
| public boolean isControl() { |
| return isControl(metadata); |
| } |
| |
| /** |
| * Check flags to see if it indicates a control record. |
| * |
| * @param flags record flags |
| * @return true if the record is a control record, otherwise false. |
| */ |
| public static boolean isControl(long flags) { |
| return ((flags & LOGRECORD_FLAGS_CONTROL_MESSAGE) != 0); |
| } |
| |
| /** |
| * Set the record as <code>EoS</code> mark. |
| * |
| * @see #isEndOfStream() |
| */ |
| void setEndOfStream() { |
| metadata = metadata | LOGRECORD_FLAGS_END_OF_STREAM; |
| } |
| |
| /** |
| * Check if the record is a <code>EoS</code> mark. |
| * |
| * <p><code>EoS</code> mark is a special record that writer would |
| * add to seal a log. after <code>Eos</code> mark is written, |
| * writers can't write any more records and readers will get |
| * {@link com.twitter.distributedlog.exceptions.EndOfStreamException} |
| * when they reach <code>EoS</code>. |
| * |
| * @return true |
| */ |
| boolean isEndOfStream() { |
| return ((metadata & LOGRECORD_FLAGS_END_OF_STREAM) != 0); |
| } |
| |
| // |
| // Serialization & Deserialization |
| // |
| |
| protected void readPayload(DataInputStream in) throws IOException { |
| int length = in.readInt(); |
| if (length < 0) { |
| throw new EOFException("Log Record is corrupt: Negative length " + length); |
| } |
| payload = new byte[length]; |
| in.readFully(payload); |
| } |
| |
| private void writePayload(DataOutputStream out) throws IOException { |
| out.writeInt(payload.length); |
| out.write(payload); |
| } |
| |
| private void writeToStream(DataOutputStream out) throws IOException { |
| out.writeLong(metadata); |
| out.writeLong(txid); |
| writePayload(out); |
| } |
| |
| /** |
| * The size of the serialized log record. |
| * |
| * <p>This is used to estimate how much will be be appended to the in-memory buffer. |
| * |
| * @return serialized size |
| */ |
| int getPersistentSize() { |
| // Flags + TxId + Payload-length + payload |
| return 2 * (Long.SIZE / 8) + Integer.SIZE / 8 + payload.length; |
| } |
| |
| /** |
| * Writer class to write log records into an output {@code stream}. |
| */ |
| public static class Writer { |
| private final DataOutputStream buf; |
| |
| public Writer(DataOutputStream out) { |
| this.buf = out; |
| } |
| |
| /** |
| * Write an operation to the output stream. |
| * |
| * @param record The operation to write |
| * @throws IOException if an error occurs during writing. |
| */ |
| public void writeOp(LogRecord record) throws IOException { |
| record.writeToStream(buf); |
| } |
| |
| public int getPendingBytes() { |
| return buf.size(); |
| } |
| } |
| |
| /** |
| * Reader class to read log records from an input {@code stream}. |
| */ |
| public static class Reader { |
| private final RecordStream recordStream; |
| private final DataInputStream in; |
| private final long startSequenceId; |
| private final boolean deserializeRecordSet; |
| private static final int SKIP_BUFFER_SIZE = 512; |
| private LogRecordSet.Reader recordSetReader = null; |
| private LogRecordWithDLSN lastRecordSkipTo = null; |
| |
| /** |
| * Construct the reader. |
| * |
| * @param recordStream the record stream for generating {@code DLSN}s. |
| * @param in The stream to read from. |
| * @param startSequenceId the start sequence id. |
| */ |
| public Reader(RecordStream recordStream, |
| DataInputStream in, |
| long startSequenceId) { |
| this(recordStream, in, startSequenceId, true); |
| } |
| |
| public Reader(RecordStream recordStream, |
| DataInputStream in, |
| long startSequenceId, |
| boolean deserializeRecordSet) { |
| this.recordStream = recordStream; |
| this.in = in; |
| this.startSequenceId = startSequenceId; |
| this.deserializeRecordSet = deserializeRecordSet; |
| } |
| |
| /** |
| * Read an log record from the input stream. |
| * |
| * <p/> Note that the objects returned from this method may be re-used by future |
| * calls to the same method. |
| * |
| * @return the operation read from the stream, or null at the end of the file |
| * @throws IOException on error. |
| */ |
| public LogRecordWithDLSN readOp() throws IOException { |
| LogRecordWithDLSN nextRecordInStream; |
| while (true) { |
| if (lastRecordSkipTo != null) { |
| nextRecordInStream = lastRecordSkipTo; |
| recordStream.advance(1); |
| lastRecordSkipTo = null; |
| return nextRecordInStream; |
| } |
| if (recordSetReader != null) { |
| nextRecordInStream = recordSetReader.nextRecord(); |
| if (null != nextRecordInStream) { |
| recordStream.advance(1); |
| return nextRecordInStream; |
| } else { |
| recordSetReader = null; |
| } |
| } |
| |
| try { |
| long metadata = in.readLong(); |
| // Reading the first 8 bytes positions the record stream on the correct log record |
| // By this time all components of the DLSN are valid so this is where we shoud |
| // retrieve the currentDLSN and advance to the next |
| // Given that there are 20 bytes following the read position of the previous call |
| // to readLong, we should not have moved ahead in the stream. |
| nextRecordInStream = new LogRecordWithDLSN(recordStream.getCurrentPosition(), startSequenceId); |
| nextRecordInStream.setMetadata(metadata); |
| nextRecordInStream.setTransactionId(in.readLong()); |
| nextRecordInStream.readPayload(in); |
| if (LOG.isTraceEnabled()) { |
| if (nextRecordInStream.isControl()) { |
| LOG.trace("Reading {} Control DLSN {}", |
| recordStream.getName(), nextRecordInStream.getDlsn()); |
| } else { |
| LOG.trace("Reading {} Valid DLSN {}", |
| recordStream.getName(), nextRecordInStream.getDlsn()); |
| } |
| } |
| |
| int numRecords = 1; |
| if (!deserializeRecordSet && nextRecordInStream.isRecordSet()) { |
| numRecords = LogRecordSet.numRecords(nextRecordInStream); |
| } |
| |
| if (deserializeRecordSet && nextRecordInStream.isRecordSet()) { |
| recordSetReader = LogRecordSet.of(nextRecordInStream); |
| } else { |
| recordStream.advance(numRecords); |
| return nextRecordInStream; |
| } |
| } catch (EOFException eof) { |
| // Expected |
| break; |
| } |
| } |
| return null; |
| } |
| |
| public boolean skipTo(long txId, boolean skipControl) throws IOException { |
| return skipTo(txId, null, skipControl); |
| } |
| |
| public boolean skipTo(DLSN dlsn) throws IOException { |
| return skipTo(null, dlsn, false); |
| } |
| |
| private boolean skipTo(Long txId, DLSN dlsn, boolean skipControl) throws IOException { |
| LOG.debug("SkipTo"); |
| byte[] skipBuffer = null; |
| boolean found = false; |
| while (true) { |
| try { |
| long flags; |
| long currTxId; |
| |
| // if there is not record set, read next record |
| if (null == recordSetReader) { |
| in.mark(INPUTSTREAM_MARK_LIMIT); |
| flags = in.readLong(); |
| currTxId = in.readLong(); |
| } else { |
| // check record set until reach end of record set |
| lastRecordSkipTo = recordSetReader.nextRecord(); |
| if (null == lastRecordSkipTo) { |
| // reach end of record set |
| recordSetReader = null; |
| continue; |
| } |
| flags = lastRecordSkipTo.getMetadata(); |
| currTxId = lastRecordSkipTo.getTransactionId(); |
| } |
| |
| if ((null != dlsn) && (recordStream.getCurrentPosition().compareTo(dlsn) >= 0)) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Found position {} beyond {}", recordStream.getCurrentPosition(), dlsn); |
| } |
| if (null == lastRecordSkipTo) { |
| in.reset(); |
| } |
| found = true; |
| break; |
| } |
| if ((null != txId) && (currTxId >= txId)) { |
| if (!skipControl || !isControl(flags)) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Found position {} beyond {}", currTxId, txId); |
| } |
| if (null == lastRecordSkipTo) { |
| in.reset(); |
| } |
| found = true; |
| break; |
| } |
| } |
| |
| if (null != lastRecordSkipTo) { |
| recordStream.advance(1); |
| continue; |
| } |
| |
| // get the num of records to skip |
| if (isRecordSet(flags)) { |
| // read record set |
| LogRecordWithDLSN record = |
| new LogRecordWithDLSN(recordStream.getCurrentPosition(), startSequenceId); |
| record.setMetadata(flags); |
| record.setTransactionId(currTxId); |
| record.readPayload(in); |
| recordSetReader = LogRecordSet.of(record); |
| } else { |
| int length = in.readInt(); |
| if (length < 0) { |
| // We should never really see this as we only write complete entries to |
| // BK and BK client has logic to detect torn writes (through checksum) |
| LOG.info("Encountered Record with negative length at TxId: {}", currTxId); |
| break; |
| } |
| // skip single record |
| if (null == skipBuffer) { |
| skipBuffer = new byte[SKIP_BUFFER_SIZE]; |
| } |
| int read = 0; |
| while (read < length) { |
| int bytesToRead = Math.min(length - read, SKIP_BUFFER_SIZE); |
| in.readFully(skipBuffer, 0, bytesToRead); |
| read += bytesToRead; |
| } |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Skipped Record with TxId {} DLSN {}", |
| currTxId, recordStream.getCurrentPosition()); |
| } |
| recordStream.advance(1); |
| } |
| } catch (EOFException eof) { |
| LOG.debug("Skip encountered end of file Exception", eof); |
| break; |
| } |
| } |
| return found; |
| } |
| } |
| } |