blob: a91319ee2ae2ee89b05c8ce847612a281bb17318 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.state.cluster;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.H2DatabaseCleaner;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.ServiceNotFoundException;
import org.apache.ambari.server.api.services.ServiceGroupKey;
import org.apache.ambari.server.api.services.ServiceKey;
import org.apache.ambari.server.controller.ServiceConfigVersionResponse;
import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.OrmTestHelper;
import org.apache.ambari.server.orm.dao.ServiceConfigDAO;
import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentFactory;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceComponentHostFactory;
import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.ServiceGroup;
import org.apache.ambari.server.state.ServiceGroupFactory;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.State;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
/**
* Tests that concurrent threads attempting to create configurations don't cause
* unique violations with the configuration version.
*/
public class ConcurrentServiceConfigVersionTest {
private static final int NUMBER_OF_SERVICE_CONFIG_VERSIONS = 10;
private static final int NUMBER_OF_THREADS = 2;
@Inject
private Injector injector;
@Inject
private Clusters clusters;
@Inject
private ServiceFactory serviceFactory;
@Inject
private ServiceGroupFactory serviceGroupFactory;
@Inject
private ServiceComponentFactory serviceComponentFactory;
@Inject
private ServiceComponentHostFactory serviceComponentHostFactory;
@Inject
private OrmTestHelper helper;
@Inject
private ServiceConfigDAO serviceConfigDAO;
private StackId stackId = new StackId("HDP-0.1");
/**
* The cluster.
*/
private Cluster cluster;
private ServiceGroup serviceGroup;
private RepositoryVersionEntity repositoryVersion;
/**
* Creates a cluster and installs HDFS with NN and DN.
*
* @throws Exception
*/
@Before
public void setup() throws Exception {
injector = Guice.createInjector(Modules.override(
new InMemoryDefaultTestModule()).with(new MockModule()));
injector.getInstance(GuiceJpaInitializer.class);
injector.injectMembers(this);
repositoryVersion = helper.getOrCreateRepositoryVersion(stackId, stackId.getStackVersion());
helper.createStack(stackId);
clusters.addCluster("c1", stackId);
cluster = clusters.getCluster("c1");
serviceGroup = serviceGroupFactory.createNew(cluster, "test_service_group", new HashSet<ServiceGroupKey>());
String hostName = "c6401.ambari.apache.org";
clusters.addHost(hostName);
setOsFamily(clusters.getHost(hostName), "redhat", "6.4");
clusters.mapHostToCluster(hostName, "c1");
Service service = installService("HDFS");
addServiceComponent(service, "NAMENODE");
addServiceComponent(service, "DATANODE");
createNewServiceComponentHost("HDFS", "NAMENODE", hostName);
createNewServiceComponentHost("HDFS", "DATANODE", hostName);
}
@After
public void teardown() throws AmbariException, SQLException {
H2DatabaseCleaner.clearDatabaseAndStopPersistenceService(injector);
}
/**
* Tests that creating service config versions from multiple threads doesn't
* violate unique constraints.
*
* @throws Exception
*/
@Test
public void testConcurrentServiceConfigVersions() throws Exception {
long nextVersion = serviceConfigDAO.findNextServiceConfigVersion(
cluster.getClusterId(), 1L);
Assert.assertEquals(nextVersion, 1);
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
Thread thread = new ConcurrentServiceConfigThread(cluster);
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
long maxVersion = NUMBER_OF_THREADS * NUMBER_OF_SERVICE_CONFIG_VERSIONS;
nextVersion = serviceConfigDAO.findNextServiceConfigVersion(
cluster.getClusterId(), 1L);
Assert.assertEquals(maxVersion + 1, nextVersion);
}
private final static class ConcurrentServiceConfigThread extends Thread {
private Cluster cluster = null;
private ConcurrentServiceConfigThread(Cluster cluster) {
this.cluster = cluster;
}
/**
* {@inheritDoc}
*/
@Override
public void run() {
try {
for (int i = 0; i < NUMBER_OF_SERVICE_CONFIG_VERSIONS; i++) {
ServiceConfigVersionResponse response = cluster.createServiceConfigVersion(
1L, null, getName() + "-serviceConfig" + i, null);
Thread.sleep(100);
}
} catch (Exception exception) {
throw new RuntimeException(exception);
}
}
}
private void setOsFamily(Host host, String osFamily, String osVersion) {
Map<String, String> hostAttributes = new HashMap<>(2);
hostAttributes.put("os_family", osFamily);
hostAttributes.put("os_release_version", osVersion);
host.setHostAttributes(hostAttributes);
}
private ServiceComponentHost createNewServiceComponentHost(String svc,
String svcComponent, String hostName) throws AmbariException {
Assert.assertNotNull(cluster.getConfigGroups());
Service s = installService(svc);
ServiceComponent sc = addServiceComponent(s, svcComponent);
ServiceComponentHost sch = serviceComponentHostFactory.createNew(sc,
hostName);
sc.addServiceComponentHost(sch);
sch.setDesiredState(State.INSTALLED);
sch.setState(State.INSTALLED);
return sch;
}
private Service installService(String serviceName) throws AmbariException {
Service service = null;
try {
service = cluster.getService(serviceName);
} catch (ServiceNotFoundException e) {
service = serviceFactory.createNew(cluster, serviceGroup, new ArrayList<ServiceKey>(), serviceName, serviceName, repositoryVersion);
cluster.addService(service);
}
return service;
}
private ServiceComponent addServiceComponent(Service service,
String componentName) throws AmbariException {
ServiceComponent serviceComponent = null;
try {
serviceComponent = service.getServiceComponent(componentName);
} catch (ServiceComponentNotFoundException e) {
serviceComponent = serviceComponentFactory.createNew(service,
componentName);
service.addServiceComponent(serviceComponent);
serviceComponent.setDesiredState(State.INSTALLED);
}
return serviceComponent;
}
/**
*
*/
private class MockModule implements Module {
/**
*
*/
@Override
public void configure(Binder binder) {
// this listener gets in the way of actually testing the concurrency
// between the threads; it slows them down too much, so mock it out
binder.bind(HostVersionOutOfSyncListener.class).toInstance(
EasyMock.createNiceMock(HostVersionOutOfSyncListener.class));
}
}
}