| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.eagle.log.entity; |
| |
| import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; |
| import org.apache.eagle.log.entity.meta.EntityConstants; |
| import org.apache.eagle.log.entity.meta.EntityDefinition; |
| import org.apache.eagle.log.entity.meta.EntityDefinitionManager; |
| import org.apache.eagle.common.DateTimeUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.Date; |
| |
| public class GenericEntityScanStreamReader extends StreamReader { |
| private static final Logger LOG = LoggerFactory.getLogger(GenericEntityScanStreamReader.class); |
| |
| private EntityDefinition entityDef; |
| private SearchCondition condition; |
| private String prefix; |
| private long lastTimestamp = 0; |
| private long firstTimestamp = 0; |
| |
| public GenericEntityScanStreamReader(String serviceName, SearchCondition condition, String prefix) |
| throws InstantiationException, IllegalAccessException { |
| this.prefix = prefix; |
| checkNotNull(serviceName, "serviceName"); |
| this.entityDef = EntityDefinitionManager.getEntityByServiceName(serviceName); |
| checkNotNull(entityDef, "EntityDefinition"); |
| this.condition = condition; |
| } |
| |
| public GenericEntityScanStreamReader(EntityDefinition entityDef, SearchCondition condition, String prefix) |
| throws InstantiationException, IllegalAccessException { |
| this.prefix = prefix; |
| checkNotNull(entityDef, "entityDef"); |
| this.entityDef = entityDef; |
| checkNotNull(entityDef, "EntityDefinition"); |
| this.condition = condition; |
| } |
| |
| @Override |
| public long getLastTimestamp() { |
| return lastTimestamp; |
| } |
| |
| private void checkNotNull(Object o, String message) { |
| if (o == null) { |
| throw new IllegalArgumentException(message + " should not be null"); |
| } |
| } |
| |
| public EntityDefinition getEntityDefinition() { |
| return entityDef; |
| } |
| |
| public SearchCondition getSearchCondition() { |
| return condition; |
| } |
| |
| @Override |
| public void readAsStream() throws Exception { |
| Date start = null; |
| Date end = null; |
| // shortcut to avoid read when pageSize=0 |
| if (condition.getPageSize() <= 0) { |
| return; // return nothing |
| } |
| // Process the time range if needed |
| if (entityDef.isTimeSeries()) { |
| start = new Date(condition.getStartTime()); |
| end = new Date(condition.getEndTime()); |
| } else { |
| // start = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_START_HUMANTIME); |
| // end = DateTimeUtil.humanDateToDate(EntityConstants.FIXED_READ_END_HUMANTIME); |
| start = new Date(EntityConstants.FIXED_READ_START_TIMESTAMP); |
| end = new Date(EntityConstants.FIXED_READ_END_TIMESTAMP); |
| } |
| byte[][] outputQualifiers = null; |
| if (!condition.isOutputAll()) { |
| // Generate the output qualifiers |
| outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, |
| condition.getOutputFields()); |
| } |
| HBaseLogReader2 reader = new HBaseLogReader2(entityDef, condition.getPartitionValues(), start, end, |
| condition.getFilter(), condition.getStartRowkey(), |
| outputQualifiers, this.prefix); |
| try { |
| reader.open(); |
| InternalLog log; |
| int count = 0; |
| while ((log = reader.read()) != null) { |
| TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef); |
| if (lastTimestamp < entity.getTimestamp()) { |
| lastTimestamp = entity.getTimestamp(); |
| } |
| if (firstTimestamp > entity.getTimestamp() || firstTimestamp == 0) { |
| firstTimestamp = entity.getTimestamp(); |
| } |
| |
| entity.setSerializeVerbose(condition.isOutputVerbose()); |
| entity.setSerializeAlias(condition.getOutputAlias()); |
| |
| for (EntityCreationListener l : listeners) { |
| l.entityCreated(entity); |
| } |
| if (++count == condition.getPageSize()) { |
| break; |
| } |
| } |
| } catch (IOException ioe) { |
| LOG.error("Fail reading log", ioe); |
| throw ioe; |
| } finally { |
| reader.close(); |
| } |
| } |
| |
| @Override |
| public long getFirstTimestamp() { |
| return this.firstTimestamp; |
| } |
| } |