blob: 034dd6ed77edb90e8b0699635f2474f6cfc24901 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class takes an entry, attaches a digest to it and packages it with relevant
* data so that it can be shipped to the bookie. On the return side, it also
* gets a packet, checks that the digest matches, and extracts the original entry
* for the packet. Currently 3 types of digests are supported: MAC (based on SHA-1) and CRC32 and CRC32C.
*/
public abstract class DigestManager {
private static final Logger logger = LoggerFactory.getLogger(DigestManager.class);
public static final int METADATA_LENGTH = 32;
public static final int LAC_METADATA_LENGTH = 16;
final long ledgerId;
final boolean useV2Protocol;
private final ByteBufAllocator allocator;
abstract int getMacCodeLength();
void update(byte[] data) {
update(Unpooled.wrappedBuffer(data, 0, data.length));
}
abstract void update(ByteBuf buffer);
abstract void populateValueAndReset(ByteBuf buffer);
final int macCodeLength;
public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allocator) {
this.ledgerId = ledgerId;
this.useV2Protocol = useV2Protocol;
this.macCodeLength = getMacCodeLength();
this.allocator = allocator;
}
public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType,
ByteBufAllocator allocator, boolean useV2Protocol) throws GeneralSecurityException {
switch(digestType) {
case HMAC:
return new MacDigestManager(ledgerId, passwd, useV2Protocol, allocator);
case CRC32:
return new CRC32DigestManager(ledgerId, useV2Protocol, allocator);
case CRC32C:
return new CRC32CDigestManager(ledgerId, useV2Protocol, allocator);
case DUMMY:
return new DummyDigestManager(ledgerId, useV2Protocol, allocator);
default:
throw new GeneralSecurityException("Unknown checksum type: " + digestType);
}
}
public static byte[] generateMasterKey(byte[] password) throws NoSuchAlgorithmException {
return password.length > 0 ? MacDigestManager.genDigest("ledger", password) : MacDigestManager.EMPTY_LEDGER_KEY;
}
/**
* Computes the digest for an entry and put bytes together for sending.
*
* @param entryId
* @param lastAddConfirmed
* @param length
* @param data
* @return
*/
public ByteBufList computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length,
ByteBuf data) {
ByteBuf headersBuffer;
if (this.useV2Protocol) {
headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
} else {
headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength);
}
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(entryId);
headersBuffer.writeLong(lastAddConfirmed);
headersBuffer.writeLong(length);
update(headersBuffer);
update(data);
populateValueAndReset(headersBuffer);
return ByteBufList.get(headersBuffer, data);
}
/**
* Computes the digest for writeLac for sending.
*
* @param lac
* @return
*/
public ByteBufList computeDigestAndPackageForSendingLac(long lac) {
ByteBuf headersBuffer;
if (this.useV2Protocol) {
headersBuffer = allocator.buffer(LAC_METADATA_LENGTH + macCodeLength);
} else {
headersBuffer = Unpooled.buffer(LAC_METADATA_LENGTH + macCodeLength);
}
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(lac);
update(headersBuffer);
populateValueAndReset(headersBuffer);
return ByteBufList.get(headersBuffer);
}
private void verifyDigest(ByteBuf dataReceived) throws BKDigestMatchException {
verifyDigest(LedgerHandle.INVALID_ENTRY_ID, dataReceived, true);
}
private void verifyDigest(long entryId, ByteBuf dataReceived) throws BKDigestMatchException {
verifyDigest(entryId, dataReceived, false);
}
private void verifyDigest(long entryId, ByteBuf dataReceived, boolean skipEntryIdCheck)
throws BKDigestMatchException {
if ((METADATA_LENGTH + macCodeLength) > dataReceived.readableBytes()) {
logger.error("Data received is smaller than the minimum for this digest type. "
+ " Either the packet it corrupt, or the wrong digest is configured. "
+ " Digest type: {}, Packet Length: {}",
this.getClass().getName(), dataReceived.readableBytes());
throw new BKDigestMatchException();
}
update(dataReceived.slice(0, METADATA_LENGTH));
int offset = METADATA_LENGTH + macCodeLength;
update(dataReceived.slice(offset, dataReceived.readableBytes() - offset));
ByteBuf digest = allocator.buffer(macCodeLength);
populateValueAndReset(digest);
try {
if (digest.compareTo(dataReceived.slice(METADATA_LENGTH, macCodeLength)) != 0) {
logger.error("Mac mismatch for ledger-id: " + ledgerId + ", entry-id: " + entryId);
throw new BKDigestMatchException();
}
} finally {
digest.release();
}
long actualLedgerId = dataReceived.readLong();
long actualEntryId = dataReceived.readLong();
if (actualLedgerId != ledgerId) {
logger.error("Ledger-id mismatch in authenticated message, expected: " + ledgerId + " , actual: "
+ actualLedgerId);
throw new BKDigestMatchException();
}
if (!skipEntryIdCheck && actualEntryId != entryId) {
logger.error("Entry-id mismatch in authenticated message, expected: " + entryId + " , actual: "
+ actualEntryId);
throw new BKDigestMatchException();
}
}
public long verifyDigestAndReturnLac(ByteBuf dataReceived) throws BKDigestMatchException{
if ((LAC_METADATA_LENGTH + macCodeLength) > dataReceived.readableBytes()) {
logger.error("Data received is smaller than the minimum for this digest type."
+ " Either the packet it corrupt, or the wrong digest is configured. "
+ " Digest type: {}, Packet Length: {}",
this.getClass().getName(), dataReceived.readableBytes());
throw new BKDigestMatchException();
}
update(dataReceived.slice(0, LAC_METADATA_LENGTH));
ByteBuf digest = allocator.buffer(macCodeLength);
try {
populateValueAndReset(digest);
if (digest.compareTo(dataReceived.slice(LAC_METADATA_LENGTH, macCodeLength)) != 0) {
logger.error("Mac mismatch for ledger-id LAC: " + ledgerId);
throw new BKDigestMatchException();
}
} finally {
digest.release();
}
long actualLedgerId = dataReceived.readLong();
long lac = dataReceived.readLong();
if (actualLedgerId != ledgerId) {
logger.error("Ledger-id mismatch in authenticated message, expected: " + ledgerId + " , actual: "
+ actualLedgerId);
throw new BKDigestMatchException();
}
return lac;
}
/**
* Verify that the digest matches and returns the data in the entry.
*
* @param entryId
* @param dataReceived
* @return
* @throws BKDigestMatchException
*/
public ByteBuf verifyDigestAndReturnData(long entryId, ByteBuf dataReceived)
throws BKDigestMatchException {
verifyDigest(entryId, dataReceived);
dataReceived.readerIndex(METADATA_LENGTH + macCodeLength);
return dataReceived;
}
/**
* A representation of RecoveryData.
*/
public static final class RecoveryData {
final long lastAddConfirmed;
final long length;
public RecoveryData(long lastAddConfirmed, long length) {
this.lastAddConfirmed = lastAddConfirmed;
this.length = length;
}
public long getLastAddConfirmed() {
return lastAddConfirmed;
}
public long getLength() {
return length;
}
}
public RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) throws BKDigestMatchException {
verifyDigest(dataReceived);
dataReceived.readerIndex(8);
dataReceived.readLong(); // skip unused entryId
long lastAddConfirmed = dataReceived.readLong();
long length = dataReceived.readLong();
return new RecoveryData(lastAddConfirmed, length);
}
}