blob: 9e3b76c1a12435b1689cbca7f5633641644d1047 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.Hex;
import java.nio.ByteBuffer;
/**
* DistributedLog Sequence Number (DLSN) is the system-generated sequence number for a log record.
*
* <p>A DLSN is composed of 3 components:
* <ul>
* <li>LogSegment Sequence Number: the sequence number of log segment that the record is written in
* <li>Entry Id: the entry id of the entry that the record is batched in
* <li>Slot Id: the slot id that the record is in the entry
* </ul>
*
* @see LogRecordWithDLSN
*/
public class DLSN implements Comparable<DLSN> {

    /** Serialization format version 0. */
    public static final byte VERSION0 = (byte) 0;
    /** Serialization format version 1. */
    public static final byte VERSION1 = (byte) 1;

    // The initial DLSN that DL starts with
    public static final DLSN InitialDLSN = new DLSN(1, 0 , 0);
    // The non-inclusive lower bound DLSN
    public static final DLSN NonInclusiveLowerBound = new DLSN(1, 0 , -1);
    // Invalid DLSN
    public static final DLSN InvalidDLSN = new DLSN(0,-1,-1);

    // Version used by the no-arg serialize methods.
    static final byte CUR_VERSION = VERSION1;
    // Length in bytes of a version 0 payload. The expression adds bit counts and
    // evaluates to 200, although only 1 + 3 * 8 = 25 bytes are ever written; the
    // remaining bytes stay zero. deserializeBytes() enforces this exact length.
    // NOTE(review): presumably a historical bits/bytes mix-up that is now part of
    // the version 0 wire format -- confirm before changing this value.
    static final int VERSION0_LEN = Long.SIZE * 3 + Byte.SIZE;
    // Length in bytes of a version 1 payload: one version byte plus three longs (25).
    static final int VERSION1_LEN = Long.SIZE * 3 / Byte.SIZE + 1;

    private final long logSegmentSequenceNo;
    private final long entryId;
    private final long slotId;

    /**
     * Create a DLSN from its three components.
     *
     * @param logSegmentSequenceNo
     *          sequence number of the log segment
     * @param entryId
     *          entry id within the log segment
     * @param slotId
     *          slot id within the entry
     */
    public DLSN(long logSegmentSequenceNo, long entryId, long slotId) {
        this.logSegmentSequenceNo = logSegmentSequenceNo;
        this.entryId = entryId;
        this.slotId = slotId;
    }

    /**
     * Return the sequence number of the log segment that the record is written to.
     *
     * @return sequence number of the log segment that the record is written to.
     */
    public long getLogSegmentSequenceNo() {
        return logSegmentSequenceNo;
    }

    /**
     * Return the sequence number of the log segment that the record is written to.
     *
     * @return sequence number of the log segment that the record is written to.
     * @deprecated use {@link #getLogSegmentSequenceNo()} instead
     */
    @Deprecated
    long getLedgerSequenceNo() {
        return logSegmentSequenceNo;
    }

    /**
     * Return the entry id of the batch that the record is written to.
     *
     * @return entry id of the batch that the record is written to.
     */
    public long getEntryId() {
        return entryId;
    }

    /**
     * Return the slot id in the batch that the record is written to.
     *
     * @return slot id in the batch that the record is written to.
     */
    public long getSlotId() {
        return slotId;
    }

    /**
     * Order DLSNs by log segment sequence number, then entry id, then slot id.
     *
     * @param that
     *          the DLSN to compare against
     * @return -1, 0 or 1 if this DLSN is less than, equal to or greater than <code>that</code>
     */
    @Override
    public int compareTo(DLSN that) {
        if (this.logSegmentSequenceNo != that.logSegmentSequenceNo) {
            return (this.logSegmentSequenceNo < that.logSegmentSequenceNo) ? -1 : 1;
        }
        if (this.entryId != that.entryId) {
            return (this.entryId < that.entryId) ? -1 : 1;
        }
        if (this.slotId != that.slotId) {
            return (this.slotId < that.slotId) ? -1 : 1;
        }
        return 0;
    }

    /**
     * Serialize the DLSN into bytes with current version.
     *
     * @return the serialized bytes
     */
    public byte[] serializeBytes() {
        return serializeBytes(CUR_VERSION);
    }

    /**
     * Serialize the DLSN into bytes with given <code>version</code>.
     *
     * <p>The layout is one version byte followed by the log segment sequence number,
     * the entry id and the slot id, each written as a long via {@link ByteBuffer}.
     *
     * @param version
     *          version to serialize the DLSN
     * @return the serialized bytes
     * @throws IllegalArgumentException
     *          if <code>version</code> is not between {@link #VERSION0} and the current version
     */
    public byte[] serializeBytes(byte version) {
        Preconditions.checkArgument(version <= CUR_VERSION && version >= VERSION0,
                "Unsupported DLSN version %s (supported versions: %s to %s)",
                version, VERSION0, CUR_VERSION);
        // Only version 0 uses the oversized legacy buffer; keying on VERSION0 (rather
        // than on CUR_VERSION) keeps version 1 at its own length if CUR_VERSION changes.
        byte[] data = new byte[VERSION0 == version ? VERSION0_LEN : VERSION1_LEN];
        ByteBuffer bb = ByteBuffer.wrap(data);
        bb.put(version);
        bb.putLong(logSegmentSequenceNo);
        bb.putLong(entryId);
        bb.putLong(slotId);
        return data;
    }

