blob: e2d4bbb01d29700b07f7481227d0be0bc03f7bd8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.falcon.hook;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.falcon.bridge.FalconBridge;
import org.apache.atlas.falcon.model.FalconDataTypes;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;
import org.apache.atlas.typesystem.persistence.Id;
import org.apache.atlas.typesystem.types.TypeUtils;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.atlas.utils.ParamChecker;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.atlas.falcon.service.AtlasService;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.FileSystemStorage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.security.CurrentUser;
import org.slf4j.Logger;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.xml.bind.JAXBException;
import java.util.List;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
public class FalconHookIT {
public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class);
public static final String CLUSTER_RESOURCE = "/cluster.xml";
public static final String FEED_RESOURCE = "/feed.xml";
public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml";
public static final String FEED_REPLICATION_RESOURCE = "/feed-replication.xml";
public static final String PROCESS_RESOURCE = "/process.xml";
private AtlasClient atlasClient;
private static final ConfigurationStore STORE = ConfigurationStore.get();
@BeforeClass
public void setUp() throws Exception {
Configuration atlasProperties = ApplicationProperties.get();
if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
atlasClient = new AtlasClient(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[]{"admin", "admin"});
} else {
atlasClient = new AtlasClient(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT));
}
AtlasService service = new AtlasService();
service.init();
STORE.registerListener(service);
CurrentUser.authenticate(System.getProperty("user.name"));
}
private boolean isDataModelAlreadyRegistered() throws Exception {
try {
atlasClient.getType(FalconDataTypes.FALCON_PROCESS.getName());
LOG.info("Hive data model is already registered!");
return true;
} catch(AtlasServiceException ase) {
if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) {
return false;
}
throw ase;
}
}
private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException {
Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource));
switch (entity.getEntityType()) {
case CLUSTER:
((Cluster) entity).setName(name);
break;
case FEED:
((Feed) entity).setName(name);
break;
case PROCESS:
((org.apache.falcon.entity.v0.process.Process) entity).setName(name);
break;
}
return (T)entity;
}
private String random() {
return RandomStringUtils.randomAlphanumeric(10);
}
private String getTableUri(String dbName, String tableName) {
return String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName, tableName);
}
@Test
public void testCreateProcess() throws Exception {
Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, cluster);
assertClusterIsRegistered(cluster);
Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null);
String infeedId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName())).getId()._getId();
Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
String outFeedId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName())).getId()._getId();
Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
process.getClusters().getClusters().get(0).setName(cluster.getName());
process.getInputs().getInputs().get(0).setFeed(infeed.getName());
process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
STORE.publish(EntityType.PROCESS, process);
String pid = assertProcessIsRegistered(process, cluster.getName());
Referenceable processEntity = atlasClient.getEntity(pid);
assertNotNull(processEntity);
assertEquals(processEntity.get(AtlasClient.NAME), process.getName());
assertEquals(((List<Id>)processEntity.get("inputs")).get(0)._getId(), infeedId);
assertEquals(((List<Id>)processEntity.get("outputs")).get(0)._getId(), outFeedId);
}
private String assertProcessIsRegistered(Process process, String clusterName) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getProcessQualifiedName(process.getName(), clusterName));
}
private String assertClusterIsRegistered(Cluster cluster) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, cluster.getName());
}
private TypeUtils.Pair<String, Feed> getHDFSFeed(String feedResource, String clusterName) throws Exception {
Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
feedCluster.setName(clusterName);
STORE.publish(EntityType.FEED, feed);
String feedId = assertFeedIsRegistered(feed, clusterName);
assertFeedAttributes(feedId);
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
Referenceable processEntity = atlasClient.getEntity(processId);
assertEquals(((List<Id>)processEntity.get("outputs")).get(0).getId()._getId(), feedId);
String inputId = ((List<Id>) processEntity.get("inputs")).get(0).getId()._getId();
Referenceable pathEntity = atlasClient.getEntity(inputId);
assertEquals(pathEntity.getTypeName(), HiveMetaStoreBridge.HDFS_PATH);
List<Location> locations = FeedHelper.getLocations(feedCluster, feed);
Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA);
assertEquals(pathEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
FalconBridge.normalize(dataLocation.getPath()));
return TypeUtils.Pair.of(feedId, feed);
}
private Feed getTableFeed(String feedResource, String clusterName) throws Exception {
return getTableFeed(feedResource, clusterName, null);
}
private Feed getTableFeed(String feedResource, String clusterName, String secondClusterName) throws Exception {
Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random());
org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
feedCluster.setName(clusterName);
String dbName = "db" + random();
String tableName = "table" + random();
feedCluster.getTable().setUri(getTableUri(dbName, tableName));
String dbName2 = "db" + random();
String tableName2 = "table" + random();
if (secondClusterName != null) {
org.apache.falcon.entity.v0.feed.Cluster feedCluster2 = feed.getClusters().getClusters().get(1);
feedCluster2.setName(secondClusterName);
feedCluster2.getTable().setUri(getTableUri(dbName2, tableName2));
}
STORE.publish(EntityType.FEED, feed);
String feedId = assertFeedIsRegistered(feed, clusterName);
assertFeedAttributes(feedId);
verifyFeedLineage(feed.getName(), clusterName, feedId, dbName, tableName);
if (secondClusterName != null) {
String feedId2 = assertFeedIsRegistered(feed, secondClusterName);
assertFeedAttributes(feedId2);
verifyFeedLineage(feed.getName(), secondClusterName, feedId2, dbName2, tableName2);
}
return feed;
}
private void assertFeedAttributes(String feedId) throws Exception {
Referenceable feedEntity = atlasClient.getEntity(feedId);
assertEquals(feedEntity.get(AtlasClient.OWNER), "testuser");
assertEquals(feedEntity.get(FalconBridge.FREQUENCY), "hours(1)");
assertEquals(feedEntity.get(AtlasClient.DESCRIPTION), "test input");
}
private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName)
throws Exception{
//verify that lineage from hive table to falcon feed is created
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feedName, clusterName));
Referenceable processEntity = atlasClient.getEntity(processId);
assertEquals(((List<Id>)processEntity.get("outputs")).get(0).getId()._getId(), feedId);
String inputId = ((List<Id>) processEntity.get("inputs")).get(0).getId()._getId();
Referenceable tableEntity = atlasClient.getEntity(inputId);
assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName());
assertEquals(tableEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName));
}
private String assertFeedIsRegistered(Feed feed, String clusterName) throws Exception {
return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), clusterName));
}
@Test
public void testReplicationFeed() throws Exception {
Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, srcCluster);
assertClusterIsRegistered(srcCluster);
Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, targetCluster);
assertClusterIsRegistered(targetCluster);
Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName());
String inId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName())).getId()._getId();
String outId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName())).getId()._getId();
String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(),
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName());
Referenceable process = atlasClient.getEntity(processId);
assertEquals(((List<Id>)process.get("inputs")).get(0)._getId(), inId);
assertEquals(((List<Id>)process.get("outputs")).get(0)._getId(), outId);
}
@Test
public void testCreateProcessWithHDFSFeed() throws Exception {
Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random());
STORE.publish(EntityType.CLUSTER, cluster);
TypeUtils.Pair<String, Feed> result = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName());
Feed infeed = result.right;
String infeedId = result.left;
Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName());
String outfeedId = atlasClient.getEntity(FalconDataTypes.FALCON_FEED.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName())).getId()._getId();
Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random());
process.getClusters().getClusters().get(0).setName(cluster.getName());
process.getInputs().getInputs().get(0).setFeed(infeed.getName());
process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName());
STORE.publish(EntityType.PROCESS, process);
String pid = assertProcessIsRegistered(process, cluster.getName());
Referenceable processEntity = atlasClient.getEntity(pid);
assertEquals(processEntity.get(AtlasClient.NAME), process.getName());
assertEquals(processEntity.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName()));
assertEquals(((List<Id>)processEntity.get("inputs")).get(0)._getId(), infeedId);
assertEquals(((List<Id>)processEntity.get("outputs")).get(0)._getId(), outfeedId);
}
private String assertEntityIsRegistered(final String typeName, final String property, final String value) throws Exception {
waitFor(80000, new Predicate() {
@Override
public void evaluate() throws Exception {
Referenceable entity = atlasClient.getEntity(typeName, property, value);
assertNotNull(entity);
}
});
Referenceable entity = atlasClient.getEntity(typeName, property, value);
return entity.getId()._getId();
}
public interface Predicate {
/**
* Perform a predicate evaluation.
*
* @return the boolean result of the evaluation.
* @throws Exception thrown if the predicate evaluation could not evaluate.
*/
void evaluate() throws Exception;
}
/**
* Wait for a condition, expressed via a {@link Predicate} to become true.
*
* @param timeout maximum time in milliseconds to wait for the predicate to become true.
* @param predicate predicate waiting on.
*/
protected void waitFor(int timeout, Predicate predicate) throws Exception {
ParamChecker.notNull(predicate, "predicate");
long mustEnd = System.currentTimeMillis() + timeout;
while (true) {
try {
predicate.evaluate();
return;
} catch(Error | Exception e) {
if (System.currentTimeMillis() >= mustEnd) {
fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e);
}
LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e);
Thread.sleep(400);
}
}
}
}