blob: f5d70e3d4fa1f8458e310ea84c4cf070fa31bacd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.stream.core.consumer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kylin.stream.core.exception.StreamingException;
import org.apache.kylin.stream.core.metrics.StreamingMetrics;
import org.apache.kylin.stream.core.model.StreamingMessage;
import org.apache.kylin.stream.core.model.stats.ConsumerStats;
import org.apache.kylin.stream.core.model.stats.PartitionConsumeStats;
import org.apache.kylin.stream.core.source.MessageFormatException;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.core.storage.StreamingSegmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
/**
 * Pulls events from an {@link IStreamingConnector} on a dedicated thread and hands them to the
 * cube's {@link StreamingSegmentManager}.
 *
 * <p>The consuming loop lives in {@link #run()}. Other threads control it through
 * {@link #pause(boolean)}, {@link #resume()}, {@link #resumeToStopCondition(IStopConsumptionCondition)}
 * and {@link #stop(long)}, and may read statistics through {@link #getConsumerStats()}.
 *
 * <p>The event counters are {@code volatile} and are only incremented from the consumer thread, so
 * other threads get a visible (not necessarily perfectly up-to-date) value. The per-partition meter
 * map is a concurrent map because it is populated by the consumer thread while
 * {@link #getConsumerStats()} iterates it from other threads.
 */
