// blob: 0db2b1627362f13e5b2b917749c55a9437df0321 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.listener;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hcatalog.common.HCatConstants;
/**
 * Checks that {@link NotificationListener}, registered as a metastore event
 * listener, publishes database events to a JMS topic, and that events are
 * published again after the message broker has been stopped and restarted.
 *
 * <p>The test runs an embedded ActiveMQ broker on {@value #BROKER_URL} and
 * subscribes its own consumer to the topic configured through
 * {@link HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX}.
 */
public class TestMsgBusConnection extends TestCase {

  /** Address the embedded broker listens on and the test client connects to. */
  private static final String BROKER_URL = "tcp://localhost:61616";

  /** Topic prefix handed to the listener; database events are expected on this topic. */
  private static final String TOPIC_NAME = "planetlab.hcat";

  /** String form of the JMS destination expected on every received message. */
  private static final String TOPIC_DESTINATION = "topic://" + TOPIC_NAME;

  /** Database created and dropped by the test. */
  private static final String DB_NAME = "testconndb";

  /**
   * Upper bound for waiting on a single notification, so that a missing
   * message fails the test instead of blocking the run forever.
   */
  private static final long RECEIVE_TIMEOUT_MS = 30000L;

  private Driver driver;
  private BrokerService broker;
  /** Client connection backing {@link #consumer}; tracked so it can be closed. */
  private Connection connection;
  private MessageConsumer consumer;

  /**
   * Starts the embedded broker, subscribes the test consumer and creates a
   * Hive driver whose configuration registers {@link NotificationListener}.
   */
  @Override
  protected void setUp() throws Exception {
    super.setUp();

    broker = new BrokerService();
    // configure the broker
    broker.addConnector(BROKER_URL + "?broker.persistent=false");
    broker.start();

    System.setProperty("java.naming.factory.initial",
        "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
    System.setProperty("java.naming.provider.url", BROKER_URL);
    connectClient();

    HiveConf hiveConf = new HiveConf(this.getClass());
    hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
        NotificationListener.class.getName());
    hiveConf.set("hive.metastore.local", "true");
    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
    hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, TOPIC_NAME);
    SessionState.start(new CliSessionState(hiveConf));
    driver = new Driver(hiveConf);
  }

  /**
   * Closes the client connection and stops the embedded broker so the broker
   * port is not left bound after the test.
   */
  @Override
  protected void tearDown() throws Exception {
    try {
      closeClientQuietly();
      if (broker != null) {
        broker.stop();
      }
    } finally {
      super.tearDown();
    }
  }

  /**
   * Opens a client connection to the broker and subscribes {@link #consumer}
   * to the test topic. A connection left over from an earlier call is closed
   * first so that it is not leaked.
   *
   * @throws JMSException if the connection, session or consumer cannot be created
   */
  private void connectClient() throws JMSException {
    closeClientQuietly();

    ConnectionFactory connFac = new ActiveMQConnectionFactory(BROKER_URL);
    connection = connFac.createConnection();
    connection.start();
    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Destination hcatTopic = session.createTopic(TOPIC_NAME);
    consumer = session.createConsumer(hcatTopic);
  }

  /**
   * Closes the current client connection, if any. Failures are only logged:
   * the connection may belong to a broker instance that has already been
   * stopped, and a failed close must not mask the actual test result.
   */
  private void closeClientQuietly() {
    if (connection == null) {
      return;
    }
    try {
      connection.close();
    } catch (JMSException ignored) {
      ignored.printStackTrace(System.err);
    } finally {
      connection = null;
      consumer = null;
    }
  }

  /**
   * Waits for the next message on the test topic.
   *
   * @return the received message, never {@code null}
   * @throws JMSException if receiving fails
   */
  private Message receiveMessage() throws JMSException {
    Message msg = consumer.receive(RECEIVE_TIMEOUT_MS);
    assertNotNull("No message received on " + TOPIC_DESTINATION + " within "
        + RECEIVE_TIMEOUT_MS + " ms", msg);
    return msg;
  }

  /**
   * Asserts that {@code msg} carries the given event type, was sent to the
   * test topic and has the test database as its payload.
   *
   * @param expectedEvent expected value of the {@link HCatConstants#HCAT_EVENT} property
   * @param msg message to inspect
   * @throws JMSException if the message cannot be read
   */
  private void assertDatabaseEvent(String expectedEvent, Message msg) throws JMSException {
    assertEquals(expectedEvent, msg.getStringProperty(HCatConstants.HCAT_EVENT));
    assertEquals(TOPIC_DESTINATION, msg.getJMSDestination().toString());
    assertEquals(DB_NAME,
        ((Database) ((ObjectMessage) msg).getObject()).getName());
  }

  /**
   * Creates a database and expects an add-database event, stops the broker,
   * drops the database while the broker is down, restarts the broker and then
   * expects add- and drop-database events for a second create/drop cycle.
   *
   * <p>Any exception, including {@link NoSuchObjectException} and
   * {@link AlreadyExistsException}, propagates to the test runner and fails
   * the test. It is deliberately not caught here: a Java {@code assert false}
   * in a catch block is a no-op unless the JVM runs with {@code -ea}.
   */
  public void testConnection() throws Exception {
    driver.run("create database " + DB_NAME);
    assertDatabaseEvent(HCatConstants.HCAT_ADD_DATABASE_EVENT, receiveMessage());

    // Take the broker down; this drop happens while no broker is running.
    broker.stop();
    driver.run("drop database " + DB_NAME + " cascade");

    // Bring the broker back and re-subscribe before generating new events.
    broker.start(true);
    connectClient();

    driver.run("create database " + DB_NAME);
    assertDatabaseEvent(HCatConstants.HCAT_ADD_DATABASE_EVENT, receiveMessage());

    driver.run("drop database " + DB_NAME + " cascade");
    assertDatabaseEvent(HCatConstants.HCAT_DROP_DATABASE_EVENT, receiveMessage());
  }
}