blob: d80b2a4d0b5d590d900b6a4d3db166949981fdfb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.util;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import java.util.concurrent.atomic.AtomicLong;
/**
* <p>
* BlockingEnvelopeMap is a helper class for SystemConsumer implementations.
* Samza's poll() requirements make implementing SystemConsumers somewhat
* tricky. BlockingEnvelopeMap is provided to help other developers write
* SystemConsumers. The intended audience is not those writing Samza jobs, but
* rather those extending Samza to consume from new types of stream providers
* and other systems.
* </p>
*
* <p>
* SystemConsumers that extend BlockingEnvelopeMap need to add messages using
* {@link #put(org.apache.samza.system.SystemStreamPartition, org.apache.samza.system.IncomingMessageEnvelope) put}
* (or {@link #putAll(org.apache.samza.system.SystemStreamPartition, java.util.List) putAll}),
* and maintain each partition's "no more messages" flag using setIsAtHead. This flag
* is used to determine whether a SystemStreamPartition is "caught up" (has read all
* possible messages from the underlying system). For example, with a Kafka
* system, the flag would be set to true when the last message offset
* returned is equal to the offset high watermark for a given topic/partition.
* </p>
* <p>
* The BlockingEnvelopeMap is backed by concurrent maps, which allow concurrent
* put or putAll calls to be thread safe without external synchronization.
* </p>
*/
public abstract class BlockingEnvelopeMap implements SystemConsumer {
  private final BlockingEnvelopeMapMetrics metrics;
  // Per-partition buffer: filled by put/putAll, drained by poll.
  private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
  private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize; // size in bytes per SystemStreamPartition
  // Last value passed to setIsAtHead for each partition; a missing entry is treated as "not at head".
  private final Map<SystemStreamPartition, Boolean> noMoreMessage;
  private final Clock clock;
  // Recorded by subclasses via setFailureCause; surfaced to callers blocked in poll().
  private volatile Throwable failureCause = null;

  /**
   * Creates a map that registers its metrics with a {@link NoOpMetricsRegistry}
   * and reads time from {@link System#currentTimeMillis()}.
   */
  public BlockingEnvelopeMap() {
    this(new NoOpMetricsRegistry());
  }

