| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.applicationhistoryservice; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.service.AbstractService; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptFinishData; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptStartData; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationFinishData; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationStartData; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerFinishData; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData; |
| import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerStartData; |
| |
| /** |
| * In-memory implementation of {@link ApplicationHistoryStore}. This |
| * implementation is for test purpose only. If users improperly instantiate it, |
| * they may encounter reading and writing history data in different memory |
| * store. |
| * |
| */ |
| @Private |
| @Unstable |
| public class MemoryApplicationHistoryStore extends AbstractService implements |
| ApplicationHistoryStore { |
| |
| private final ConcurrentMap<ApplicationId, ApplicationHistoryData> applicationData = |
| new ConcurrentHashMap<ApplicationId, ApplicationHistoryData>(); |
| private final ConcurrentMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>> applicationAttemptData = |
| new ConcurrentHashMap<ApplicationId, ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData>>(); |
| private final ConcurrentMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>> containerData = |
| new ConcurrentHashMap<ApplicationAttemptId, ConcurrentMap<ContainerId, ContainerHistoryData>>(); |
| |
| public MemoryApplicationHistoryStore() { |
| super(MemoryApplicationHistoryStore.class.getName()); |
| } |
| |
| @Override |
| public Map<ApplicationId, ApplicationHistoryData> getAllApplications() { |
| return new HashMap<ApplicationId, ApplicationHistoryData>(applicationData); |
| } |
| |
| @Override |
| public ApplicationHistoryData getApplication(ApplicationId appId) { |
| return applicationData.get(appId); |
| } |
| |
| @Override |
| public Map<ApplicationAttemptId, ApplicationAttemptHistoryData> |
| getApplicationAttempts(ApplicationId appId) { |
| ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = |
| applicationAttemptData.get(appId); |
| if (subMap == null) { |
| return Collections |
| .<ApplicationAttemptId, ApplicationAttemptHistoryData> emptyMap(); |
| } else { |
| return new HashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>( |
| subMap); |
| } |
| } |
| |
| @Override |
| public ApplicationAttemptHistoryData getApplicationAttempt( |
| ApplicationAttemptId appAttemptId) { |
| ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = |
| applicationAttemptData.get(appAttemptId.getApplicationId()); |
| if (subMap == null) { |
| return null; |
| } else { |
| return subMap.get(appAttemptId); |
| } |
| } |
| |
| @Override |
| public ContainerHistoryData getAMContainer(ApplicationAttemptId appAttemptId) { |
| ApplicationAttemptHistoryData appAttempt = |
| getApplicationAttempt(appAttemptId); |
| if (appAttempt == null || appAttempt.getMasterContainerId() == null) { |
| return null; |
| } else { |
| return getContainer(appAttempt.getMasterContainerId()); |
| } |
| } |
| |
| @Override |
| public ContainerHistoryData getContainer(ContainerId containerId) { |
| Map<ContainerId, ContainerHistoryData> subMap = |
| containerData.get(containerId.getApplicationAttemptId()); |
| if (subMap == null) { |
| return null; |
| } else { |
| return subMap.get(containerId); |
| } |
| } |
| |
| @Override |
| public Map<ContainerId, ContainerHistoryData> getContainers( |
| ApplicationAttemptId appAttemptId) throws IOException { |
| ConcurrentMap<ContainerId, ContainerHistoryData> subMap = |
| containerData.get(appAttemptId); |
| if (subMap == null) { |
| return Collections.<ContainerId, ContainerHistoryData> emptyMap(); |
| } else { |
| return new HashMap<ContainerId, ContainerHistoryData>(subMap); |
| } |
| } |
| |
| @Override |
| public void applicationStarted(ApplicationStartData appStart) |
| throws IOException { |
| ApplicationHistoryData oldData = |
| applicationData.putIfAbsent(appStart.getApplicationId(), |
| ApplicationHistoryData.newInstance(appStart.getApplicationId(), |
| appStart.getApplicationName(), appStart.getApplicationType(), |
| appStart.getQueue(), appStart.getUser(), appStart.getSubmitTime(), |
| appStart.getStartTime(), Long.MAX_VALUE, null, null, null)); |
| if (oldData != null) { |
| throw new IOException("The start information of application " |
| + appStart.getApplicationId() + " is already stored."); |
| } |
| } |
| |
| @Override |
| public void applicationFinished(ApplicationFinishData appFinish) |
| throws IOException { |
| ApplicationHistoryData data = |
| applicationData.get(appFinish.getApplicationId()); |
| if (data == null) { |
| throw new IOException("The finish information of application " |
| + appFinish.getApplicationId() + " is stored before the start" |
| + " information."); |
| } |
| // Make the assumption that YarnApplicationState should not be null if |
| // the finish information is already recorded |
| if (data.getYarnApplicationState() != null) { |
| throw new IOException("The finish information of application " |
| + appFinish.getApplicationId() + " is already stored."); |
| } |
| data.setFinishTime(appFinish.getFinishTime()); |
| data.setDiagnosticsInfo(appFinish.getDiagnosticsInfo()); |
| data.setFinalApplicationStatus(appFinish.getFinalApplicationStatus()); |
| data.setYarnApplicationState(appFinish.getYarnApplicationState()); |
| } |
| |
| @Override |
| public void applicationAttemptStarted( |
| ApplicationAttemptStartData appAttemptStart) throws IOException { |
| ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = |
| getSubMap(appAttemptStart.getApplicationAttemptId().getApplicationId()); |
| ApplicationAttemptHistoryData oldData = |
| subMap.putIfAbsent(appAttemptStart.getApplicationAttemptId(), |
| ApplicationAttemptHistoryData.newInstance( |
| appAttemptStart.getApplicationAttemptId(), |
| appAttemptStart.getHost(), appAttemptStart.getRPCPort(), |
| appAttemptStart.getMasterContainerId(), null, null, null, null)); |
| if (oldData != null) { |
| throw new IOException("The start information of application attempt " |
| + appAttemptStart.getApplicationAttemptId() + " is already stored."); |
| } |
| } |
| |
| @Override |
| public void applicationAttemptFinished( |
| ApplicationAttemptFinishData appAttemptFinish) throws IOException { |
| ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> subMap = |
| getSubMap(appAttemptFinish.getApplicationAttemptId().getApplicationId()); |
| ApplicationAttemptHistoryData data = |
| subMap.get(appAttemptFinish.getApplicationAttemptId()); |
| if (data == null) { |
| throw new IOException("The finish information of application attempt " |
| + appAttemptFinish.getApplicationAttemptId() + " is stored before" |
| + " the start information."); |
| } |
| // Make the assumption that YarnApplicationAttemptState should not be null |
| // if the finish information is already recorded |
| if (data.getYarnApplicationAttemptState() != null) { |
| throw new IOException("The finish information of application attempt " |
| + appAttemptFinish.getApplicationAttemptId() + " is already stored."); |
| } |
| data.setTrackingURL(appAttemptFinish.getTrackingURL()); |
| data.setDiagnosticsInfo(appAttemptFinish.getDiagnosticsInfo()); |
| data |
| .setFinalApplicationStatus(appAttemptFinish.getFinalApplicationStatus()); |
| data.setYarnApplicationAttemptState(appAttemptFinish |
| .getYarnApplicationAttemptState()); |
| } |
| |
| private ConcurrentMap<ApplicationAttemptId, ApplicationAttemptHistoryData> |
| getSubMap(ApplicationId appId) { |
| applicationAttemptData |
| .putIfAbsent( |
| appId, |
| new ConcurrentHashMap<ApplicationAttemptId, ApplicationAttemptHistoryData>()); |
| return applicationAttemptData.get(appId); |
| } |
| |
| @Override |
| public void containerStarted(ContainerStartData containerStart) |
| throws IOException { |
| ConcurrentMap<ContainerId, ContainerHistoryData> subMap = |
| getSubMap(containerStart.getContainerId().getApplicationAttemptId()); |
| ContainerHistoryData oldData = |
| subMap.putIfAbsent(containerStart.getContainerId(), |
| ContainerHistoryData.newInstance(containerStart.getContainerId(), |
| containerStart.getAllocatedResource(), |
| containerStart.getAssignedNode(), containerStart.getPriority(), |
| containerStart.getStartTime(), Long.MAX_VALUE, null, |
| Integer.MAX_VALUE, null)); |
| if (oldData != null) { |
| throw new IOException("The start information of container " |
| + containerStart.getContainerId() + " is already stored."); |
| } |
| } |
| |
| @Override |
| public void containerFinished(ContainerFinishData containerFinish) |
| throws IOException { |
| ConcurrentMap<ContainerId, ContainerHistoryData> subMap = |
| getSubMap(containerFinish.getContainerId().getApplicationAttemptId()); |
| ContainerHistoryData data = subMap.get(containerFinish.getContainerId()); |
| if (data == null) { |
| throw new IOException("The finish information of container " |
| + containerFinish.getContainerId() + " is stored before" |
| + " the start information."); |
| } |
| // Make the assumption that ContainerState should not be null if |
| // the finish information is already recorded |
| if (data.getContainerState() != null) { |
| throw new IOException("The finish information of container " |
| + containerFinish.getContainerId() + " is already stored."); |
| } |
| data.setFinishTime(containerFinish.getFinishTime()); |
| data.setDiagnosticsInfo(containerFinish.getDiagnosticsInfo()); |
| data.setContainerExitStatus(containerFinish.getContainerExitStatus()); |
| data.setContainerState(containerFinish.getContainerState()); |
| } |
| |
| private ConcurrentMap<ContainerId, ContainerHistoryData> getSubMap( |
| ApplicationAttemptId appAttemptId) { |
| containerData.putIfAbsent(appAttemptId, |
| new ConcurrentHashMap<ContainerId, ContainerHistoryData>()); |
| return containerData.get(appAttemptId); |
| } |
| |
| } |