    /**
     * Serialize the DLSN into base64 encoded string.
     *
     * @return serialized base64 string
     * @see #serializeBytes()
     */
    public String serialize() {
        return serialize(CUR_VERSION);
    }

    /**
     * Serialize the DLSN into base64 encoded string with given <code>version</code>.
     *
     * @param version
     *          version to serialize the DLSN
     * @return the serialized base64 string
     * @see #serializeBytes(byte)
     */
    public String serialize(byte version) {
        return Base64.encodeBase64String(serializeBytes(version));
    }

    /**
     * Deserialize the DLSN from base64 encoded string <code>dlsn</code>.
     *
     * @param dlsn
     *          base64 encoded string
     * @return dlsn
     * @throws IllegalArgumentException
     *          if the decoded bytes are not a valid serialized DLSN
     * @see #deserializeBytes(byte[])
     */
    public static DLSN deserialize(String dlsn) {
        byte[] data = Base64.decodeBase64(dlsn);
        return deserializeBytes(data);
    }

    /**
     * Deserialize the DLSN from bytes array.
     *
     * @param data
     *          serialized bytes
     * @return dlsn
     * @throws IllegalArgumentException
     *          if <code>data</code> is null or empty, carries an unknown version,
     *          or its length does not match the length expected for its version
     */
    public static DLSN deserializeBytes(byte[] data) {
        // Reject null/empty input up front so that malformed input consistently
        // surfaces as IllegalArgumentException instead of a NullPointerException
        // or BufferUnderflowException from the buffer read below.
        if (null == data || 0 == data.length) {
            throw new IllegalArgumentException("Invalid DLSN : null or empty serialized bytes");
        }
        ByteBuffer bb = ByteBuffer.wrap(data);
        byte version = bb.get();
        if (VERSION0 == version) {
            if (VERSION0_LEN != data.length) {
                throw new IllegalArgumentException("Invalid version zero DLSN " + Hex.encodeHexString(data));
            }
        } else if (VERSION1 == version) {
            if (VERSION1_LEN != data.length) {
                throw new IllegalArgumentException("Invalid version one DLSN " + Hex.encodeHexString(data));
            }
        } else {
            throw new IllegalArgumentException("Invalid DLSN : version = "
                    + version + ", " + Hex.encodeHexString(data));
        }
        return readComponents(bb);
    }

    // Keep original version0 logic for testing.
    @VisibleForTesting
    static DLSN deserialize0(String dlsn) {
        byte[] data = Base64.decodeBase64(dlsn);
        // The length check runs first, so data[0] is only read from a full-size payload;
        // null or short input is reported as IllegalArgumentException like any other bad DLSN.
        if (null == data || VERSION0_LEN != data.length || VERSION0 != data[0]) {
            throw new IllegalArgumentException("Invalid DLSN " + dlsn);
        }
        ByteBuffer bb = ByteBuffer.wrap(data);
        bb.get(); // skip the version byte validated above
        return readComponents(bb);
    }

    // Read the three longs (segment sequence number, entry id, slot id) at the buffer's position.
    private static DLSN readComponents(ByteBuffer bb) {
        long segmentSeqNo = bb.getLong();
        long entry = bb.getLong();
        long slot = bb.getLong();
        return new DLSN(segmentSeqNo, entry, slot);
    }

    @Override
    public String toString() {
        return "DLSN{" +
            "logSegmentSequenceNo=" + logSegmentSequenceNo +
            ", entryId=" + entryId +
            ", slotId=" + slotId +
            '}';
    }

    /**
     * Two DLSNs are equal when all three components are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DLSN)) {
            return false;
        }
        DLSN other = (DLSN) o;
        return logSegmentSequenceNo == other.logSegmentSequenceNo
            && entryId == other.entryId
            && slotId == other.slotId;
    }

    @Override
    public int hashCode() {
        int result = (int) (logSegmentSequenceNo ^ (logSegmentSequenceNo >>> 32));
        result = 31 * result + (int) (entryId ^ (entryId >>> 32));
        result = 31 * result + (int) (slotId ^ (slotId >>> 32));
        return result;
    }

    /**
     * Positions to a DLSN greater than the current value - this may not
     * correspond to an actual LogRecord, it's just used by the positioning logic
     * to position the reader.
     *
     * <p>The returned DLSN has the same log segment sequence number and entry id,
     * with the slot id incremented by one.
     *
     * @return the next DLSN
     */
    public DLSN getNextDLSN() {
        return new DLSN(logSegmentSequenceNo, entryId, slotId + 1);
    }

    /**
     * Positions to the start of the next log segment - this may not
     * correspond to an actual LogRecord, it's just used by the positioning logic
     * to position the reader.
     *
     * <p>The returned DLSN has the log segment sequence number incremented by one,
     * with entry id and slot id both set to zero.
     *
     * @return the first DLSN position of the next log segment
     */
    public DLSN positionOnTheNextLedger() {
        return new DLSN(logSegmentSequenceNo + 1 , 0, 0);
    }
}