blob: b34de583e4d4e0c1957ea0f08646d915b2d7e689 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.versioning.Version;
import java.security.GeneralSecurityException;
import java.net.InetSocketAddress;
import java.util.concurrent.RejectedExecutionException;
/**
* Read only ledger handle. This ledger handle allows you to
* read from a ledger but not to write to it. It overrides all
* the public write operations from LedgerHandle.
* It should be returned for BookKeeper#openLedger operations.
*/
class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListener {
class MetadataUpdater extends SafeRunnable {
final LedgerMetadata m;
MetadataUpdater(LedgerMetadata metadata) {
this.m = metadata;
}
@Override
public void safeRun() {
Version.Occurred occurred =
ReadOnlyLedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
if (Version.Occurred.BEFORE == occurred) {
LOG.info("Updated ledger metadata for ledger {} to {}.", ledgerId, this.m);
ReadOnlyLedgerHandle.this.metadata = this.m;
}
}
}
ReadOnlyLedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
DigestType digestType, byte[] password, boolean watch)
throws GeneralSecurityException, NumberFormatException {
super(bk, ledgerId, metadata, digestType, password);
if (watch) {
bk.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);
}
}
@Override
public void close()
throws InterruptedException, BKException {
bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
}
@Override
public void asyncClose(CloseCallback cb, Object ctx) {
bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
cb.closeComplete(BKException.Code.OK, this, ctx);
}
@Override
public long addEntry(byte[] data) throws InterruptedException, BKException {
return addEntry(data, 0, data.length);
}
@Override
public long addEntry(byte[] data, int offset, int length)
throws InterruptedException, BKException {
LOG.error("Tried to add entry on a Read-Only ledger handle, ledgerid=" + ledgerId);
throw BKException.create(BKException.Code.IllegalOpException);
}
@Override
public void asyncAddEntry(final byte[] data, final AddCallback cb,
final Object ctx) {
asyncAddEntry(data, 0, data.length, cb, ctx);
}
@Override
public void asyncAddEntry(final byte[] data, final int offset, final int length,
final AddCallback cb, final Object ctx) {
LOG.error("Tried to add entry on a Read-Only ledger handle, ledgerid=" + ledgerId);
cb.addComplete(BKException.Code.IllegalOpException, this,
LedgerHandle.INVALID_ENTRY_ID, ctx);
}
@Override
void handleBookieFailure(final InetSocketAddress addr, final int bookieIndex) {
blockAddCompletions.incrementAndGet();
synchronized (metadata) {
try {
if (!metadata.currentEnsemble.get(bookieIndex).equals(addr)) {
// ensemble has already changed, failure of this addr is immaterial
LOG.debug("Write did not succeed to {}, bookieIndex {},"
+" but we have already fixed it.", addr, bookieIndex);
blockAddCompletions.decrementAndGet();
return;
}
replaceBookieInMetadata(addr, bookieIndex);
blockAddCompletions.decrementAndGet();
// the failed bookie has been replaced
unsetSuccessAndSendWriteRequest(bookieIndex);
} catch (BKException.BKNotEnoughBookiesException e) {
LOG.error("Could not get additional bookie to "
+ "remake ensemble, closing ledger: " + ledgerId);
handleUnrecoverableErrorDuringAdd(e.getCode());
return;
}
}
}
@Override
public void onChanged(long lid, LedgerMetadata newMetadata) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ledger metadata update on {} : {}", lid, newMetadata);
}
if (this.ledgerId != lid) {
return;
}
if (null == newMetadata) {
return;
}
Version.Occurred occurred =
this.metadata.getVersion().compare(newMetadata.getVersion());
if (LOG.isDebugEnabled()) {
LOG.debug("Try to update metadata from {} to {} : {}",
new Object[] { this.metadata, newMetadata, occurred });
}
if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
try {
bk.mainWorkerPool.submitOrdered(ledgerId, new MetadataUpdater(newMetadata));
} catch (RejectedExecutionException ree) {
LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}",
ledgerId, newMetadata);
}
}
}
@Override
public String toString() {
return String.format("ReadOnlyLedgerHandle(lid = %d, id = %d)", ledgerId, super.hashCode());
}
}