blob: 34362032d1ac9a3fa8ba021b4d913aefdbf43a88 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.state.scheduler;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.ambari.server.controller.RequestScheduleResponse;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.HostDAO;
import org.apache.ambari.server.orm.dao.RequestScheduleBatchRequestDAO;
import org.apache.ambari.server.orm.dao.RequestScheduleDAO;
import org.apache.ambari.server.orm.entities.ClusterEntity;
import org.apache.ambari.server.orm.entities.RequestScheduleBatchRequestEntity;
import org.apache.ambari.server.orm.entities.RequestScheduleEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.utils.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.persist.Transactional;
public class RequestExecutionImpl implements RequestExecution {
private Cluster cluster;
private Batch batch;
private Schedule schedule;
private RequestScheduleEntity requestScheduleEntity;
private volatile boolean isPersisted = false;
@Inject
private Gson gson;
@Inject
private Clusters clusters;
@Inject
private RequestScheduleDAO requestScheduleDAO;
@Inject
private RequestScheduleBatchRequestDAO batchRequestDAO;
@Inject
private ClusterDAO clusterDAO;
@Inject
private HostDAO hostDAO;
private static final Logger LOG = LoggerFactory.getLogger(RequestExecutionImpl.class);
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@AssistedInject
public RequestExecutionImpl(@Assisted("cluster") Cluster cluster,
@Assisted("batch") Batch batch,
@Assisted("schedule") @Nullable Schedule schedule,
Injector injector) {
this.cluster = cluster;
this.batch = batch;
this.schedule = schedule;
injector.injectMembers(this);
// Initialize the Entity object
// Batch Hosts is initialized on persist
requestScheduleEntity = new RequestScheduleEntity();
requestScheduleEntity.setClusterId(cluster.getClusterId());
updateBatchSettings();
updateSchedule();
}
@AssistedInject
public RequestExecutionImpl(@Assisted Cluster cluster,
@Assisted RequestScheduleEntity requestScheduleEntity,
Injector injector) {
this.cluster = cluster;
injector.injectMembers(this);
this.requestScheduleEntity = requestScheduleEntity;
batch = new Batch();
schedule = new Schedule();
BatchSettings batchSettings = new BatchSettings();
batchSettings.setBatchSeparationInSeconds(requestScheduleEntity.getBatchSeparationInSeconds());
batchSettings.setTaskFailureToleranceLimit(requestScheduleEntity.getBatchTolerationLimit());
batch.setBatchSettings(batchSettings);
Collection<RequestScheduleBatchRequestEntity> batchRequestEntities =
requestScheduleEntity.getRequestScheduleBatchRequestEntities();
if (batchRequestEntities != null) {
for (RequestScheduleBatchRequestEntity batchRequestEntity :
batchRequestEntities) {
BatchRequest batchRequest = new BatchRequest();
batchRequest.setOrderId(batchRequestEntity.getBatchId());
batchRequest.setType(BatchRequest.Type.valueOf(batchRequestEntity.getRequestType()));
batchRequest.setUri(batchRequestEntity.getRequestUri());
batchRequest.setStatus(batchRequestEntity.getRequestStatus());
batchRequest.setReturnCode(batchRequestEntity.getReturnCode());
batchRequest.setResponseMsg(batchRequestEntity.getReturnMessage());
batch.getBatchRequests().add(batchRequest);
}
}
schedule.setDayOfWeek(requestScheduleEntity.getDayOfWeek());
schedule.setDaysOfMonth(requestScheduleEntity.getDaysOfMonth());
schedule.setMinutes(requestScheduleEntity.getMinutes());
schedule.setHours(requestScheduleEntity.getHours());
schedule.setMonth(requestScheduleEntity.getMonth());
schedule.setYear(requestScheduleEntity.getYear());
schedule.setStartTime(requestScheduleEntity.getStartTime());
schedule.setEndTime(requestScheduleEntity.getEndTime());
isPersisted = true;
}
@Override
public Long getId() {
return requestScheduleEntity.getScheduleId();
}
@Override
public String getClusterName() {
return cluster.getClusterName();
}
@Override
public Batch getBatch() {
return batch;
}
@Override
public void setBatch(Batch batch) {
this.batch = batch;
}
@Override
public Schedule getSchedule() {
return schedule;
}
@Override
public void setSchedule(Schedule schedule) {
this.schedule = schedule;
}
@Override
public RequestScheduleResponse convertToResponse() {
readWriteLock.readLock().lock();
try{
RequestScheduleResponse response = new RequestScheduleResponse(
getId(), getClusterName(), getDescription(), getStatus(),
getLastExecutionStatus(), getBatch(), getSchedule(),
requestScheduleEntity.getCreateUser(),
DateUtils.convertToReadableTime(requestScheduleEntity.getCreateTimestamp()),
requestScheduleEntity.getUpdateUser(),
DateUtils.convertToReadableTime(requestScheduleEntity.getUpdateTimestamp()),
requestScheduleEntity.getAuthenticatedUserId()
);
return response;
} finally {
readWriteLock.readLock().unlock();
}
}
@Override
public void persist() {
readWriteLock.writeLock().lock();
try {
if (!isPersisted) {
persistEntities();
refresh();
cluster.refresh();
isPersisted = true;
} else {
saveIfPersisted();
}
} finally {
readWriteLock.writeLock().unlock();
}
}
@Override
public void refresh() {
readWriteLock.writeLock().lock();
try{
if (isPersisted) {
RequestScheduleEntity scheduleEntity = requestScheduleDAO.findById
(requestScheduleEntity.getScheduleId());
requestScheduleDAO.refresh(scheduleEntity);
}
} finally {
readWriteLock.writeLock().unlock();
}
}
@Override
public void delete() {
readWriteLock.writeLock().lock();
try {
if (isPersisted) {
batchRequestDAO.removeByScheduleId(requestScheduleEntity.getScheduleId());
requestScheduleDAO.remove(requestScheduleEntity);
cluster.refresh();
isPersisted = false;
}
} finally {
readWriteLock.writeLock().unlock();
}
}
@Override
public String getStatus() {
return requestScheduleEntity.getStatus();
}
@Override
public void setDescription(String description) {
requestScheduleEntity.setDescription(description);
}
@Override
public String getDescription() {
return requestScheduleEntity.getDescription();
}
/**
* Persist @RequestScheduleEntity with @RequestScheduleBatchHostEntity
*/
@Transactional
void persistEntities() {
ClusterEntity clusterEntity = clusterDAO.findById(cluster.getClusterId());
requestScheduleEntity.setClusterEntity(clusterEntity);
requestScheduleEntity.setCreateTimestamp(System.currentTimeMillis());
requestScheduleEntity.setUpdateTimestamp(System.currentTimeMillis());
requestScheduleDAO.create(requestScheduleEntity);
persistRequestMapping();
}
@Transactional
void persistRequestMapping() {
// Delete existing mappings to support updates
if (isPersisted) {
batchRequestDAO.removeByScheduleId(requestScheduleEntity.getScheduleId());
requestScheduleEntity.getRequestScheduleBatchRequestEntities().clear();
}
if (batch != null) {
List<BatchRequest> batchRequests = batch.getBatchRequests();
if (batchRequests != null) {
Collections.sort(batchRequests);
for (BatchRequest batchRequest : batchRequests) {
RequestScheduleBatchRequestEntity batchRequestEntity = new
RequestScheduleBatchRequestEntity();
batchRequestEntity.setBatchId(batchRequest.getOrderId());
batchRequestEntity.setScheduleId(requestScheduleEntity.getScheduleId());
batchRequestEntity.setRequestScheduleEntity(requestScheduleEntity);
batchRequestEntity.setRequestType(batchRequest.getType());
batchRequestEntity.setRequestUri(batchRequest.getUri());
batchRequestEntity.setRequestBody(batchRequest.getBody());
batchRequestEntity.setReturnCode(batchRequest.getReturnCode());
batchRequestEntity.setReturnMessage(batchRequest.getResponseMsg());
batchRequestEntity.setRequestStatus(batchRequest.getStatus());
batchRequestDAO.create(batchRequestEntity);
requestScheduleEntity.getRequestScheduleBatchRequestEntities().add
(batchRequestEntity);
requestScheduleDAO.merge(requestScheduleEntity);
}
}
}
}
@Transactional
void saveIfPersisted() {
if (isPersisted) {
requestScheduleEntity.setUpdateTimestamp(System.currentTimeMillis());
// Update the Entity object with new settings
updateBatchSettings();
updateSchedule();
// Persist schedule and settings
requestScheduleDAO.merge(requestScheduleEntity);
// Persist batches of hosts
persistRequestMapping();
}
}
private void updateBatchSettings() {
if (batch != null) {
BatchSettings settings = batch.getBatchSettings();
if (settings != null) {
requestScheduleEntity.setBatchSeparationInSeconds(settings.getBatchSeparationInSeconds());
requestScheduleEntity.setBatchTolerationLimit(settings.getTaskFailureToleranceLimit());
}
}
}
private void updateSchedule() {
if (schedule != null) {
requestScheduleEntity.setMinutes(schedule.getMinutes());
requestScheduleEntity.setHours(schedule.getHours());
requestScheduleEntity.setDaysOfMonth(schedule.getDaysOfMonth());
requestScheduleEntity.setDayOfWeek(schedule.getDayOfWeek());
requestScheduleEntity.setMonth(schedule.getMonth());
requestScheduleEntity.setYear(schedule.getYear());
requestScheduleEntity.setStartTime(schedule.getStartTime());
requestScheduleEntity.setEndTime(schedule.getEndTime());
}
}
@Override
public void setStatus(Status status) {
requestScheduleEntity.setStatus(status.name());
}
@Override
public void setLastExecutionStatus(String status) {
requestScheduleEntity.setLastExecutionStatus(status);
}
@Override
public void setAuthenticatedUserId(Integer username) {
requestScheduleEntity.setAuthenticatedUserId(username);
}
@Override
public void setCreateUser(String username) {
requestScheduleEntity.setCreateUser(username);
}
@Override
public void setUpdateUser(String username) {
requestScheduleEntity.setUpdateUser(username);
}
@Override
public String getCreateTime() {
return DateUtils.convertToReadableTime
(requestScheduleEntity.getCreateTimestamp());
}
@Override
public String getUpdateTime() {
return DateUtils.convertToReadableTime
(requestScheduleEntity.getUpdateTimestamp());
}
@Override
public Integer getAuthenticatedUserId() {
return requestScheduleEntity.getAuthenticatedUserId();
}
@Override
public String getCreateUser() {
return requestScheduleEntity.getCreateUser();
}
@Override
public String getUpdateUser() {
return requestScheduleEntity.getUpdateUser();
}
@Override
public String getLastExecutionStatus() {
return requestScheduleEntity.getLastExecutionStatus();
}
@Override
public RequestScheduleResponse convertToResponseWithBody() {
readWriteLock.readLock().lock();
try{
RequestScheduleResponse response = convertToResponse();
Batch batch = response.getBatch();
if (batch != null) {
List<BatchRequest> batchRequests = batch.getBatchRequests();
if (batchRequests != null) {
for (BatchRequest batchRequest : batchRequests) {
batchRequest.setBody(getRequestBody(batchRequest.getOrderId()));
}
}
}
return response;
} finally {
readWriteLock.readLock().unlock();
}
}
@Override
public String getRequestBody(Long batchId) {
String body = null;
if (requestScheduleEntity != null) {
Collection<RequestScheduleBatchRequestEntity> requestEntities =
requestScheduleEntity.getRequestScheduleBatchRequestEntities();
if (requestEntities != null) {
for (RequestScheduleBatchRequestEntity requestEntity : requestEntities) {
if (requestEntity.getBatchId().equals(batchId)) {
body = requestEntity.getRequestBodyAsString();
}
}
}
}
return body;
}
@Override
public BatchRequest getBatchRequest(long batchId) {
for (BatchRequest batchRequest : batch.getBatchRequests()) {
if (batchId == batchRequest.getOrderId()) {
return batchRequest;
}
}
return null;
}
@Override
public void updateBatchRequest(long batchId,
BatchRequestResponse batchRequestResponse,
boolean statusOnly) {
RequestScheduleBatchRequestEntity batchRequestEntity = null;
for (RequestScheduleBatchRequestEntity entity :
requestScheduleEntity.getRequestScheduleBatchRequestEntities()) {
if (entity.getBatchId() == batchId
&& entity.getScheduleId() == requestScheduleEntity.getScheduleId()) {
batchRequestEntity = entity;
}
}
if (batchRequestEntity != null) {
batchRequestEntity.setRequestStatus(batchRequestResponse.getStatus());
if (!statusOnly) {
batchRequestEntity.setReturnCode(batchRequestResponse.getReturnCode());
batchRequestEntity.setRequestId(batchRequestResponse.getRequestId());
batchRequestEntity.setReturnMessage(batchRequestResponse.getReturnMessage());
}
batchRequestDAO.merge(batchRequestEntity);
}
BatchRequest batchRequest = getBatchRequest(batchId);
batchRequest.setStatus(batchRequestResponse.getStatus());
if (!statusOnly) {
batchRequest.setReturnCode(batchRequestResponse.getReturnCode());
batchRequest.setResponseMsg(batchRequestResponse.getReturnMessage());
}
setLastExecutionStatus(batchRequestResponse.getStatus());
requestScheduleDAO.merge(requestScheduleEntity);
}
@Override
@Transactional
public void updateStatus(Status status) {
setStatus(status);
if (isPersisted) {
requestScheduleEntity.setUpdateTimestamp(System.currentTimeMillis());
requestScheduleDAO.merge(requestScheduleEntity);
} else {
LOG.warn("Updated status in memory, since Request Schedule is not " +
"persisted.");
}
}
}