  /**
   * Creates a map that reads time from {@link System#currentTimeMillis()}.
   *
   * @param metricsRegistry registry in which per-partition metrics are created
   */
  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
    this(metricsRegistry, new Clock() {
      public long currentTimeMillis() {
        return System.currentTimeMillis();
      }
    });
  }

  /**
   * Creates a map whose metrics are grouped under the runtime class name.
   *
   * @param metricsRegistry registry in which per-partition metrics are created
   * @param clock time source used to compute poll deadlines
   */
  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
    this(metricsRegistry, clock, null);
  }

  /**
   * Creates a map with an explicit metrics group name.
   *
   * @param metricsRegistry registry in which per-partition metrics are created
   * @param clock time source used to compute poll deadlines
   * @param metricsGroupName group for all metrics of this map; when {@code null},
   *        the runtime class name of this instance is used
   */
  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
    metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
    this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
    this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
    this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
    this.clock = clock;
    this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>();
  }

  /**
   * {@inheritDoc}
   *
   * <p>The offset is ignored here; registration only creates the partition's buffer,
   * byte counter and metrics. Registering the same partition again keeps the existing buffer.</p>
   */
  @Override
  public void register(SystemStreamPartition systemStreamPartition, String offset) {
    initializeInternalStateForSSP(systemStreamPartition);
  }

  /**
   * Initializes the metrics and in-memory buffer for the {@code systemStreamPartition}.
   *
   * @param systemStreamPartition represents the input system stream partition.
   */
  private void initializeInternalStateForSSP(SystemStreamPartition systemStreamPartition) {
    metrics.initMetrics(systemStreamPartition);
    bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
    bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
  }

  /**
   * Factory for the per-partition buffer. Subclasses may override this to use a different
   * queue implementation; the default is an unbounded {@link LinkedBlockingQueue}.
   *
   * @return a new, empty queue for one partition
   */
  protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
    return new LinkedBlockingQueue<IncomingMessageEnvelope>();
  }

  /**
   * {@inheritDoc}
   *
   * <p>
   * Partitions that already have buffered messages are drained without blocking. For a
   * partition with an empty buffer:
   * <ul>
   *   <li>a timeout of 0 never blocks;</li>
   *   <li>{@link SystemConsumer#BLOCK_ON_OUTSTANDING_MESSAGES} waits (in 1 second slices) until a
   *       message arrives or {@link #isAtHead(SystemStreamPartition)} is true, and throws a
   *       {@link SamzaException} wrapping any cause recorded via {@link #setFailureCause};</li>
   *   <li>a positive timeout waits only for what remains of a single deadline shared by all
   *       requested partitions.</li>
   * </ul>
   * Partitions that yield no messages are omitted from the returned map.
   * </p>
   *
   * @throws NullPointerException if a requested partition was never registered
   */
  @Override
  public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
    long stopTime = computeStopTime(clock.currentTimeMillis(), timeout);
    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messagesToReturn = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();

    metrics.incPoll();

    for (SystemStreamPartition systemStreamPartition : systemStreamPartitions) {
      BlockingQueue<IncomingMessageEnvelope> queue = getRegisteredQueue(systemStreamPartition);
      List<IncomingMessageEnvelope> outgoingList = new ArrayList<IncomingMessageEnvelope>(queue.size());

      if (!queue.isEmpty()) {
        queue.drainTo(outgoingList);
      } else if (timeout != 0) {
        IncomingMessageEnvelope envelope = null;
        // How long we can legally block (if timeout > 0)
        long timeRemaining = stopTime - clock.currentTimeMillis();

        if (timeout == SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES) {
          // Block until we get at least one message, or until we catch up to
          // the head of the stream.
          while (envelope == null && !isAtHead(systemStreamPartition)) {
            // Surface a consumer failure instead of blocking forever on a dead consumer.
            if (this.failureCause != null) {
              String message = String.format("%s: Consumer has stopped.", this);
              throw new SamzaException(message, this.failureCause);
            }
            metrics.incBlockingPoll(systemStreamPartition);
            envelope = queue.poll(1000, TimeUnit.MILLISECONDS);
          }
        } else if (timeout > 0 && timeRemaining > 0) {
          // Block until we get at least one message.
          metrics.incBlockingTimeoutPoll(systemStreamPartition);
          envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
        }

        // If we got a message, add it.
        if (envelope != null) {
          outgoingList.add(envelope);
          // Drain any remaining messages without blocking.
          queue.drainTo(outgoingList);
        }
      }

      if (!outgoingList.isEmpty()) {
        messagesToReturn.put(systemStreamPartition, outgoingList);
        subtractSizeOnQDrain(systemStreamPartition, outgoingList);
      }
    }

    return messagesToReturn;
  }

  /**
   * Computes the absolute poll deadline, saturating at {@link Long#MAX_VALUE} so that a very
   * large positive timeout cannot overflow into a deadline in the past (which would turn a
   * blocking poll into a non-blocking one).
   */
  private static long computeStopTime(long now, long timeout) {
    if (timeout > 0 && now > Long.MAX_VALUE - timeout) {
      return Long.MAX_VALUE;
    }
    return now + timeout;
  }

  /**
   * Returns the buffer of a registered partition.
   *
   * @throws NullPointerException with a descriptive message if the partition was never registered
   */
  private BlockingQueue<IncomingMessageEnvelope> getRegisteredQueue(SystemStreamPartition systemStreamPartition) {
    BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
    if (queue == null) {
      throw new NullPointerException("Attempting to get queue for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
    }
    return queue;
  }

  /**
   * Subtracts the total size of the drained envelopes from the partition's buffered byte count.
   */
  private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> outgoingList) {
    long outgoingListBytes = 0;
    for (IncomingMessageEnvelope envelope : outgoingList) {
      outgoingListBytes += envelope.getSize();
    }
    // subtract the size of the messages dequeued.
    bufferedMessagesSize.get(systemStreamPartition).addAndGet(-1 * outgoingListBytes);
  }

  /**
   * Place a new {@link org.apache.samza.system.IncomingMessageEnvelope} on the
   * queue for the specified {@link org.apache.samza.system.SystemStreamPartition}
   * and add its size to the partition's buffered byte count.
   *
   * @param systemStreamPartition SystemStreamPartition that owns the envelope
   * @param envelope Message for specified SystemStreamPartition
   * @throws InterruptedException from underlying concurrent collection
   * @throws NullPointerException if the partition was never registered
   */
  protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
    getRegisteredQueue(systemStreamPartition).put(envelope);
    bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
  }

  /**
   * Place a collection of {@link org.apache.samza.system.IncomingMessageEnvelope}
   * on the queue for the specified {@link org.apache.samza.system.SystemStreamPartition}
   * and add their sizes to the partition's buffered byte count.
   * <p>
   * Insertion of all the messages into the queue is not guaranteed to be done
   * atomically.
   * </p>
   *
   * @param systemStreamPartition SystemStreamPartition that owns the envelope
   * @param envelopes Messages for specified SystemStreamPartition
   * @throws InterruptedException from underlying concurrent collection
   * @throws NullPointerException if the partition was never registered
   */
  protected void putAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
    BlockingQueue<IncomingMessageEnvelope> queue = getRegisteredQueue(systemStreamPartition);
    AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);

    for (IncomingMessageEnvelope envelope : envelopes) {
      queue.put(envelope);
      // poll() subtracts the size of every drained envelope, so every enqueued envelope must be
      // counted here too. Counting per envelope keeps the total correct even if a later put()
      // is interrupted part-way through the list.
      sizeInBytes.addAndGet(envelope.getSize());
    }
  }

  /**
   * Returns the number of envelopes currently buffered for a partition.
   *
   * @param systemStreamPartition a registered partition
   * @return the current size of the partition's buffer
   * @throws NullPointerException if the partition was never registered
   */
  public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
    return getRegisteredQueue(systemStreamPartition).size();
  }

  /**
   * Returns the total size of the envelopes currently buffered for a partition, as
   * accumulated from {@code IncomingMessageEnvelope.getSize()} on put and subtracted on poll.
   *
   * @param systemStreamPartition a registered partition
   * @return buffered size for the partition
   * @throws NullPointerException if the partition was never registered
   */
  public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) {
    AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);
    if (sizeInBytes == null) {
      throw new NullPointerException("Attempting to get size for " + systemStreamPartition + ", but the system/stream/partition was never registered. or fetch");
    }
    return sizeInBytes.get();
  }

  /**
   * Records whether the underlying system has no more messages for a partition, and updates the
   * corresponding {@code no-more-messages-*} gauge (which exists only for registered partitions).
   *
   * @param systemStreamPartition partition whose flag is updated
   * @param isAtHead {@code true} if the consumer has read everything currently available
   * @return the previously recorded flag, or {@code null} if none was recorded
   */
  protected Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) {
    metrics.setNoMoreMessages(systemStreamPartition, isAtHead);
    return noMoreMessage.put(systemStreamPartition, isAtHead);
  }

  /**
   * A partition is at head when its buffer is empty and {@link #setIsAtHead} last recorded
   * {@code true} for it.
   *
   * @param systemStreamPartition a registered partition
   * @return whether the partition is fully caught up
   * @throws NullPointerException if the partition was never registered
   */
  protected boolean isAtHead(SystemStreamPartition systemStreamPartition) {
    Boolean isAtHead = noMoreMessage.get(systemStreamPartition);
    // The buffer check runs first so unregistered partitions still fail fast.
    return getNumMessagesInQueue(systemStreamPartition) == 0 && Boolean.TRUE.equals(isAtHead);
  }

  /**
   * Records a consumer failure. A subsequent poll() that blocks with
   * {@link SystemConsumer#BLOCK_ON_OUTSTANDING_MESSAGES} on an empty partition
   * throws a {@link SamzaException} carrying this cause.
   *
   * @param throwable the failure cause
   */
  protected void setFailureCause(Throwable throwable) {
    this.failureCause = throwable;
  }

  /**
   * Metrics maintained by a {@link BlockingEnvelopeMap}: a global poll counter plus
   * per-partition gauges and counters created when a partition is registered.
   */
  public class BlockingEnvelopeMapMetrics {
    private final String group;
    private final MetricsRegistry metricsRegistry;
    private final ConcurrentHashMap<SystemStreamPartition, Gauge<Integer>> noMoreMessageGaugeMap;
    private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap;
    private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap;
    private final Counter pollCount;

    public BlockingEnvelopeMapMetrics(String group, MetricsRegistry metricsRegistry) {
      this.group = group;
      this.metricsRegistry = metricsRegistry;
      this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Integer>>();
      this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
      this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
      this.pollCount = metricsRegistry.newCounter(group, "poll-count");
    }

    /**
     * Creates the per-partition metrics; existing map entries for the partition are kept.
     */
    public void initMetrics(SystemStreamPartition systemStreamPartition) {
      this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.newGauge(group, "no-more-messages-" + systemStreamPartition, 0));
      this.blockingPollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-count-" + systemStreamPartition));
      this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));

      metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
      metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
    }

    /** Sets the partition's no-more-messages gauge to 1 or 0. */
    public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
      this.noMoreMessageGaugeMap.get(systemStreamPartition).set(noMoreMessages ? 1 : 0);
    }

    /** Counts one blocking wait issued under BLOCK_ON_OUTSTANDING_MESSAGES. */
    public void incBlockingPoll(SystemStreamPartition systemStreamPartition) {
      this.blockingPollCountMap.get(systemStreamPartition).inc();
    }

    /** Counts one blocking wait issued with a positive timeout. */
    public void incBlockingTimeoutPoll(SystemStreamPartition systemStreamPartition) {
      this.blockingPollTimeoutCountMap.get(systemStreamPartition).inc();
    }

    /** Counts one call to poll(). */
    public void incPoll() {
      this.pollCount.inc();
    }
  }

  /**
   * Gauge reporting the number of envelopes buffered for one partition (0 if it has no buffer).
   */
  public class BufferGauge extends Gauge<Integer> {
    private final SystemStreamPartition systemStreamPartition;

    public BufferGauge(SystemStreamPartition systemStreamPartition, String name) {
      super(name, 0);

      this.systemStreamPartition = systemStreamPartition;
    }

    @Override
    public Integer getValue() {
      Queue<IncomingMessageEnvelope> envelopes = bufferedMessages.get(systemStreamPartition);

      if (envelopes == null) {
        return 0;
      }

      return envelopes.size();
    }
  }

  /**
   * Gauge reporting the buffered byte count for one partition (0 if it has no counter).
   */
  public class BufferSizeGauge extends Gauge<Long> {
    private final SystemStreamPartition systemStreamPartition;

    public BufferSizeGauge(SystemStreamPartition systemStreamPartition, String name) {
      super(name, 0L);
      this.systemStreamPartition = systemStreamPartition;
    }

    @Override
    public Long getValue() {
      AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);
      if (sizeInBytes == null) {
        return 0L;
      }
      return sizeInBytes.get();
    }
  }
}