| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.yarn.server.timelineservice.storage; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.SubApplicationEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; |
| import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; |
| import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; |
| import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTableRW; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTableRW; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnRWHelper; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainColumn; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainRowKey; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTable; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.domain.DomainTableRW; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTableRW; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTableRW; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; |
| import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTableRW; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This implements a hbase based backend for storing the timeline entity |
| * information. |
| * It writes to multiple tables at the backend |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public class HBaseTimelineWriterImpl extends AbstractService implements |
| TimelineWriter { |
| |
| private static final Logger LOG = LoggerFactory |
| .getLogger(HBaseTimelineWriterImpl.class); |
| |
| private Connection conn; |
| private TimelineStorageMonitor storageMonitor; |
| private TypedBufferedMutator<EntityTable> entityTable; |
| private TypedBufferedMutator<AppToFlowTable> appToFlowTable; |
| private TypedBufferedMutator<ApplicationTable> applicationTable; |
| private TypedBufferedMutator<FlowActivityTable> flowActivityTable; |
| private TypedBufferedMutator<FlowRunTable> flowRunTable; |
| private TypedBufferedMutator<SubApplicationTable> subApplicationTable; |
| private TypedBufferedMutator<DomainTable> domainTable; |
| |
| /** |
| * Used to convert strings key components to and from storage format. |
| */ |
| private final KeyConverter<String> stringKeyConverter = |
| new StringKeyConverter(); |
| |
| /** |
| * Used to convert Long key components to and from storage format. |
| */ |
| private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); |
| |
| private enum Tables { |
| APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE |
| }; |
| |
| public HBaseTimelineWriterImpl() { |
| super(HBaseTimelineWriterImpl.class.getName()); |
| } |
| |
| /** |
| * initializes the hbase connection to write to the entity table. |
| */ |
| @Override |
| protected void serviceInit(Configuration conf) throws Exception { |
| super.serviceInit(conf); |
| Configuration hbaseConf = |
| HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); |
| conn = ConnectionFactory.createConnection(hbaseConf); |
| entityTable = new EntityTableRW().getTableMutator(hbaseConf, conn); |
| appToFlowTable = new AppToFlowTableRW().getTableMutator(hbaseConf, conn); |
| applicationTable = |
| new ApplicationTableRW().getTableMutator(hbaseConf, conn); |
| flowRunTable = new FlowRunTableRW().getTableMutator(hbaseConf, conn); |
| flowActivityTable = |
| new FlowActivityTableRW().getTableMutator(hbaseConf, conn); |
| subApplicationTable = |
| new SubApplicationTableRW().getTableMutator(hbaseConf, conn); |
| domainTable = new DomainTableRW().getTableMutator(hbaseConf, conn); |
| |
| UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ? |
| UserGroupInformation.getLoginUser() : |
| UserGroupInformation.getCurrentUser(); |
| storageMonitor = new HBaseStorageMonitor(conf); |
| LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi); |
| } |
| |
| @Override |
| protected void serviceStart() throws Exception { |
| super.serviceStart(); |
| storageMonitor.start(); |
| } |
| |
| /** |
| * Stores the entire information in TimelineEntities to the timeline store. |
| */ |
| @Override |
| public TimelineWriteResponse write(TimelineCollectorContext context, |
| TimelineEntities data, UserGroupInformation callerUgi) |
| throws IOException { |
| storageMonitor.checkStorageIsUp(); |
| TimelineWriteResponse putStatus = new TimelineWriteResponse(); |
| |
| String clusterId = context.getClusterId(); |
| String userId = context.getUserId(); |
| String flowName = context.getFlowName(); |
| String flowVersion = context.getFlowVersion(); |
| long flowRunId = context.getFlowRunId(); |
| String appId = context.getAppId(); |
| String subApplicationUser = callerUgi.getShortUserName(); |
| |
| // defensive coding to avoid NPE during row key construction |
| if ((flowName == null) || (appId == null) || (clusterId == null) |
| || (userId == null)) { |
| LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + appId |
| + " userId=" + userId + " clusterId=" + clusterId |
| + " . Not proceeding with writing to hbase"); |
| return putStatus; |
| } |
| |
| for (TimelineEntity te : data.getEntities()) { |
| |
| // a set can have at most 1 null |
| if (te == null) { |
| continue; |
| } |
| |
| // if the entity is the application, the destination is the application |
| // table |
| boolean isApplication = ApplicationEntity.isApplicationEntity(te); |
| byte[] rowKey; |
| if (isApplication) { |
| ApplicationRowKey applicationRowKey = |
| new ApplicationRowKey(clusterId, userId, flowName, flowRunId, |
| appId); |
| rowKey = applicationRowKey.getRowKey(); |
| store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE); |
| } else { |
| EntityRowKey entityRowKey = |
| new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, |
| te.getType(), te.getIdPrefix(), te.getId()); |
| rowKey = entityRowKey.getRowKey(); |
| store(rowKey, te, flowVersion, Tables.ENTITY_TABLE); |
| } |
| |
| if (!isApplication && SubApplicationEntity.isSubApplicationEntity(te)) { |
| SubApplicationRowKey subApplicationRowKey = |
| new SubApplicationRowKey(subApplicationUser, clusterId, |
| te.getType(), te.getIdPrefix(), te.getId(), userId); |
| rowKey = subApplicationRowKey.getRowKey(); |
| store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE); |
| } |
| |
| if (isApplication) { |
| TimelineEvent event = |
| ApplicationEntity.getApplicationEvent(te, |
| ApplicationMetricsConstants.CREATED_EVENT_TYPE); |
| FlowRunRowKey flowRunRowKey = |
| new FlowRunRowKey(clusterId, userId, flowName, flowRunId); |
| if (event != null) { |
| onApplicationCreated(flowRunRowKey, clusterId, appId, userId, |
| flowVersion, te, event.getTimestamp()); |
| } |
| // if it's an application entity, store metrics |
| storeFlowMetricsAppRunning(flowRunRowKey, appId, te); |
| // if application has finished, store it's finish time and write final |
| // values of all metrics |
| event = ApplicationEntity.getApplicationEvent(te, |
| ApplicationMetricsConstants.FINISHED_EVENT_TYPE); |
| if (event != null) { |
| onApplicationFinished(flowRunRowKey, flowVersion, appId, te, |
| event.getTimestamp()); |
| } |
| } |
| } |
| return putStatus; |
| } |
| |
| @Override |
| public TimelineWriteResponse write(TimelineCollectorContext context, |
| TimelineDomain domain) |
| throws IOException { |
| storageMonitor.checkStorageIsUp(); |
| TimelineWriteResponse putStatus = new TimelineWriteResponse(); |
| |
| String clusterId = context.getClusterId(); |
| String domainId = domain.getId(); |
| |
| // defensive coding to avoid NPE during row key construction |
| if (clusterId == null) { |
| LOG.warn( |
| "Found null for clusterId. Not proceeding with writing to hbase"); |
| return putStatus; |
| } |
| |
| DomainRowKey domainRowKey = new DomainRowKey(clusterId, domainId); |
| byte[] rowKey = domainRowKey.getRowKey(); |
| |
| ColumnRWHelper.store(rowKey, domainTable, DomainColumn.CREATED_TIME, null, |
| domain.getCreatedTime()); |
| ColumnRWHelper.store(rowKey, domainTable, DomainColumn.DESCRIPTION, null, |
| domain.getDescription()); |
| ColumnRWHelper |
| .store(rowKey, domainTable, DomainColumn.MODIFICATION_TIME, null, |
| domain.getModifiedTime()); |
| ColumnRWHelper.store(rowKey, domainTable, DomainColumn.OWNER, null, |
| domain.getOwner()); |
| ColumnRWHelper.store(rowKey, domainTable, DomainColumn.READERS, null, |
| domain.getReaders()); |
| ColumnRWHelper.store(rowKey, domainTable, DomainColumn.WRITERS, null, |
| domain.getWriters()); |
| return putStatus; |
| } |
| |
| private void onApplicationCreated(FlowRunRowKey flowRunRowKey, |
| String clusterId, String appId, String userId, String flowVersion, |
| TimelineEntity te, long appCreatedTimeStamp) |
| throws IOException { |
| |
| String flowName = flowRunRowKey.getFlowName(); |
| Long flowRunId = flowRunRowKey.getFlowRunId(); |
| |
| // store in App to flow table |
| AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId); |
| byte[] rowKey = appToFlowRowKey.getRowKey(); |
| ColumnRWHelper.store(rowKey, appToFlowTable, |
| AppToFlowColumnPrefix.FLOW_NAME, clusterId, null, flowName); |
| ColumnRWHelper.store(rowKey, appToFlowTable, |
| AppToFlowColumnPrefix.FLOW_RUN_ID, clusterId, null, flowRunId); |
| ColumnRWHelper.store(rowKey, appToFlowTable, AppToFlowColumnPrefix.USER_ID, |
| clusterId, null, userId); |
| |
| // store in flow run table |
| storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); |
| |
| // store in flow activity table |
| byte[] flowActivityRowKeyBytes = |
| new FlowActivityRowKey(flowRunRowKey.getClusterId(), |
| appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName) |
| .getRowKey(); |
| byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); |
| ColumnRWHelper.store(flowActivityRowKeyBytes, flowActivityTable, |
| FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion, |
| AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); |
| } |
| |
| /* |
| * updates the {@link FlowRunTable} with Application Created information |
| */ |
| private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, |
| String appId, TimelineEntity te) throws IOException { |
| byte[] rowKey = flowRunRowKey.getRowKey(); |
| ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MIN_START_TIME, |
| null, te.getCreatedTime(), |
| AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); |
| } |
| |
| |
| /* |
| * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an |
| * application has finished |
| */ |
| private void onApplicationFinished(FlowRunRowKey flowRunRowKey, |
| String flowVersion, String appId, TimelineEntity te, |
| long appFinishedTimeStamp) throws IOException { |
| // store in flow run table |
| storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te, |
| appFinishedTimeStamp); |
| |
| // indicate in the flow activity table that the app has finished |
| byte[] rowKey = |
| new FlowActivityRowKey(flowRunRowKey.getClusterId(), |
| appFinishedTimeStamp, flowRunRowKey.getUserId(), |
| flowRunRowKey.getFlowName()).getRowKey(); |
| byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); |
| ColumnRWHelper.store(rowKey, flowActivityTable, |
| FlowActivityColumnPrefix.RUN_ID, qualifier, null, flowVersion, |
| AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); |
| } |
| |
| /* |
| * Update the {@link FlowRunTable} with Application Finished information |
| */ |
| private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey, |
| String appId, TimelineEntity te, long appFinishedTimeStamp) |
| throws IOException { |
| byte[] rowKey = flowRunRowKey.getRowKey(); |
| Attribute attributeAppId = |
| AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId); |
| ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumn.MAX_END_TIME, |
| null, appFinishedTimeStamp, attributeAppId); |
| |
| // store the final value of metrics since application has finished |
| Set<TimelineMetric> metrics = te.getMetrics(); |
| if (metrics != null) { |
| storeFlowMetrics(rowKey, metrics, attributeAppId, |
| AggregationOperation.SUM_FINAL.getAttribute()); |
| } |
| } |
| |
| /* |
| * Updates the {@link FlowRunTable} with Application Metrics |
| */ |
| private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey, |
| String appId, TimelineEntity te) throws IOException { |
| Set<TimelineMetric> metrics = te.getMetrics(); |
| if (metrics != null) { |
| byte[] rowKey = flowRunRowKey.getRowKey(); |
| storeFlowMetrics(rowKey, metrics, |
| AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), |
| AggregationOperation.SUM.getAttribute()); |
| } |
| } |
| |
| private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, |
| Attribute... attributes) throws IOException { |
| for (TimelineMetric metric : metrics) { |
| byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId()); |
| Map<Long, Number> timeseries = metric.getValues(); |
| for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { |
| Long timestamp = timeseriesEntry.getKey(); |
| ColumnRWHelper.store(rowKey, flowRunTable, FlowRunColumnPrefix.METRIC, |
| metricColumnQualifier, timestamp, timeseriesEntry.getValue(), |
| attributes); |
| } |
| } |
| } |
| |
| /** |
| * Stores the Relations from the {@linkplain TimelineEntity} object. |
| */ |
| private <T extends BaseTable<T>> void storeRelations(byte[] rowKey, |
| Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix, |
| TypedBufferedMutator<T> table) throws IOException { |
| if (connectedEntities != null) { |
| for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities |
| .entrySet()) { |
| // id3?id4?id5 |
| String compoundValue = |
| Separator.VALUES.joinEncoded(connectedEntity.getValue()); |
| ColumnRWHelper.store(rowKey, table, columnPrefix, |
| stringKeyConverter.encode(connectedEntity.getKey()), |
| null, compoundValue); |
| } |
| } |
| } |
| |
| /** |
| * Stores information from the {@linkplain TimelineEntity} object. |
| */ |
| private void store(byte[] rowKey, TimelineEntity te, |
| String flowVersion, |
| Tables table) throws IOException { |
| switch (table) { |
| case APPLICATION_TABLE: |
| ColumnRWHelper.store(rowKey, applicationTable, |
| ApplicationColumn.ID, null, te.getId()); |
| ColumnRWHelper.store(rowKey, applicationTable, |
| ApplicationColumn.CREATED_TIME, null, te.getCreatedTime()); |
| ColumnRWHelper.store(rowKey, applicationTable, |
| ApplicationColumn.FLOW_VERSION, null, flowVersion); |
| storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO, |
| applicationTable); |
| storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC, |
| applicationTable); |
| storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT, |
| applicationTable); |
| storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG, |
| applicationTable); |
| storeRelations(rowKey, te.getIsRelatedToEntities(), |
| ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); |
| storeRelations(rowKey, te.getRelatesToEntities(), |
| ApplicationColumnPrefix.RELATES_TO, applicationTable); |
| break; |
| case ENTITY_TABLE: |
| ColumnRWHelper.store(rowKey, entityTable, |
| EntityColumn.ID, null, te.getId()); |
| ColumnRWHelper.store(rowKey, entityTable, |
| EntityColumn.TYPE, null, te.getType()); |
| ColumnRWHelper.store(rowKey, entityTable, |
| EntityColumn.CREATED_TIME, null, te.getCreatedTime()); |
| ColumnRWHelper.store(rowKey, entityTable, |
| EntityColumn.FLOW_VERSION, null, flowVersion); |
| storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO, |
| entityTable); |
| storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC, |
| entityTable); |
| storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT, |
| entityTable); |
| storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG, |
| entityTable); |
| storeRelations(rowKey, te.getIsRelatedToEntities(), |
| EntityColumnPrefix.IS_RELATED_TO, entityTable); |
| storeRelations(rowKey, te.getRelatesToEntities(), |
| EntityColumnPrefix.RELATES_TO, entityTable); |
| break; |
| case SUBAPPLICATION_TABLE: |
| ColumnRWHelper.store(rowKey, subApplicationTable, SubApplicationColumn.ID, |
| null, te.getId()); |
| ColumnRWHelper.store(rowKey, subApplicationTable, |
| SubApplicationColumn.TYPE, null, te.getType()); |
| ColumnRWHelper.store(rowKey, subApplicationTable, |
| SubApplicationColumn.CREATED_TIME, null, te.getCreatedTime()); |
| ColumnRWHelper.store(rowKey, subApplicationTable, |
| SubApplicationColumn.FLOW_VERSION, null, flowVersion); |
| storeInfo(rowKey, te.getInfo(), flowVersion, |
| SubApplicationColumnPrefix.INFO, subApplicationTable); |
| storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC, |
| subApplicationTable); |
| storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT, |
| subApplicationTable); |
| storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG, |
| subApplicationTable); |
| storeRelations(rowKey, te.getIsRelatedToEntities(), |
| SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable); |
| storeRelations(rowKey, te.getRelatesToEntities(), |
| SubApplicationColumnPrefix.RELATES_TO, subApplicationTable); |
| break; |
| default: |
| LOG.info("Invalid table name provided."); |
| break; |
| } |
| } |
| |
| /** |
| * stores the info information from {@linkplain TimelineEntity}. |
| */ |
| private <T extends BaseTable<T>> void storeInfo(byte[] rowKey, |
| Map<String, Object> info, String flowVersion, |
| ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T > table) |
| throws IOException { |
| if (info != null) { |
| for (Map.Entry<String, Object> entry : info.entrySet()) { |
| ColumnRWHelper.store(rowKey, table, columnPrefix, |
| stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); |
| } |
| } |
| } |
| |
| /** |
| * stores the config information from {@linkplain TimelineEntity}. |
| */ |
| private <T extends BaseTable<T>> void storeConfig( |
| byte[] rowKey, Map<String, String> config, |
| ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) |
| throws IOException { |
| if (config != null) { |
| for (Map.Entry<String, String> entry : config.entrySet()) { |
| byte[] configKey = stringKeyConverter.encode(entry.getKey()); |
| ColumnRWHelper.store(rowKey, table, columnPrefix, configKey, |
| null, entry.getValue()); |
| } |
| } |
| } |
| |
| /** |
| * stores the {@linkplain TimelineMetric} information from the |
| * {@linkplain TimelineEvent} object. |
| */ |
| private <T extends BaseTable<T>> void storeMetrics( |
| byte[] rowKey, Set<TimelineMetric> metrics, |
| ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) |
| throws IOException { |
| if (metrics != null) { |
| for (TimelineMetric metric : metrics) { |
| byte[] metricColumnQualifier = |
| stringKeyConverter.encode(metric.getId()); |
| Map<Long, Number> timeseries = metric.getValues(); |
| for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { |
| Long timestamp = timeseriesEntry.getKey(); |
| ColumnRWHelper.store(rowKey, table, columnPrefix, |
| metricColumnQualifier, timestamp, timeseriesEntry.getValue()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Stores the events from the {@linkplain TimelineEvent} object. |
| */ |
| private <T extends BaseTable<T>> void storeEvents( |
| byte[] rowKey, Set<TimelineEvent> events, |
| ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) |
| throws IOException { |
| if (events != null) { |
| for (TimelineEvent event : events) { |
| if (event != null) { |
| String eventId = event.getId(); |
| if (eventId != null) { |
| long eventTimestamp = event.getTimestamp(); |
| // if the timestamp is not set, use the current timestamp |
| if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) { |
| LOG.warn("timestamp is not set for event " + eventId + |
| "! Using the current timestamp"); |
| eventTimestamp = System.currentTimeMillis(); |
| } |
| Map<String, Object> eventInfo = event.getInfo(); |
| if ((eventInfo == null) || (eventInfo.size() == 0)) { |
| byte[] columnQualifierBytes = |
| new EventColumnName(eventId, eventTimestamp, null) |
| .getColumnQualifier(); |
| ColumnRWHelper.store(rowKey, table, columnPrefix, |
| columnQualifierBytes, null, Separator.EMPTY_BYTES); |
| } else { |
| for (Map.Entry<String, Object> info : eventInfo.entrySet()) { |
| // eventId=infoKey |
| byte[] columnQualifierBytes = |
| new EventColumnName(eventId, eventTimestamp, info.getKey()) |
| .getColumnQualifier(); |
| ColumnRWHelper.store(rowKey, table, columnPrefix, |
| columnQualifierBytes, null, info.getValue()); |
| } // for info: eventInfo |
| } |
| } |
| } |
| } // event : events |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see |
| * org.apache.hadoop.yarn.server.timelineservice.storage |
| * .TimelineWriter#aggregate |
| * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity, |
| * org.apache |
| * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack) |
| */ |
| @Override |
| public TimelineWriteResponse aggregate(TimelineEntity data, |
| TimelineAggregationTrack track) throws IOException { |
| storageMonitor.checkStorageIsUp(); |
| return null; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see |
| * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush |
| * () |
| */ |
| @Override |
| public void flush() throws IOException { |
| storageMonitor.checkStorageIsUp(); |
| // flush all buffered mutators |
| entityTable.flush(); |
| appToFlowTable.flush(); |
| applicationTable.flush(); |
| flowRunTable.flush(); |
| flowActivityTable.flush(); |
| subApplicationTable.flush(); |
| domainTable.flush(); |
| } |
| |
| /** |
| * close the hbase connections The close APIs perform flushing and release any |
| * resources held. |
| */ |
| @Override |
| protected void serviceStop() throws Exception { |
| boolean isStorageUp = true; |
| try { |
| storageMonitor.checkStorageIsUp(); |
| } catch (IOException e) { |
| LOG.warn("Failed to close the timeline tables as Hbase is down", e); |
| isStorageUp = false; |
| } |
| |
| if (isStorageUp) { |
| if (entityTable != null) { |
| LOG.info("closing the entity table"); |
| // The close API performs flushing and releases any resources held |
| entityTable.close(); |
| } |
| if (appToFlowTable != null) { |
| LOG.info("closing the app_flow table"); |
| // The close API performs flushing and releases any resources held |
| appToFlowTable.close(); |
| } |
| if (applicationTable != null) { |
| LOG.info("closing the application table"); |
| applicationTable.close(); |
| } |
| if (flowRunTable != null) { |
| LOG.info("closing the flow run table"); |
| // The close API performs flushing and releases any resources held |
| flowRunTable.close(); |
| } |
| if (flowActivityTable != null) { |
| LOG.info("closing the flowActivityTable table"); |
| // The close API performs flushing and releases any resources held |
| flowActivityTable.close(); |
| } |
| if (subApplicationTable != null) { |
| subApplicationTable.close(); |
| } |
| if (domainTable != null) { |
| domainTable.close(); |
| } |
| if (conn != null) { |
| LOG.info("closing the hbase Connection"); |
| conn.close(); |
| } |
| } |
| storageMonitor.stop(); |
| super.serviceStop(); |
| } |
| |
| protected TimelineStorageMonitor getTimelineStorageMonitor() { |
| return storageMonitor; |
| } |
| |
| } |