blob: b3fe456e7011088258b4c897e0335a86bcfe6a06 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.impl;
import java.io.IOException;
import java.net.URI;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.MetadataAccessor;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.distributedlog.impl.BKNamespaceDriver.getZKServersFromDLUri;
public class ZKMetadataAccessor implements MetadataAccessor {
static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class);
protected final String name;
protected CompletableFuture<Void> closePromise;
protected final URI uri;
// zookeeper clients
// NOTE: The actual zookeeper client is initialized lazily when it is referenced by
// {@link org.apache.distributedlog.ZooKeeperClient#get()}. So it is safe to
// keep builders and their client wrappers here, as they will be used when
// instantiating readers or writers.
protected final ZooKeeperClientBuilder writerZKCBuilder;
protected final ZooKeeperClient writerZKC;
protected final boolean ownWriterZKC;
protected final ZooKeeperClientBuilder readerZKCBuilder;
protected final ZooKeeperClient readerZKC;
protected final boolean ownReaderZKC;
ZKMetadataAccessor(String name,
DistributedLogConfiguration conf,
URI uri,
ZooKeeperClientBuilder writerZKCBuilder,
ZooKeeperClientBuilder readerZKCBuilder,
StatsLogger statsLogger) {
this.name = name;
this.uri = uri;
if (null == writerZKCBuilder) {
RetryPolicy retryPolicy = null;
if (conf.getZKNumRetries() > 0) {
retryPolicy = new BoundExponentialBackoffRetryPolicy(
conf.getZKRetryBackoffStartMillis(),
conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
}
this.writerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
.name(String.format("dlzk:%s:dlm_writer_shared", name))
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.retryThreadCount(conf.getZKClientNumberRetryThreads())
.requestRateLimit(conf.getZKRequestRateLimit())
.zkAclId(conf.getZkAclId())
.uri(uri)
.retryPolicy(retryPolicy)
.statsLogger(statsLogger.scope("dlzk_dlm_writer_shared"));
this.ownWriterZKC = true;
} else {
this.writerZKCBuilder = writerZKCBuilder;
this.ownWriterZKC = false;
}
this.writerZKC = this.writerZKCBuilder.build();
if (null == readerZKCBuilder) {
String zkServersForWriter = getZKServersFromDLUri(uri);
String zkServersForReader;
try {
BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(this.writerZKC, uri);
zkServersForReader = bkdlConfig.getDlZkServersForReader();
} catch (IOException e) {
LOG.warn("Error on resolving dl metadata bindings for {} : ", uri, e);
zkServersForReader = zkServersForWriter;
}
if (zkServersForReader.equals(zkServersForWriter)) {
LOG.info("Used same zookeeper servers '{}' for both writers and readers for {}.",
zkServersForWriter, name);
this.readerZKCBuilder = this.writerZKCBuilder;
this.ownReaderZKC = false;
} else {
RetryPolicy retryPolicy = null;
if (conf.getZKNumRetries() > 0) {
retryPolicy = new BoundExponentialBackoffRetryPolicy(
conf.getZKRetryBackoffStartMillis(),
conf.getZKRetryBackoffMaxMillis(), conf.getZKNumRetries());
}
this.readerZKCBuilder = ZooKeeperClientBuilder.newBuilder()
.name(String.format("dlzk:%s:dlm_reader_shared", name))
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.retryThreadCount(conf.getZKClientNumberRetryThreads())
.requestRateLimit(conf.getZKRequestRateLimit())
.zkServers(zkServersForReader)
.retryPolicy(retryPolicy)
.zkAclId(conf.getZkAclId())
.statsLogger(statsLogger.scope("dlzk_dlm_reader_shared"));
this.ownReaderZKC = true;
}
} else {
this.readerZKCBuilder = readerZKCBuilder;
this.ownReaderZKC = false;
}
this.readerZKC = this.readerZKCBuilder.build();
}
/**
* Get the name of the stream managed by this log manager
*
* @return streamName
*/
@Override
public String getStreamName() {
return name;
}
/**
* Creates or update the metadata stored at the node associated with the
* name and URI
* @param metadata opaque metadata to be stored for the node
* @throws IOException
*/
@Override
public void createOrUpdateMetadata(byte[] metadata) throws IOException {
checkClosedOrInError("createOrUpdateMetadata");
String zkPath = getZKPath();
LOG.debug("Setting application specific metadata on {}", zkPath);
try {
Stat currentStat = writerZKC.get().exists(zkPath, false);
if (currentStat == null) {
if (metadata.length > 0) {
Utils.zkCreateFullPathOptimistic(writerZKC,
zkPath,
metadata,
writerZKC.getDefaultACL(),
CreateMode.PERSISTENT);
}
} else {
writerZKC.get().setData(zkPath, metadata, currentStat.getVersion());
}
} catch (InterruptedException ie) {
throw new DLInterruptedException("Interrupted on creating or updating container metadata", ie);
} catch (Exception exc) {
throw new IOException("Exception creating or updating container metadata", exc);
}
}
/**
* Delete the metadata stored at the associated node. This only deletes the metadata
* and not the node itself
* @throws IOException
*/
@Override
public void deleteMetadata() throws IOException {
checkClosedOrInError("createOrUpdateMetadata");
createOrUpdateMetadata(null);
}
/**
* Retrieve the metadata stored at the node
* @return byte array containing the metadata
* @throws IOException
*/
@Override
public byte[] getMetadata() throws IOException {
checkClosedOrInError("createOrUpdateMetadata");
String zkPath = getZKPath();
LOG.debug("Getting application specific metadata from {}", zkPath);
try {
Stat currentStat = readerZKC.get().exists(zkPath, false);
if (currentStat == null) {
return null;
} else {
return readerZKC.get().getData(zkPath, false, currentStat);
}
} catch (InterruptedException ie) {
throw new DLInterruptedException("Error reading the max tx id from zk", ie);
} catch (Exception e) {
throw new IOException("Error reading the max tx id from zk", e);
}
}
/**
* Close the metadata accessor, freeing any resources it may hold.
* @return future represents the close result.
*/
@Override
public CompletableFuture<Void> asyncClose() {
CompletableFuture<Void> closeFuture;
synchronized (this) {
if (null != closePromise) {
return closePromise;
}
closeFuture = closePromise = new CompletableFuture<Void>();
}
// NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests
// the managers created by the namespace - whose zkc will be closed by namespace
try {
if (ownWriterZKC) {
writerZKC.close();
}
if (ownReaderZKC) {
readerZKC.close();
}
} catch (Exception e) {
LOG.warn("Exception while closing distributed log manager", e);
}
FutureUtils.complete(closeFuture, null);
return closeFuture;
}
@Override
public void close() throws IOException {
Utils.ioResult(asyncClose());
}
public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
if (null != closePromise) {
throw new AlreadyClosedException("Executing " + operation + " on already closed ZKMetadataAccessor");
}
}
protected String getZKPath() {
return String.format("%s/%s", uri.getPath(), name);
}
@VisibleForTesting
protected ZooKeeperClient getReaderZKC() {
return readerZKC;
}
@VisibleForTesting
protected ZooKeeperClient getWriterZKC() {
return writerZKC;
}
}