blob: 15bdd209dc03696b65de7490443c7c0645e146ad [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.eagle.log.entity;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import org.apache.eagle.common.config.EagleConfigFactory;
import org.apache.eagle.log.entity.meta.EntityDefinition;
import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.eagle.common.DateTimeUtil;
* multi-threading stream readers which only applies to time-series entity where we split the query into
* different time range When this class is used together with list query or aggregate query, be aware that the
* query's behavior could be changed for example pageSize does not work well, output sequence is not
* determined
public class GenericEntityStreamReaderMT extends StreamReader {
private static final Logger LOG = LoggerFactory.getLogger(GenericEntityStreamReaderMT.class);
private List<GenericEntityStreamReader> readers = new ArrayList<GenericEntityStreamReader>();
public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition, int numThreads)
throws Exception {
long queryStartTime = condition.getStartTime();
long queryEndTime = condition.getEndTime();
long subStartTime = queryStartTime;
long subEndTime = 0;
long interval = (queryEndTime - queryStartTime) / numThreads;
for (int i = 0; i < numThreads; i++) {
// split search condition by time range
subStartTime = queryStartTime + i * interval;
if (i == numThreads - 1) {
subEndTime = queryEndTime;
} else {
subEndTime = subStartTime + interval;
// String strStartTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subStartTime);
// String strEndTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(subEndTime);
SearchCondition sc = new SearchCondition(condition);
GenericEntityStreamReader reader = new GenericEntityStreamReader(serviceName, sc);
private void checkIsTimeSeries(String serviceName) throws Exception {
EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
if (!ed.isTimeSeries()) {
throw new IllegalArgumentException("Multi-threading stream reader must be applied to time series table");
private void checkNumThreads(int numThreads) {
if (numThreads <= 0) {
throw new IllegalArgumentException("Multi-threading stream reader must have numThreads >= 1");
* default to 2 threads
* @param serviceName
* @param condition
public GenericEntityStreamReaderMT(String serviceName, SearchCondition condition) throws Exception {
this(serviceName, condition, 2);
public void readAsStream() throws Exception {
// populate listeners to all readers
for (EntityCreationListener l : listeners) {
for (GenericEntityStreamReader r : readers) {
List<Future<Void>> futures = new ArrayList<Future<Void>>();
for (GenericEntityStreamReader r : readers) {
SingleReader reader = new SingleReader(r);
Future<Void> readFuture = EagleConfigFactory.load().getExecutor().submit(reader);
// join threads and check exceptions
for (Future<Void> future : futures) {
try {
} catch (Exception ex) {
LOG.error("Error in read", ex);
throw ex;
private static class SingleReader implements Callable<Void> {
private GenericEntityStreamReader reader;
public SingleReader(GenericEntityStreamReader reader) {
this.reader = reader;
public Void call() throws Exception {
return null;
public long getLastTimestamp() {
long lastTimestamp = 0;
for (GenericEntityStreamReader reader : readers) {
if (lastTimestamp < reader.getLastTimestamp()) {
lastTimestamp = reader.getLastTimestamp();
return lastTimestamp;
public long getFirstTimestamp() {
long firstTimestamp = 0;
for (GenericEntityStreamReader reader : readers) {
if (firstTimestamp > reader.getLastTimestamp() || firstTimestamp == 0) {
firstTimestamp = reader.getLastTimestamp();
return firstTimestamp;