| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.falcon.entity.store; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.falcon.FalconException; |
| import org.apache.falcon.entity.AbstractTestBase; |
| import org.apache.falcon.entity.v0.EntityType; |
| import org.apache.falcon.entity.v0.feed.CatalogTable; |
| import org.apache.falcon.entity.v0.feed.Cluster; |
| import org.apache.falcon.entity.v0.feed.Clusters; |
| import org.apache.falcon.entity.v0.feed.Feed; |
| import org.apache.falcon.entity.v0.feed.Location; |
| import org.apache.falcon.entity.v0.feed.LocationType; |
| import org.apache.falcon.entity.v0.feed.Locations; |
| import org.apache.falcon.security.CurrentUser; |
| import org.apache.falcon.util.FalconRadixUtils; |
| import org.apache.falcon.util.FalconTestUtil; |
| import org.apache.falcon.util.StartupProperties; |
| import org.testng.Assert; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import java.io.File; |
| import java.net.URI; |
| import java.util.Collection; |
| |
| |
| /** |
| * Tests for FeedLocationStore. |
| */ |
| public class FeedLocationStoreTest extends AbstractTestBase { |
| private ConfigurationStore store; |
| |
| |
| @BeforeClass |
| public void initConfigStore() throws Exception { |
| String configPath = new URI(StartupProperties.get().getProperty("config.store.uri")).getPath(); |
| String location = configPath + "-" + getClass().getName(); |
| StartupProperties.get().setProperty("config.store.uri", location); |
| FileUtils.deleteDirectory(new File(location)); |
| |
| cleanupStore(); |
| String listeners = StartupProperties.get().getProperty("configstore.listeners"); |
| StartupProperties.get().setProperty("configstore.listeners", |
| listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "")); |
| // StartupProperties.get().setProperty("configstore.listeners", |
| // "org.apache.falcon.entity.store.FeedLocationStore"); |
| store = ConfigurationStore.get(); |
| store.init(); |
| |
| CurrentUser.authenticate(FalconTestUtil.TEST_USER_2); |
| |
| } |
| @BeforeMethod |
| public void setUp() throws FalconException{ |
| cleanupStore(); |
| createClusters(); |
| } |
| |
| @AfterMethod |
| public void print() { |
| System.out.printf("%s", FeedLocationStore.get().store); |
| } |
| |
| @Test |
| public void testOnAddSameLocation() throws FalconException{ |
| Feed f1 = createFeed("f1SameLocations"); |
| int initialSize = FeedLocationStore.get().store.getSize(); |
| f1.getLocations().getLocations().add(createLocation(LocationType.DATA, |
| "/projects/cas/data/hourly/2014/09/09/09")); |
| f1.getLocations().getLocations().add(createLocation(LocationType.STATS, |
| "/projects/cas/stats/hourly/2014/09/09/09")); |
| |
| Feed f2 = createFeed("f2SameLocations"); |
| f2.getLocations().getLocations().add(createLocation(LocationType.STATS, |
| "/projects/cas/data/hourly/2014/09/09/09")); |
| f2.getLocations().getLocations().add(createLocation(LocationType.DATA, |
| "/projects/cas/stats/hourly/2014/09/09/09")); |
| |
| store.publish(EntityType.FEED, f1); |
| store.publish(EntityType.FEED, f2); |
| int finalSize = FeedLocationStore.get().store.getSize(); |
| Assert.assertEquals(finalSize - initialSize, 8); |
| } |
| |
| @Test |
| public void testOnRemove() throws FalconException{ |
| int initialSize = FeedLocationStore.get().store.getSize(); |
| |
| Feed f1 = createFeed("f1ForRemove"); |
| f1.getLocations().getLocations().add(createLocation(LocationType.DATA, |
| "/projects/cas/data/hourly/2014/09/09/09")); |
| f1.getLocations().getLocations().add(createLocation(LocationType.STATS, |
| "/projects/cas/data/hourly/2014/09/09/09")); |
| |
| store.publish(EntityType.FEED, f1); |
| Assert.assertEquals(FeedLocationStore.get().store.getSize() - initialSize, 4); |
| store.remove(EntityType.FEED, "f1ForRemove"); |
| Assert.assertEquals(FeedLocationStore.get().store.getSize(), initialSize); |
| |
| } |
| |
| |
| @Test |
| public void testOnChange() throws FalconException{ |
| Feed f1 = createFeed("f1"); |
| f1.getLocations().getLocations().add(createLocation(LocationType.DATA, |
| "/projects/cas/data/hourly/2014/09/09/09")); |
| store.publish(EntityType.FEED, f1); |
| |
| Feed f2 = createFeed("f1"); |
| f2.getLocations().getLocations().add(createLocation(LocationType.DATA, |
| "/projects/cas/data/monthly")); |
| store.initiateUpdate(f2); |
| store.update(EntityType.FEED, f2); |
| store.cleanupUpdateInit(); |
| |
| Feed f3 = createFeed("f2"); |
| f3.getLocations().getLocations().add(createLocation(LocationType.STATS, |
| "/projects/cas/data/hourly/2014/09/09/09")); |
| store.publish(EntityType.FEED, f3); |
| |
| } |
| |
| @Test |
| public void testWithClusterLocations() throws FalconException { |
| Feed f = createFeedWithClusterLocations("clusterFeed"); |
| int initialSize = FeedLocationStore.get().store.getSize(); |
| store.publish(EntityType.FEED, f); |
| Assert.assertEquals(FeedLocationStore.get().store.getSize() - initialSize, 6); |
| store.remove(EntityType.FEED, "clusterFeed"); |
| Assert.assertEquals(FeedLocationStore.get().store.getSize(), initialSize); |
| } |
| |
| |
| @Test |
| public void testFindWithRegularExpression() throws FalconException { |
| Feed f = createFeed("findUsingRegexFeed"); |
| f.getLocations().getLocations().add(createLocation(LocationType.DATA, |
| "/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}")); |
| store.publish(EntityType.FEED, f); |
| Assert.assertNotNull(FeedLocationStore.get().store.find("/falcon/test/input/2014/12/12/23", |
| new FalconRadixUtils.FeedRegexAlgorithm())); |
| } |
| |
| @Test |
| public void testAddCatalogStorageFeeds() throws FalconException { |
| //this test ensure that catalog feeds are ignored in FeedLocationStore |
| Feed f = createCatalogFeed("catalogFeed"); |
| store.publish(EntityType.FEED, f); |
| Assert.assertTrue(true); |
| } |
| |
| private Feed createCatalogFeed(String name) { |
| Feed f = new Feed(); |
| f.setName(name); |
| f.setClusters(createBlankClusters()); |
| f.setTable(new CatalogTable()); |
| return f; |
| } |
| |
| private Feed createFeed(String name){ |
| Feed f = new Feed(); |
| Locations locations = new Locations(); |
| f.setLocations(locations); |
| f.setName(name); |
| f.setClusters(createBlankClusters()); |
| return f; |
| } |
| |
| |
| private Feed createFeedWithClusterLocations(String name) { |
| Feed f = new Feed(); |
| f.setLocations(new Locations()); |
| f.getLocations().getLocations().add(createLocation(LocationType.DATA, "/projects/cas/data")); |
| f.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/stats")); |
| f.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/meta")); |
| f.setName(name); |
| f.setClusters(createClustersWithLocations()); |
| return f; |
| } |
| |
| private Location createLocation(LocationType type, String path){ |
| Location location = new Location(); |
| location.setPath(path); |
| location.setType(type); |
| return location; |
| } |
| |
| protected void cleanupStore() throws FalconException { |
| store = ConfigurationStore.get(); |
| for (EntityType type : EntityType.values()) { |
| Collection<String> entities = store.getEntities(type); |
| for (String entity : entities) { |
| store.remove(type, entity); |
| } |
| } |
| } |
| |
| private Clusters createClustersWithLocations() { |
| Clusters clusters = new Clusters(); |
| Cluster cluster1 = new Cluster(); |
| cluster1.setName("cluster1WithLocations"); |
| cluster1.setLocations(new Locations()); |
| cluster1.getLocations().getLocations().add(createLocation(LocationType.DATA, "/projects/cas/cluster1/data")); |
| cluster1.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/cluster1/stats")); |
| cluster1.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/cluster1/meta")); |
| |
| Cluster cluster2 = new Cluster(); |
| cluster2.setName("cluster2WithLocations"); |
| cluster2.setLocations(new Locations()); |
| cluster2.getLocations().getLocations().add(createLocation(LocationType.DATA, "/projects/cas/cluster2/data")); |
| cluster2.getLocations().getLocations().add(createLocation(LocationType.STATS, "/projects/cas/cluster2/stats")); |
| cluster2.getLocations().getLocations().add(createLocation(LocationType.META, "/projects/cas/cluster2/meta")); |
| |
| clusters.getClusters().add(cluster1); |
| clusters.getClusters().add(cluster2); |
| |
| return clusters; |
| } |
| |
| private Clusters createBlankClusters() { |
| Clusters clusters = new Clusters(); |
| |
| Cluster cluster = new Cluster(); |
| cluster.setName("blankCluster1"); |
| clusters.getClusters().add(cluster); |
| |
| Cluster cluster2 = new Cluster(); |
| cluster2.setName("blankCluster2"); |
| clusters.getClusters().add(cluster2); |
| |
| return clusters; |
| } |
| |
| private void createClusters() throws FalconException { |
| String[] clusterNames = {"cluster1WithLocations", "cluster2WithLocations", "blankCluster1", "blankCluster2"}; |
| for (String name : clusterNames) { |
| org.apache.falcon.entity.v0.cluster.Cluster cluster = new org.apache.falcon.entity.v0.cluster.Cluster(); |
| cluster.setName(name); |
| cluster.setColo("default"); |
| store.publish(EntityType.CLUSTER, cluster); |
| } |
| } |
| } |