blob: 266319f6a74d278e00082b11dc00ba9c01728eb2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.update;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.DatasourceHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.datasource.Datasource;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
* Helper methods to facilitate entity updates.
*/
public final class UpdateHelper {
private static final Logger LOG = LoggerFactory.getLogger(UpdateHelper.class);
private static final String[] FEED_FIELDS = new String[]{"partitions", "groups", "lateArrival.cutOff",
"schema.location", "schema.provider", "tags",
"group", "owner", "permission", };
private static final String[] PROCESS_FIELDS = new String[]{"retry.policy", "retry.delay", "retry.attempts",
"lateProcess.policy", "lateProcess.delay",
"lateProcess.lateInputs[\\d+].input",
"lateProcess.lateInputs[\\d+].workflowPath",
"owner", "group", "permission", "tags",
"pipelines", };
private UpdateHelper() {}
public static boolean isEntityUpdated(Entity oldEntity, Entity newEntity, String cluster,
Path oldStagingPath) throws FalconException {
Entity oldView = EntityUtil.getClusterView(oldEntity, cluster);
Entity newView = EntityUtil.getClusterView(newEntity, cluster);
//staging path contains md5 of the cluster view of entity
String[] parts = oldStagingPath.getName().split("_");
if (parts[0].equals(EntityUtil.md5(newView))) {
return false;
}
switch (oldEntity.getEntityType()) {
case FEED:
return !EntityUtil.equals(oldView, newView, FEED_FIELDS);
case PROCESS:
return !EntityUtil.equals(oldView, newView, PROCESS_FIELDS);
case CLUSTER:
return isClusterEntityUpdated((org.apache.falcon.entity.v0.cluster.Cluster) oldEntity,
(org.apache.falcon.entity.v0.cluster.Cluster) newEntity);
default:
}
throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType());
}
public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity, String cluster)
throws FalconException {
if (oldEntity.getEntityType() == EntityType.FEED && affectedEntity.getEntityType() == EntityType.PROCESS) {
Feed oldFeed = (Feed) oldEntity;
Feed newFeed = (Feed) newEntity;
Process affectedProcess = (Process) affectedEntity;
//check if affectedProcess is defined for this cluster
Cluster processCluster = ProcessHelper.getCluster(affectedProcess, cluster);
if (processCluster == null) {
LOG.debug("Process {} is not defined for cluster {}. Skipping", affectedProcess.getName(), cluster);
return false;
}
if (processCluster.getValidity().getEnd().before(new Date())) {
LOG.debug("Process {} validity {} is in the past. Skipping...", affectedProcess.getName(),
processCluster.getValidity().getEnd());
return false;
}
if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
LOG.debug("{}: Frequency has changed. Updating...", oldFeed.toShortString());
return true;
}
if (!StringUtils.equals(oldFeed.getAvailabilityFlag(), newFeed.getAvailabilityFlag())) {
LOG.debug("{}: Availability flag has changed. Updating...", oldFeed.toShortString());
return true;
}
org.apache.falcon.entity.v0.feed.Cluster oldFeedCluster = FeedHelper.getCluster(oldFeed, cluster);
org.apache.falcon.entity.v0.feed.Cluster newFeedCluster = FeedHelper.getCluster(newFeed, cluster);
if (!oldFeedCluster.getValidity().getStart().equals(newFeedCluster.getValidity().getStart())) {
LOG.debug("{}: Start time for cluster {} has changed. Updating...", oldFeed.toShortString(), cluster);
return true;
}
Storage oldFeedStorage = FeedHelper.createStorage(cluster, oldFeed);
Storage newFeedStorage = FeedHelper.createStorage(cluster, newFeed);
if (!oldFeedStorage.isIdentical(newFeedStorage)) {
LOG.debug("{}: Storage has changed. Updating...", oldFeed.toShortString());
return true;
}
return false;
} else {
LOG.debug(newEntity.toShortString());
LOG.debug(affectedEntity.toShortString());
throw new FalconException("Don't know what to do. Unexpected scenario");
}
}
public static boolean isClusterEntityUpdated(final org.apache.falcon.entity.v0.cluster.Cluster oldEntity,
final org.apache.falcon.entity.v0.cluster.Cluster newEntity) {
/*
* Name should not be updated.
* interface, locations, properties, colo : Update bundle/coord for dependent entities.
* Description, tags, ACL : no need to update bundle/coord for dependent entities.
*/
if (!oldEntity.getColo().equals(newEntity.getColo())) {
return true;
}
for(Interfacetype interfacetype : Interfacetype.values()) {
if (!ClusterHelper.matchInterface(oldEntity, newEntity, interfacetype)) {
return true;
}
}
for(ClusterLocationType locationType : ClusterLocationType.values()) {
if (!ClusterHelper.matchLocations(oldEntity, newEntity, locationType)) {
return true;
}
}
if (!ClusterHelper.matchProperties(oldEntity, newEntity)) {
return true;
}
return false;
}
public static boolean isDatasourceEntityUpdated(final Datasource oldEntity, final Datasource newEntity)
throws FalconException {
// ignore changes : colo, acl, description, tags
// can't change : name, data source entity type
// major change that trigger bundle rewrite
// driver class name change but not driver jar as it is automatically picked up from share lib
if (!DatasourceHelper.isSameDriverClazz(oldEntity.getDriver(), newEntity.getDriver())) {
return true;
}
// interface endpoint, credential or driver update will trigger a bundle rewrite
for(org.apache.falcon.entity.v0.datasource.Interfacetype ifacetype
: org.apache.falcon.entity.v0.datasource.Interfacetype.values()) {
if (!DatasourceHelper.isSameInterface(oldEntity, newEntity, ifacetype)) {
return true;
}
}
// check default credential too
if (!DatasourceHelper.isSameCredentials(oldEntity.getInterfaces().getCredential(),
newEntity.getInterfaces().getCredential())) {
return true;
}
// any change in the properties will trigger a bundle rewrite
if (!DatasourceHelper.isSameProperties(oldEntity, newEntity)) {
return true;
}
return false;
}
}