blob: 8b36d3bdc32f8c04ffef92264a2899dd8adf983e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.service.stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
import com.twitter.distributedlog.exceptions.StreamUnavailableException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.service.config.StreamConfigProvider;
import com.twitter.distributedlog.service.streamset.Partition;
import com.twitter.distributedlog.service.streamset.PartitionMap;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* StreamManagerImpl is the default implementation responsible for creating, destroying, and keeping track
* of Streams.
*
 * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, is responsible simply for creating
* a stream object in isolation from the rest of the system. We pass a StreamFactory in instead of simply
* creating StreamImpl's ourselves in order to inject dependencies without bloating the StreamManagerImpl
* constructor.
*/
public class StreamManagerImpl implements StreamManager {

    private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class);

    // All streams cached by this proxy, keyed by stream name.
    private final ConcurrentHashMap<String, Stream> streams =
        new ConcurrentHashMap<String, Stream>();
    // Maintained alongside `streams` so numCached() is a cheap O(1) read.
    private final AtomicInteger numCached = new AtomicInteger(0);

    // The subset of cached streams whose ownership this proxy has acquired.
    private final ConcurrentHashMap<String, Stream> acquiredStreams =
        new ConcurrentHashMap<String, Stream>();
    private final AtomicInteger numAcquired = new AtomicInteger(0);

    //
    // Partitions
    //
    private final StreamPartitionConverter partitionConverter;
    private final PartitionMap cachedPartitions = new PartitionMap();
    private final PartitionMap acquiredPartitions = new PartitionMap();

    // Guards `closed`: close() takes the write lock, while schedule() and
    // getOrCreateStream() take the read lock, so no new work is scheduled or
    // cached once the manager is closed.
    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();

    private final ScheduledExecutorService executorService;
    private final DistributedLogConfiguration dlConfig;
    private final StreamConfigProvider streamConfigProvider;
    private final String clientId;
    private boolean closed = false;
    private final StreamFactory streamFactory;
    private final DistributedLogNamespace dlNamespace;

    public StreamManagerImpl(String clientId,
                             DistributedLogConfiguration dlConfig,
                             ScheduledExecutorService executorService,
                             StreamFactory streamFactory,
                             StreamPartitionConverter partitionConverter,
                             StreamConfigProvider streamConfigProvider,
                             DistributedLogNamespace dlNamespace) {
        this.clientId = clientId;
        this.executorService = executorService;
        this.streamFactory = streamFactory;
        this.partitionConverter = partitionConverter;
        this.dlConfig = dlConfig;
        this.streamConfigProvider = streamConfigProvider;
        this.dlNamespace = dlNamespace;
    }

    /**
     * Returns the per-stream dynamic configuration if the provider has one for
     * {@code streamName}, otherwise a constant view over the service-wide config.
     */
    private DynamicDistributedLogConfiguration getDynConf(String streamName) {
        Optional<DynamicDistributedLogConfiguration> dynDlConf =
            streamConfigProvider.getDynamicStreamConfig(streamName);
        if (dynDlConf.isPresent()) {
            return dynDlConf.get();
        } else {
            return ConfUtils.getConstDynConf(dlConfig);
        }
    }

    /**
     * Checks (and records) whether this proxy may acquire the stream's partition,
     * enforcing the per-proxy cap on acquired partitions of a stream set.
     */
    @Override
    public boolean allowAcquire(Stream stream) {
        return acquiredPartitions.addPartition(
            stream.getPartition(),
            stream.getStreamConfiguration().getMaxAcquiredPartitionsPerProxy());
    }

    /**
     * Must be enqueued to an executor to avoid deadlocks (close and execute-op both
     * try to acquire the same read-write lock).
     */
    @Override
    public Future<Void> deleteAndRemoveAsync(final String stream) {
        final Promise<Void> result = new Promise<Void>();
        java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() {
            @Override
            public void run() {
                result.become(doDeleteAndRemoveAsync(stream));
            }
        }, 0);
        if (null == scheduleFuture) {
            // schedule() returns null when the manager is closed or the executor
            // rejected the task.
            return Future.exception(
                new ServiceUnavailableException("Couldn't schedule a delete task."));
        }
        return result;
    }

    /**
     * Must be enqueued to an executor to avoid deadlocks (close and execute-op both
     * try to acquire the same read-write lock).
     */
    @Override
    public Future<Void> closeAndRemoveAsync(final String streamName) {
        final Promise<Void> releasePromise = new Promise<Void>();
        java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() {
            @Override
            public void run() {
                releasePromise.become(doCloseAndRemoveAsync(streamName));
            }
        }, 0);
        if (null == scheduleFuture) {
            return Future.exception(
                new ServiceUnavailableException("Couldn't schedule a release task."));
        }
        return releasePromise;
    }

    /**
     * Creates the log under the namespace asynchronously. Runs on the executor
     * for the same deadlock-avoidance reason as the other *Async methods.
     */
    @Override
    public Future<Void> createStreamAsync(final String stream) {
        final Promise<Void> createPromise = new Promise<Void>();
        java.util.concurrent.Future<?> scheduleFuture = schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    dlNamespace.createLog(stream);
                    createPromise.setValue(null);
                } catch (Exception e) {
                    createPromise.setException(e);
                }
            }
        }, 0);
        if (null == scheduleFuture) {
            return Future.exception(
                new ServiceUnavailableException("Couldn't schedule a create task."));
        }
        return createPromise;
    }

    /**
     * Drops the stream from the acquired set and frees its partition slot.
     * Counter is only decremented when this exact stream instance was mapped,
     * guarding against double notifications.
     */
    @Override
    public void notifyReleased(Stream stream) {
        acquiredPartitions.removePartition(stream.getPartition());
        if (acquiredStreams.remove(stream.getStreamName(), stream)) {
            numAcquired.getAndDecrement();
        }
    }

    /**
     * Records the stream as acquired; the counter is only bumped when the name
     * was not already mapped, so repeated notifications don't over-count.
     */
    @Override
    public void notifyAcquired(Stream stream) {
        if (null == acquiredStreams.put(stream.getStreamName(), stream)) {
            numAcquired.getAndIncrement();
        }
    }

    /**
     * Removes the stream from the cache and frees its cached-partition slot.
     *
     * @return true if this exact stream instance was cached and is now removed.
     */
    @Override
    public boolean notifyRemoved(Stream stream) {
        cachedPartitions.removePartition(stream.getPartition());
        if (streams.remove(stream.getStreamName(), stream)) {
            numCached.getAndDecrement();
            return true;
        }
        return false;
    }

    /**
     * Returns a snapshot of stream name -> owner (this proxy's clientId) for
     * acquired streams, optionally filtered by a regex over stream names.
     * Streams reporting a different owner are excluded.
     */
    @Override
    public Map<String, String> getStreamOwnershipMap(Optional<String> regex) {
        Map<String, String> ownershipMap = new HashMap<String, String>();
        for (Map.Entry<String, Stream> entry : acquiredStreams.entrySet()) {
            String name = entry.getKey();
            if (regex.isPresent() && !name.matches(regex.get())) {
                continue;
            }
            Stream stream = entry.getValue();
            if (null == stream) {
                continue;
            }
            String owner = stream.getOwner();
            if (null == owner) {
                // A null owner means this proxy owns the stream itself.
                ownershipMap.put(name, clientId);
            }
        }
        return ownershipMap;
    }

    @Override
    public Stream getStream(String stream) {
        return streams.get(stream);
    }

    /**
     * Returns the cached stream, creating (and optionally starting) it if absent.
     *
     * @param streamName name of the stream
     * @param start whether to start a newly created stream immediately
     * @return the stream, or null if the manager is already closed
     * @throws IOException if the per-proxy cached-partition limit is exceeded
     *         or stream initialization fails
     */
    @Override
    public Stream getOrCreateStream(String streamName, boolean start) throws IOException {
        Stream stream = streams.get(streamName);
        if (null == stream) {
            closeLock.readLock().lock();
            try {
                if (closed) {
                    return null;
                }
                DynamicDistributedLogConfiguration dynConf = getDynConf(streamName);
                int maxCachedPartitions = dynConf.getMaxCachedPartitionsPerProxy();
                // get partition from the stream name
                Partition partition = partitionConverter.convert(streamName);
                // add partition to cached map
                if (!cachedPartitions.addPartition(partition, maxCachedPartitions)) {
                    throw new StreamUnavailableException("Stream " + streamName
                        + " is not allowed to cache more than " + maxCachedPartitions + " partitions");
                }
                stream = newStream(streamName, dynConf);
                Stream oldWriter = streams.putIfAbsent(streamName, stream);
                if (null != oldWriter) {
                    // Lost the race: another thread cached it first; use theirs.
                    stream = oldWriter;
                } else {
                    numCached.getAndIncrement();
                    logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream);
                    stream.initialize();
                    if (start) {
                        stream.start();
                    }
                }
            } finally {
                closeLock.readLock().unlock();
            }
        }
        return stream;
    }

    /**
     * Requests close on every cached stream (no rate limiting).
     *
     * @return future satisfied once all close requests complete
     */
    @Override
    public Future<List<Void>> closeStreams() {
        // Locals renamed so they don't shadow the numAcquired/numCached fields.
        int acquiredCount = acquiredStreams.size();
        int cachedCount = streams.size();
        logger.info("Closing all acquired streams : acquired = {}, cached = {}.",
            acquiredCount, cachedCount);
        Set<Stream> streamsToClose = new HashSet<Stream>();
        streamsToClose.addAll(streams.values());
        return closeStreams(streamsToClose, Optional.<RateLimiter>absent());
    }

    /**
     * Schedules removal of the stream from the cache after {@code delayMs}
     * milliseconds (a probation period); 0 removes it as soon as possible.
     */
    @Override
    public void scheduleRemoval(final Stream stream, long delayMs) {
        if (delayMs > 0) {
            // Fixed log message: delayMs is in milliseconds, not seconds.
            logger.info("Scheduling removal of stream {} from cache after {} ms.",
                stream.getStreamName(), delayMs);
        }
        schedule(new Runnable() {
            @Override
            public void run() {
                if (notifyRemoved(stream)) {
                    logger.info("Removed cached stream {} after probation.", stream.getStreamName());
                } else {
                    logger.info("Cached stream {} already removed.", stream.getStreamName());
                }
            }
        }, delayMs);
    }

    @Override
    public int numAcquired() {
        return numAcquired.get();
    }

    @Override
    public int numCached() {
        return numCached.get();
    }

    @Override
    public boolean isAcquired(String streamName) {
        return acquiredStreams.containsKey(streamName);
    }

    /**
     * Marks the manager closed so no further tasks are scheduled and no new
     * streams are cached. Does not close cached streams; use closeStreams().
     */
    @Override
    public void close() {
        closeLock.writeLock().lock();
        try {
            if (closed) {
                return;
            }
            closed = true;
        } finally {
            closeLock.writeLock().unlock();
        }
    }

    /**
     * Requests close on each stream, optionally pacing requests with the rate
     * limiter, and collects the per-stream close futures.
     */
    private Future<List<Void>> closeStreams(Set<Stream> streamsToClose, Optional<RateLimiter> rateLimiter) {
        if (streamsToClose.isEmpty()) {
            logger.info("No streams to close.");
            List<Void> emptyList = new ArrayList<Void>();
            return Future.value(emptyList);
        }
        List<Future<Void>> futures = new ArrayList<Future<Void>>(streamsToClose.size());
        for (Stream stream : streamsToClose) {
            if (rateLimiter.isPresent()) {
                rateLimiter.get().acquire();
            }
            futures.add(stream.requestClose("Close Streams"));
        }
        return Future.collect(futures);
    }

    private Stream newStream(String name, DynamicDistributedLogConfiguration streamConf) {
        return streamFactory.create(name, streamConf, this);
    }

    /**
     * Releases ownership of the stream if cached; succeeds trivially otherwise.
     */
    public Future<Void> doCloseAndRemoveAsync(final String streamName) {
        Stream stream = streams.get(streamName);
        if (null == stream) {
            logger.info("No stream {} to release.", streamName);
            return Future.value(null);
        } else {
            return stream.requestClose("release ownership");
        }
    }

    /**
     * Dont schedule if we're closed - closeLock is acquired to close, so if we acquire the
     * lock and discover we're not closed, we won't schedule.
     *
     * @return the scheduled future, or null if closed or the executor rejected the task
     */
    private java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) {
        closeLock.readLock().lock();
        try {
            if (closed) {
                return null;
            } else if (delayMs > 0) {
                return executorService.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
            } else {
                return executorService.submit(runnable);
            }
        } catch (RejectedExecutionException ree) {
            // Varargs form (SLF4J 1.7+): the trailing throwable is logged with
            // its stack trace instead of being silently dropped from the array.
            logger.error("Failed to schedule task {} in {} ms : ",
                runnable, delayMs, ree);
            return null;
        } finally {
            closeLock.readLock().unlock();
        }
    }

    /**
     * Deletes the stream's log and closes the cached stream. Fails with
     * UnexpectedException if the stream is not cached on this proxy.
     */
    private Future<Void> doDeleteAndRemoveAsync(final String streamName) {
        Stream stream = streams.get(streamName);
        if (null == stream) {
            logger.warn("No stream {} to delete.", streamName);
            return Future.exception(new UnexpectedException("No stream " + streamName + " to delete."));
        } else {
            Future<Void> result;
            logger.info("Deleting stream {}, {}", streamName, stream);
            try {
                stream.delete();
                result = stream.requestClose("Stream Deleted");
            } catch (IOException e) {
                logger.error("Failed on removing stream {} : ", streamName, e);
                result = Future.exception(e);
            }
            return result;
        }
    }

    @VisibleForTesting
    public ConcurrentHashMap<String, Stream> getCachedStreams() {
        return streams;
    }

    @VisibleForTesting
    public ConcurrentHashMap<String, Stream> getAcquiredStreams() {
        return acquiredStreams;
    }
}