blob: a8a9de5b60a26e3a692f8092418a6be435bb1aa8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.pig;
import java.io.IOException;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.service.EmbeddedCassandraService;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.test.MiniCluster;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
/**
 * Shared base class for the Cassandra/Pig integration tests.
 *
 * <p>It holds the statically shared test fixtures (an embedded Cassandra
 * service, a Hadoop {@link MiniCluster}, its {@link Configuration} and a
 * {@link PigServer}) and offers helpers to start them, to obtain a Thrift
 * client and to run CQL3 statements. A fresh {@link PigServer} is created
 * before every test and shut down after it; the mini cluster is shut down
 * once per test class.
 *
 * <p>Subclasses are expected to call {@link #startCassandra()} and
 * {@link #startHadoopCluster()} themselves (nothing in this class invokes
 * them), presumably from a {@code @BeforeClass} method.
 */
public class PigTestBase extends SchemaLoader
{
    /** Embedded Cassandra service; assigned by {@link #startCassandra()}. */
    protected static EmbeddedCassandraService cassandra;

    /** Configuration of the mini cluster; assigned by {@link #startHadoopCluster()}. */
    protected static Configuration conf;

    /** Hadoop mini cluster; assigned by {@link #startHadoopCluster()}. */
    protected static MiniCluster cluster;

    /** Pig server, re-created before every test by {@link #beforeTest()}. */
    protected static PigServer pig;

    /**
     * Connection parameters in query-string form (address, Thrift RPC port
     * and partitioner). Not used inside this class; presumably appended to
     * storage locations by subclasses.
     */
    protected static String defaultParameters = "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.Murmur3Partitioner";

    /**
     * Additional native-protocol parameters in query-string form. The value
     * starts with {@code &} so it can be concatenated directly after
     * {@link #defaultParameters}.
     */
    protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000" +
                                               "&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" +
                                               "&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042";

    static
    {
        // Set in a static initializer so the properties are in place before
        // any other code of this class (and thus any fixture start-up) runs.
        System.setProperty("logback.configurationFile", "logback-test.xml");
        System.setProperty("cassandra.config", "cassandra_pig.yaml");
    }

    /**
     * Shuts down the Hadoop mini cluster once all tests of the class have run.
     *
     * @throws Exception if the cluster fails to shut down
     */
    @AfterClass
    public static void oneTimeTearDown() throws Exception
    {
        // The cluster is only set by startHadoopCluster(); if that was never
        // called (or failed) there is nothing to stop, and an NPE here would
        // only obscure the original failure.
        if (cluster != null)
            cluster.shutDown();
    }

    /**
     * Creates a fresh local-mode {@link PigServer} from {@link #conf} and
     * initializes Pig's import list with the Cassandra Pig package.
     *
     * @throws Exception if the Pig server cannot be created
     */
    @Before
    public void beforeTest() throws Exception
    {
        pig = new PigServer(new PigContext(ExecType.LOCAL, ConfigurationUtil.toProperties(conf)));
        PigContext.initializeImportList("org.apache.cassandra.hadoop.pig");
    }

    /**
     * Shuts down the {@link PigServer} used by the test that just finished.
     *
     * @throws Exception if the Pig server fails to shut down
     */
    @After
    public void tearDown() throws Exception
    {
        // JUnit still runs @After methods when a @Before method throws, so
        // pig may never have been assigned.
        if (pig != null)
            pig.shutdown();
    }

    /**
     * Opens a framed Thrift connection to {@code localhost:9170} and returns
     * a client bound to it.
     *
     * <p>The caller owns the connection and is responsible for closing the
     * underlying transport, e.g. via
     * {@code client.getInputProtocol().getTransport().close()}.
     *
     * @return a client whose transport is already open
     * @throws TTransportException if the transport cannot be opened
     */
    protected static Cassandra.Client getClient() throws TTransportException
    {
        TTransport tr = new TFramedTransport(new TSocket("localhost", 9170));
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        tr.open();
        return client;
    }

    /**
     * Starts the embedded Cassandra service and stores it in {@link #cassandra}.
     *
     * @throws IOException if the service fails to start
     */
    protected static void startCassandra() throws IOException
    {
        Schema.instance.clear(); // Schemas are now written on disk and will be reloaded
        cassandra = new EmbeddedCassandraService();
        cassandra.start();
    }

    /**
     * Builds the Hadoop mini cluster and captures its configuration in
     * {@link #cluster} and {@link #conf}.
     */
    protected static void startHadoopCluster()
    {
        cluster = MiniCluster.buildCluster();
        conf = cluster.getConfiguration();
    }

    /**
     * Parses a Cassandra type string with {@link TypeParser}.
     *
     * @param type the type string to parse
     * @return the parsed type
     * @throws IOException wrapping the {@link ConfigurationException} or
     *         {@link SyntaxException} raised by the parser
     */
    protected AbstractType parseType(String type) throws IOException
    {
        try
        {
            return TypeParser.parse(type);
        }
        catch (ConfigurationException | SyntaxException e)
        {
            throw new IOException(e);
        }
    }

    /**
     * Executes the given CQL3 statements in order over a single Thrift
     * connection, uncompressed and at consistency level {@code ONE}. Each
     * statement is echoed to standard output before it is executed. The
     * connection is closed when the method returns, whether normally or
     * because a statement failed.
     *
     * @param statements the CQL3 statements to run
     * @throws TException if the connection cannot be opened or a statement fails
     */
    protected static void executeCQLStatements(String[] statements) throws TException
    {
        Cassandra.Client client = getClient();
        try
        {
            for (String statement : statements)
            {
                System.out.println("Executing statement: " + statement);
                client.execute_cql3_query(ByteBufferUtil.bytes(statement), Compression.NONE, ConsistencyLevel.ONE);
            }
        }
        finally
        {
            // getClient() opens a new socket on every call; release it so
            // repeated invocations do not leak connections.
            client.getInputProtocol().getTransport().close();
        }
    }
}