blob: 4a51e2d78ba0fff02504e54ff4147e40961a3926 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock.IndexInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexEntry;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OffloadIndexBlockV2Impl implements OffloadIndexBlockV2 {
private static final Logger log = LoggerFactory.getLogger(OffloadIndexBlockImpl.class);
private static final int INDEX_MAGIC_WORD = 0x3D1FB0BC;
private Map<Long, LedgerInfo> segmentMetadata;
private final Map<Long, LedgerMetadata> compatibleMetadata = Maps.newTreeMap();
private long dataObjectLength;
private long dataHeaderLength;
// private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
private Map<Long, TreeMap<Long, OffloadIndexEntryImpl>> indexEntries;
private final Handle<OffloadIndexBlockV2Impl> recyclerHandle;
private static final Recycler<OffloadIndexBlockV2Impl> RECYCLER = new Recycler<OffloadIndexBlockV2Impl>() {
@Override
protected OffloadIndexBlockV2Impl newObject(Handle<OffloadIndexBlockV2Impl> handle) {
return new OffloadIndexBlockV2Impl(handle);
}
};
private OffloadIndexBlockV2Impl(Handle<OffloadIndexBlockV2Impl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
public static OffloadIndexBlockV2Impl get(Map<Long, LedgerInfo> metadata, long dataObjectLength,
long dataHeaderLength,
Map<Long, List<OffloadIndexEntryImpl>> entries) {
OffloadIndexBlockV2Impl block = RECYCLER.get();
block.indexEntries = new HashMap<>();
entries.forEach((ledgerId, list) -> {
final TreeMap<Long, OffloadIndexEntryImpl> inLedger = block.indexEntries
.getOrDefault(ledgerId, new TreeMap<>());
list.forEach(indexEntry -> {
inLedger.put(indexEntry.getEntryId(), indexEntry);
});
block.indexEntries.put(ledgerId, inLedger);
});
block.segmentMetadata = metadata;
block.dataObjectLength = dataObjectLength;
block.dataHeaderLength = dataHeaderLength;
return block;
}
public static OffloadIndexBlockV2Impl get(int magic, DataInputStream stream) throws IOException {
OffloadIndexBlockV2Impl block = RECYCLER.get();
block.indexEntries = Maps.newTreeMap();
block.segmentMetadata = Maps.newTreeMap();
if (magic != INDEX_MAGIC_WORD) {
throw new IOException(String.format("Invalid MagicWord. read: 0x%x expected: 0x%x",
magic, INDEX_MAGIC_WORD));
}
block.fromStream(stream);
return block;
}
public void recycle() {
dataObjectLength = -1;
dataHeaderLength = -1;
segmentMetadata = null;
indexEntries.clear();
indexEntries = null;
if (recyclerHandle != null) {
recyclerHandle.recycle(this);
}
}
@Override
public OffloadIndexEntry getIndexEntryForEntry(long ledgerId, long messageEntryId) throws IOException {
if (messageEntryId > getLedgerMetadata(ledgerId).getLastEntryId()) {
log.warn("Try to get entry: {}, which beyond lastEntryId {}, return null",
messageEntryId, getLedgerMetadata(ledgerId).getLastEntryId());
throw new IndexOutOfBoundsException("Entry index: " + messageEntryId
+ " beyond lastEntryId: " + getLedgerMetadata(ledgerId).getLastEntryId());
}
// find the greatest mapping Id whose entryId <= messageEntryId
return this.indexEntries.get(ledgerId).floorEntry(messageEntryId).getValue();
}
public long getStartEntryId(long ledgerId) {
return this.indexEntries.get(ledgerId).firstEntry().getValue().getEntryId();
}
@Override
public int getEntryCount() {
int ans = 0;
for (TreeMap<Long, OffloadIndexEntryImpl> v : this.indexEntries.values()) {
ans += v.size();
}
return ans;
}
@Override
public LedgerMetadata getLedgerMetadata(long ledgerId) {
if (compatibleMetadata.containsKey(ledgerId)) {
return compatibleMetadata.get(ledgerId);
} else if (segmentMetadata.containsKey(ledgerId)) {
final CompatibleMetadata result = new CompatibleMetadata(segmentMetadata.get(ledgerId));
compatibleMetadata.put(ledgerId, result);
return result;
} else {
return null;
}
}
@Override
public long getDataObjectLength() {
return this.dataObjectLength;
}
@Override
public long getDataBlockHeaderLength() {
return this.dataHeaderLength;
}
/**
* Get the content of the index block as InputStream.
* Read out in format:
* | index_magic_header | index_block_len | data_object_len | data_header_len |
* | index_entry_count | segment_metadata_len | segment metadata | index entries... |
*/
@Override
public IndexInputStream toStream() throws IOException {
int indexBlockLength = 4 /* magic header */
+ 4 /* index block length */
+ 8 /* data object length */
+ 8; /* data header length */
Map<Long, byte[]> metaBytesMap = new HashMap<>();
for (Map.Entry<Long, TreeMap<Long, OffloadIndexEntryImpl>> e : this.indexEntries.entrySet()) {
Long ledgerId = e.getKey();
TreeMap<Long, OffloadIndexEntryImpl> ledgerIndexEntries = e.getValue();
int indexEntryCount = ledgerIndexEntries.size();
byte[] ledgerMetadataByte = this.segmentMetadata.get(ledgerId).toByteArray();
int segmentMetadataLength = ledgerMetadataByte.length;
indexBlockLength += 8 /* ledger id length */
+ 4 /* index entry count */
+ 4 /* segment metadata length */
+ segmentMetadataLength
+ indexEntryCount * (8 + 4 + 8);
metaBytesMap.put(ledgerId, ledgerMetadataByte);
}
ByteBuf out = PulsarByteBufAllocator.DEFAULT.buffer(indexBlockLength, indexBlockLength);
out.writeInt(INDEX_MAGIC_WORD)
.writeInt(indexBlockLength)
.writeLong(dataObjectLength)
.writeLong(dataHeaderLength);
for (Map.Entry<Long, TreeMap<Long, OffloadIndexEntryImpl>> e : this.indexEntries.entrySet()) {
Long ledgerId = e.getKey();
TreeMap<Long, OffloadIndexEntryImpl> ledgerIndexEntries = e.getValue();
int indexEntryCount = ledgerIndexEntries.size();
byte[] ledgerMetadataByte = metaBytesMap.get(ledgerId);
out.writeLong(ledgerId)
.writeInt(indexEntryCount)
.writeInt(ledgerMetadataByte.length)
.writeBytes(ledgerMetadataByte);
ledgerIndexEntries.values().forEach(idxEntry -> {
out.writeLong(idxEntry.getEntryId())
.writeInt(idxEntry.getPartId())
.writeLong(idxEntry.getOffset());
});
}
return new IndexInputStream(new ByteBufInputStream(out, true), indexBlockLength);
}
private static LedgerInfo parseLedgerInfo(byte[] bytes) throws IOException {
return LedgerInfo.newBuilder().mergeFrom(bytes).build();
}
private OffloadIndexBlockV2 fromStream(DataInputStream dis) throws IOException {
dis.readInt(); // no used index block length
this.dataObjectLength = dis.readLong();
this.dataHeaderLength = dis.readLong();
while (dis.available() > 0) {
long ledgerId = dis.readLong();
int indexEntryCount = dis.readInt();
int segmentMetadataLength = dis.readInt();
byte[] metadataBytes = new byte[segmentMetadataLength];
if (segmentMetadataLength != dis.read(metadataBytes)) {
log.error("Read ledgerMetadata from bytes failed");
throw new IOException("Read ledgerMetadata from bytes failed");
}
final LedgerInfo ledgerInfo = parseLedgerInfo(metadataBytes);
this.segmentMetadata.put(ledgerId, ledgerInfo);
final TreeMap<Long, OffloadIndexEntryImpl> indexEntries = new TreeMap<>();
for (int i = 0; i < indexEntryCount; i++) {
long entryId = dis.readLong();
indexEntries.putIfAbsent(entryId, OffloadIndexEntryImpl.of(entryId, dis.readInt(),
dis.readLong(), dataHeaderLength));
}
this.indexEntries.put(ledgerId, indexEntries);
}
return this;
}
public static int getIndexMagicWord() {
return INDEX_MAGIC_WORD;
}
@Override
public void close() {
recycle();
}
@VisibleForTesting
static class CompatibleMetadata implements LedgerMetadata {
LedgerInfo ledgerInfo;
public CompatibleMetadata(LedgerInfo ledgerInfo) {
this.ledgerInfo = ledgerInfo;
}
@Override
public long getLedgerId() {
return ledgerInfo.getLedgerId();
}
@Override
public int getEnsembleSize() {
return 0;
}
@Override
public int getWriteQuorumSize() {
return 0;
}
@Override
public int getAckQuorumSize() {
return 0;
}
@Override
public long getLastEntryId() {
return ledgerInfo.getEntries() - 1;
}
@Override
public long getLength() {
return ledgerInfo.getSize();
}
@Override
public boolean hasPassword() {
return false;
}
@Override
public byte[] getPassword() {
return new byte[0];
}
@Override
public DigestType getDigestType() {
return null;
}
@Override
public long getCtime() {
return 0;
}
@Override
public boolean isClosed() {
return true;
}
@Override
public Map<String, byte[]> getCustomMetadata() {
return null;
}
@Override
public List<BookieId> getEnsembleAt(long entryId) {
return null;
}
@Override
public NavigableMap<Long, ? extends List<BookieId>> getAllEnsembles() {
return null;
}
@Override
public State getState() {
return null;
}
@Override
public String toSafeString() {
return null;
}
@Override
public int getMetadataFormatVersion() {
return 0;
}
@Override
public long getCToken() {
return 0;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CompatibleMetadata that = (CompatibleMetadata) o;
return ledgerInfo.equals(that.ledgerInfo);
}
@Override
public int hashCode() {
return Objects.hash(ledgerInfo);
}
}
}