public class StreamingConsumerChannel implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(StreamingConsumerChannel.class);

    /** Sleep between polls when there is no event or the channel is paused. */
    private static final long IDLE_SLEEP_MS = 100L;
    /** Upper bound on how long {@link #pause(boolean)} waits for the consumer to acknowledge. */
    private static final long PAUSE_WAIT_MS = 10000L;
    /** Upper bound on how long {@link #resumeToStopCondition} waits for the consumer to exit. */
    private static final long STOP_CONDITION_WAIT_MS = 120 * 1000L;

    protected volatile boolean stopped;
    protected volatile boolean paused;
    /** Set by the consumer thread once it has left the event loop (before resources are closed). */
    protected volatile boolean hasStoppedConsuming;

    /** Counted down by the consumer thread each time it observes the paused state. */
    private volatile CountDownLatch pauseLatch;
    /** Counted down exactly once, after the consumer thread has finished closing its resources. */
    private final CountDownLatch stopLatch;
    private Thread consumerThread;
    private final String cubeName;
    private final IStreamingConnector connector;
    private final StreamingSegmentManager cubeSegmentManager;
    private volatile IStopConsumptionCondition stopCondition;
    private volatile long minAcceptEventTime;

    // Single-writer counters: only the consumer thread increments them.
    private volatile long parseEventErrorCnt;
    private volatile long addEventErrorCnt;
    private volatile long incomingEventCnt;
    private volatile long dropEventCnt;

    /** Partition id -> consume-rate meter; written by the consumer thread, read by stats callers. */
    private final Map<Integer, Meter> eventConsumeMeters;

    /**
     * Creates a channel; consumption does not begin until {@link #start()} is called.
     *
     * @param cubeName           cube name, used for the thread name and metric names
     * @param connector          source of events
     * @param cubeSegmentManager sink that receives accepted events
     * @param stopCondition      condition checked against every event; it is stored as given and
     *                           is not initialised by this constructor
     */
    public StreamingConsumerChannel(String cubeName, IStreamingConnector connector,
            StreamingSegmentManager cubeSegmentManager, IStopConsumptionCondition stopCondition) {
        this.connector = connector;
        this.cubeName = cubeName;
        this.cubeSegmentManager = cubeSegmentManager;
        this.stopCondition = stopCondition;
        this.stopLatch = new CountDownLatch(1);
        // Concurrent map: the consumer thread inserts meters while other threads iterate them.
        this.eventConsumeMeters = Maps.newConcurrentMap();
        this.minAcceptEventTime = 0;
    }

    /**
     * Replaces the stop condition. The condition is initialised with the connector's partitions
     * before it is published, so the consumer thread never evaluates a condition that has not
     * been initialised by this method.
     *
     * @param stopCondition the new condition
     */
    public void setStopCondition(IStopConsumptionCondition stopCondition) {
        stopCondition.init(connector.getConsumePartitions());
        this.stopCondition = stopCondition;
    }

    /**
     * Sets the minimum accepted event timestamp; events with a smaller timestamp are dropped.
     * A value of {@code 0} disables the check.
     */
    public void setMinAcceptEventTime(long minAcceptEventTime) {
        this.minAcceptEventTime = minAcceptEventTime;
    }

    /** Opens the connector and starts the consumer thread. */
    public void start() {
        this.consumerThread = new Thread(this, cubeName + "_channel");
        consumerThread.setPriority(Thread.MAX_PRIORITY); // Improve the priority of consumer thread to make ingest rate stable
        connector.open();
        consumerThread.start();
    }

    /**
     * Consumer loop. Runs until {@link #stopped} becomes true, which happens when
     * {@link #stop(long)} is called, the stop condition is satisfied by an event, or the thread is
     * interrupted. On exit the segment manager and the connector are closed and the stop latch is
     * released.
     */
    @Override
    public void run() {
        while (!stopped) {
            if (paused) {
                idleWhilePaused();
                continue;
            }
            StreamingMessage event = null;
            try {
                event = connector.nextEvent();
                if (event == null) {
                    Thread.sleep(IDLE_SLEEP_MS);
                    continue;
                }
                incomingEventCnt++;
                recordConsumeMetric(event.getSourcePosition().getPartition(), event.getParams());
                if (stopCondition.isSatisfied(event)) {
                    // The triggering event itself is not added to the segment manager.
                    logger.warn("The latest event trigger stopCondition, event = {}", event);
                    this.stopped = true;
                } else if (!isFilter(event)) {
                    cubeSegmentManager.addEvent(event);
                }
            } catch (MessageFormatException mfe) {
                parseEventErrorCnt++;
                // Rate-limit the log: only a few out of every 1000 parse errors are printed.
                if (parseEventErrorCnt % 1000 < 3) {
                    logger.error(mfe.getMessage(), mfe);
                }
            } catch (InterruptedException ie) {
                logger.warn("interrupted!");
                stopped = true;
            } catch (Exception e) {
                addEventErrorCnt++;
                // Rate-limit the log: only a few out of every 1000 failures are printed.
                if (addEventErrorCnt % 1000 < 3) {
                    logger.error("error happens when save event:" + event, e);
                }
            }
        }
        hasStoppedConsuming = true;
        logger.warn("Exit from main event loop, start to close cubeSegmentManager.");
        closeResources();
    }

    /** Acknowledges a pending {@link #pause(boolean)} request, then sleeps briefly. */
    private void idleWhilePaused() {
        try {
            CountDownLatch latch = pauseLatch;
            if (latch != null) {
                latch.countDown();
            }
            Thread.sleep(IDLE_SLEEP_MS);
        } catch (InterruptedException e) {
            logger.warn("interrupted!");
            stopped = true;
        }
    }

    /**
     * Closes the segment manager, removes the metrics and closes the connector. The stop latch is
     * always released, even if one of the close calls throws, so that callers blocked in
     * {@link #stop(long)} are not left waiting for their full timeout.
     */
    private void closeResources() {
        try {
            cubeSegmentManager.close();
            removeMetrics();
        } finally {
            try {
                connector.close();
            } finally {
                stopLatch.countDown();
            }
        }
    }

    /** Unregisters every per-partition consume-rate meter this channel has created. */
    private void removeMetrics() {
        MetricRegistry registry = StreamingMetrics.getInstance().getMetricRegistry();
        for (Integer partitionId : eventConsumeMeters.keySet()) {
            registry.remove(
                    MetricRegistry.name(StreamingMetrics.CONSUME_RATE_PFX, cubeName, String.valueOf(partitionId)));
        }
    }

    /**
     * Marks one consumed event on the meter of the given partition, creating the meter on first
     * use.
     *
     * @param partitionId partition the event came from; must not be null
     * @param eventParams event parameters (unused by this implementation)
     */
    protected void recordConsumeMetric(Integer partitionId, Map<String, Object> eventParams) {
        Meter partitionEventConsumeMeter = eventConsumeMeters.computeIfAbsent(partitionId,
                id -> StreamingMetrics.newMeter(
                        MetricRegistry.name(StreamingMetrics.CONSUME_RATE_PFX, cubeName, String.valueOf(id))));
        partitionEventConsumeMeter.mark();
    }

    /**
     * Decides whether an event must be skipped.
     *
     * @return {@code true} if the event is older than the minimum accepted event time, or if the
     *         event reports itself as filtered
     */
    private boolean isFilter(StreamingMessage event) {
        if (minAcceptEventTime != 0 && event.getTimestamp() < minAcceptEventTime) {
            dropEventCnt++;
            // Drop events older than the minimum accepted event time; log only occasionally.
            if (dropEventCnt % 1000 <= 1) {
                logger.warn("event dropped, event time {}, min event accept time {}", event.getTimestamp(),
                        minAcceptEventTime);
            }
            return true;
        }
        return event.isFiltered();
    }

    /**
     * Called by another thread. Requests the consumer to stop and waits for it to finish.
     *
     * @param timeoutInMs timeout of each wait phase; because the connector is woken up and waited
     *                    on a second time when the first wait expires, the call can block for up
     *                    to twice this value
     */
    public void stop(long timeoutInMs) {
        this.stopped = true;
        waitConsumerStop(timeoutInMs);
    }

    /**
     * Waits for the stop latch; if the first wait times out while the consumer is still in its
     * loop, wakes up the connector and waits once more. Failures are logged, not thrown.
     */
    private void waitConsumerStop(long timeoutInMs) {
        try {
            if (stopLatch.await(timeoutInMs, TimeUnit.MILLISECONDS)) {
                return;
            }
            if (!hasStoppedConsuming) {
                logger.warn(
                        "Consumer not stopped normally, close it forcedly, but the thread may not be stopped correctly");
                connector.wakeup();
            }
            if (stopLatch.await(timeoutInMs, TimeUnit.MILLISECONDS)) {
                return;
            }
            if (hasStoppedConsuming) {
                logger.warn("Consumer has been stopped, but cube data store is not closed");
            } else {
                logger.warn("Consumer is still not stopped");
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted!", e);
            // Restore the interrupt status so the caller can observe it.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.error("Exception throws when wait consumer stopped", e);
        }
    }

    /**
     * Called by another thread. Pauses consumption.
     *
     * @param wait if {@code true}, block (for at most 10 seconds) until the consumer thread has
     *             observed the paused state
     */
    public void pause(boolean wait) {
        if (!wait) {
            this.paused = true;
            return;
        }
        // Publish the latch before the flag so the consumer cannot see "paused" without the latch.
        CountDownLatch latch = new CountDownLatch(1);
        this.pauseLatch = latch;
        this.paused = true;
        try {
            if (!latch.await(PAUSE_WAIT_MS, TimeUnit.MILLISECONDS)) {
                logger.warn("Consumer did not acknowledge pause within {} ms", PAUSE_WAIT_MS);
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted!", e);
            // Restore the interrupt status so the caller can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Called by another thread. Resumes consumption; unless the new condition is
     * {@link IStopConsumptionCondition#NEVER_STOP}, installs it and waits up to 120 seconds for
     * the consumer to stop.
     *
     * @param newStopCondition the stop condition to consume up to
     * @throws StreamingException if the consumer has not stopped within the wait period
     */
    public void resumeToStopCondition(IStopConsumptionCondition newStopCondition) {
        if (newStopCondition == IStopConsumptionCondition.NEVER_STOP) {
            this.paused = false;
            return;
        }
        // Install the condition before un-pausing so no event is checked against the old one.
        setStopCondition(newStopCondition);
        this.paused = false;
        try {
            boolean stoppedSucceed = stopLatch.await(STOP_CONDITION_WAIT_MS, TimeUnit.MILLISECONDS);
            if (!stoppedSucceed) {
                throw new StreamingException("consumer stop failed for stopCondition:" + newStopCondition);
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted!", e);
            // Restore the interrupt status so the caller can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Called by another thread. Resumes consumption without changing the stop condition.
     */
    public void resume() {
        this.paused = false;
    }

    /** @return whether a stop has been requested or triggered */
    public boolean isStopped() {
        return stopped;
    }

    /** @return whether the channel is currently paused */
    public boolean isPaused() {
        return paused;
    }

    /** @return the segment manager's consume position, as a string */
    public String getSourceConsumeInfo() {
        return cubeSegmentManager.getConsumePositionStr();
    }

    /**
     * Builds a snapshot of the channel's statistics: per-partition rates and lag, total lag,
     * event counters and the paused/stopped flags. Partitions that have not produced any event
     * yet get an empty {@link PartitionConsumeStats}.
     */
    public ConsumerStats getConsumerStats() {
        Map<Integer, Long> consumeLagMap = connector.getSource().calConsumeLag(cubeName,
                cubeSegmentManager.getConsumePosition());
        long totalLag = 0L;
        Map<Integer, PartitionConsumeStats> partitionConsumeStatsMap = Maps.newHashMap();
        for (Map.Entry<Integer, Meter> meterEntry : eventConsumeMeters.entrySet()) {
            Integer partitionId = meterEntry.getKey();
            Meter meter = meterEntry.getValue();
            long partitionLag = consumeLagMap.getOrDefault(partitionId, 0L);
            totalLag += partitionLag;

            PartitionConsumeStats partitionStats = new PartitionConsumeStats();
            partitionStats.setAvgRate(meter.getMeanRate());
            partitionStats.setOneMinRate(meter.getOneMinuteRate());
            partitionStats.setFiveMinRate(meter.getFiveMinuteRate());
            partitionStats.setFifteenMinRate(meter.getFifteenMinuteRate());
            partitionStats.setTotalConsume(meter.getCount());
            partitionStats.setConsumeLag(partitionLag);
            partitionConsumeStatsMap.put(partitionId, partitionStats);
        }
        for (Partition partition : getConsumePartitions()) {
            if (!partitionConsumeStatsMap.containsKey(partition.getPartitionId())) {
                partitionConsumeStatsMap.put(partition.getPartitionId(), new PartitionConsumeStats());
            }
        }

        ConsumerStats stats = new ConsumerStats();
        stats.setStopped(stopped);
        stats.setPaused(paused);
        stats.setTotalIncomingEvents(incomingEventCnt);
        stats.setTotalExceptionEvents(parseEventErrorCnt + addEventErrorCnt);
        stats.setPartitionConsumeStatsMap(partitionConsumeStatsMap);
        stats.setConsumeOffsetInfo(getSourceConsumeInfo());
        stats.setConsumeLag(totalLag);
        return stats;
    }

    /** @return the connector this channel consumes from */
    public IStreamingConnector getConnector() {
        return connector;
    }

    /** @return the partitions the connector is consuming */
    public List<Partition> getConsumePartitions() {
        return getConnector().getConsumePartitions();
    }
}