/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hcatalog.security;

import static org.apache.hcatalog.HcatTestUtils.perm300;
import static org.apache.hcatalog.HcatTestUtils.perm500;
import static org.apache.hcatalog.HcatTestUtils.perm555;
import static org.apache.hcatalog.HcatTestUtils.perm700;
import static org.apache.hcatalog.HcatTestUtils.perm755;

import java.io.IOException;
import java.util.Random;

import junit.framework.Assert;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hcatalog.HcatTestUtils;
import org.apache.hcatalog.cli.HCatDriver;
import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestHdfsAuthorizationProvider {

    protected HCatDriver hcatDriver;
    protected HiveMetaStoreClient msc;
    protected HiveConf conf;
    protected String whDir;
    protected Path whPath;
    protected FileSystem whFs;
    protected Warehouse wh;
    protected Hive hive;

    @Before
    public void setUp() throws Exception {

        conf = new HiveConf(this.getClass());
        conf.set(ConfVars.PREEXECHOOKS.varname, "");
        conf.set(ConfVars.POSTEXECHOOKS.varname, "");
        conf.set(ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");

        conf.set("hive.metastore.local", "true");
        conf.set(ConfVars.SEMANTIC_ANALYZER_HOOK.varname, HCatSemanticAnalyzer.class.getName());
        conf.setBoolVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
        conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
                StorageDelegationAuthorizationProvider.class.getCanonicalName());
        conf.set("fs.pfile.impl", "org.apache.hadoop.fs.ProxyLocalFileSystem");

        whDir = System.getProperty("test.warehouse.dir", "/tmp/testhdfsauthorization_wh");
        conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, whDir);

        UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
        String username = ShimLoader.getHadoopShims().getShortUserName(ugi);

        whPath = new Path(whDir);
        whFs = whPath.getFileSystem(conf);

        wh = new Warehouse(conf);
        hive = Hive.get(conf);

        //clean up mess in HMS
        HcatTestUtils.cleanupHMS(hive, wh, perm700);

        whFs.delete(whPath, true);
        whFs.mkdirs(whPath, perm755);

        SessionState.start(new CliSessionState(conf));
        hcatDriver = new HCatDriver();
    }

    @After
    public void tearDown() throws IOException {
        whFs.close();
        hcatDriver.close();
        Hive.closeCurrent();
    }

    public Path getDbPath(String dbName) throws MetaException, HiveException {
        return HcatTestUtils.getDbPath(hive, wh, dbName);
    }

    public Path getTablePath(String dbName, String tableName) throws HiveException {
        Table table = hive.getTable(dbName, tableName);
        return table.getPath();
    }

    public Path getPartPath(String partName, String dbName, String tableName) throws HiveException {
        return new Path(getTablePath(dbName, tableName), partName);
    }

    /** Execute the query expecting success*/
    public void exec(String format, Object... args) throws Exception {
        String command = String.format(format, args);
        CommandProcessorResponse resp = hcatDriver.run(command);
        Assert.assertEquals(resp.getErrorMessage(), 0, resp.getResponseCode());
        Assert.assertEquals(resp.getErrorMessage(), null, resp.getErrorMessage());
    }

    /** Execute the query expecting it to fail with AuthorizationException */
    public void execFail(String format, Object... args) throws Exception {
        String command = String.format(format, args);
        CommandProcessorResponse resp = hcatDriver.run(command);
        Assert.assertNotSame(resp.getErrorMessage(), 0, resp.getResponseCode());
        Assert.assertTrue((resp.getResponseCode() == 40000) || (resp.getResponseCode() == 403));
        if (resp.getErrorMessage() != null) {
            Assert.assertTrue(resp.getErrorMessage().contains("org.apache.hadoop.security.AccessControlException"));
        }
    }


    /**
     * Tests whether the warehouse directory is writable by the current user (as defined by Hadoop)
     */
    @Test
    public void testWarehouseIsWritable() throws Exception {
        Path top = new Path(whPath, "_foobarbaz12_");
        try {
            whFs.mkdirs(top);
        } finally {
            whFs.delete(top, true);
        }
    }

    @Test
    public void testShowDatabases() throws Exception {
        exec("CREATE DATABASE doo");
        exec("SHOW DATABASES");

        whFs.setPermission(whPath, perm300); //revoke r
        execFail("SHOW DATABASES");
    }

    @Test
    public void testDatabaseOps() throws Exception {
        exec("SHOW TABLES");
        exec("SHOW TABLE EXTENDED LIKE foo1");

        whFs.setPermission(whPath, perm700);
        exec("CREATE DATABASE doo");
        exec("DESCRIBE DATABASE doo");
        exec("USE doo");
        exec("SHOW TABLES");
        exec("SHOW TABLE EXTENDED LIKE foo1");
        exec("DROP DATABASE doo");

        //custom location
        Path dbPath = new Path(whPath, new Random().nextInt() + "/mydb");
        whFs.mkdirs(dbPath, perm700);
        exec("CREATE DATABASE doo2 LOCATION '%s'", dbPath.toUri());
        exec("DESCRIBE DATABASE doo2", dbPath.toUri());
        exec("USE doo2");
        exec("SHOW TABLES");
        exec("SHOW TABLE EXTENDED LIKE foo1");
        exec("DROP DATABASE doo2", dbPath.toUri());

        //custom non-existing location
        exec("CREATE DATABASE doo3 LOCATION '%s/subpath'", dbPath.toUri());
    }

    @Test
    public void testCreateDatabaseFail1() throws Exception {
        whFs.setPermission(whPath, perm500);
        execFail("CREATE DATABASE doo"); //in the default location

        whFs.setPermission(whPath, perm555);
        execFail("CREATE DATABASE doo2");
    }

    @Test
    public void testCreateDatabaseFail2() throws Exception {
        //custom location
        Path dbPath = new Path(whPath, new Random().nextInt() + "/mydb");

        whFs.mkdirs(dbPath, perm700);
        whFs.setPermission(dbPath, perm500);
        execFail("CREATE DATABASE doo2 LOCATION '%s'", dbPath.toUri());
    }

    @Test
    public void testDropDatabaseFail1() throws Exception {
        whFs.setPermission(whPath, perm700);
        exec("CREATE DATABASE doo"); //in the default location

        whFs.setPermission(getDbPath("doo"), perm500); //revoke write
        execFail("DROP DATABASE doo");
    }

    @Test
    public void testDropDatabaseFail2() throws Exception {
        //custom location
        Path dbPath = new Path(whPath, new Random().nextInt() + "/mydb");

        whFs.mkdirs(dbPath, perm700);
        exec("CREATE DATABASE doo2 LOCATION '%s'", dbPath.toUri());

        whFs.setPermission(dbPath, perm500);
        execFail("DROP DATABASE doo2");
    }

    @Test
    public void testDescSwitchDatabaseFail() throws Exception {
        whFs.setPermission(whPath, perm700);
        exec("CREATE DATABASE doo");
        whFs.setPermission(getDbPath("doo"), perm300); //revoke read
        execFail("DESCRIBE DATABASE doo");
        execFail("USE doo");

        //custom location
        Path dbPath = new Path(whPath, new Random().nextInt() + "/mydb");
        whFs.mkdirs(dbPath, perm700);
        exec("CREATE DATABASE doo2 LOCATION '%s'", dbPath.toUri());
        whFs.mkdirs(dbPath, perm300); //revoke read
        execFail("DESCRIBE DATABASE doo2", dbPath.toUri());
        execFail("USE doo2");
    }

    @Test
    public void testShowTablesFail() throws Exception {
        whFs.setPermission(whPath, perm700);
        exec("CREATE DATABASE doo");
        exec("USE doo");
        whFs.setPermission(getDbPath("doo"), perm300); //revoke read
        execFail("SHOW TABLES");
        execFail("SHOW TABLE EXTENDED LIKE foo1");
    }

    @Test
    public void testTableOps() throws Exception {
        //default db
        exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
        exec("DESCRIBE foo1");
        exec("DROP TABLE foo1");

        //default db custom location
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        whFs.mkdirs(tablePath, perm700);
        exec("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
        exec("DESCRIBE foo2");
        exec("DROP TABLE foo2");

        //default db custom non existing location
        exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s/subpath'", tablePath);
        exec("DESCRIBE foo3");
        exec("DROP TABLE foo3");

        //non default db
        exec("CREATE DATABASE doo");
        exec("USE doo");

        exec("CREATE TABLE foo4 (foo INT) STORED AS RCFILE");
        exec("DESCRIBE foo4");
        exec("DROP TABLE foo4");

        //non-default db custom location
        tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        whFs.mkdirs(tablePath, perm700);
        exec("CREATE EXTERNAL TABLE foo5 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
        exec("DESCRIBE foo5");
        exec("DROP TABLE foo5");

        //non-default db custom non existing location
        exec("CREATE EXTERNAL TABLE foo6 (foo INT) STORED AS RCFILE LOCATION '%s/subpath'", tablePath);
        exec("DESCRIBE foo6");
        exec("DROP TABLE foo6");

        exec("DROP TABLE IF EXISTS foo_non_exists");

        exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
        exec("DESCRIBE EXTENDED foo1");
        exec("DESCRIBE FORMATTED foo1");
        exec("DESCRIBE foo1.foo");

        //deep non-existing path for the table
        tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        whFs.mkdirs(tablePath, perm700);
        exec("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s/a/a/a/'", tablePath);
    }

    @Test
    public void testCreateTableFail1() throws Exception {
        //default db
        whFs.mkdirs(whPath, perm500); //revoke w
        execFail("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
    }

    @Test
    public void testCreateTableFail2() throws Exception {
        //default db custom location
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        whFs.mkdirs(tablePath, perm500);
        execFail("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);

        //default db custom non existing location
        execFail("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s/subpath'", tablePath);
    }

    @Test
    public void testCreateTableFail3() throws Exception {
        //non default db
        exec("CREATE DATABASE doo");
        whFs.setPermission(getDbPath("doo"), perm500);

        execFail("CREATE TABLE doo.foo4 (foo INT) STORED AS RCFILE");

        //non-default db custom location, permission to write to tablePath, but not on db path
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        whFs.mkdirs(tablePath, perm700);
        exec("USE doo");
        execFail("CREATE EXTERNAL TABLE foo5 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
    }

    @Test
    public void testCreateTableFail4() throws Exception {
        //non default db
        exec("CREATE DATABASE doo");

        //non-default db custom location
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        whFs.mkdirs(tablePath, perm500);
        execFail("CREATE EXTERNAL TABLE doo.foo5 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);

        //non-default db custom non existing location
        execFail("CREATE EXTERNAL TABLE doo.foo6 (foo INT) STORED AS RCFILE LOCATION '%s/a/a/a/'", tablePath);
    }

    @Test
    public void testDropTableFail1() throws Exception {
        //default db
        exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
        whFs.mkdirs(getTablePath("default", "foo1"), perm500); //revoke w
        execFail("DROP TABLE foo1");
    }

    @Test
    public void testDropTableFail2() throws Exception {
        //default db custom location
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        exec("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
        whFs.mkdirs(tablePath, perm500);
        execFail("DROP TABLE foo2");
    }

    @Test
    public void testDropTableFail4() throws Exception {
        //non default db
        exec("CREATE DATABASE doo");

        //non-default db custom location
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");

        exec("CREATE EXTERNAL TABLE doo.foo5 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
        whFs.mkdirs(tablePath, perm500);
        exec("USE doo"); //There is no DROP TABLE doo.foo5 support in Hive
        execFail("DROP TABLE foo5");
    }

    @Test
    public void testDescTableFail() throws Exception {
        //default db
        exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
        whFs.mkdirs(getTablePath("default", "foo1"), perm300); //revoke read
        execFail("DESCRIBE foo1");

        //default db custom location
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        whFs.mkdirs(tablePath, perm700);
        exec("CREATE EXTERNAL TABLE foo2 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
        whFs.mkdirs(tablePath, perm300); //revoke read
        execFail("DESCRIBE foo2");
    }

    @Test
    public void testAlterTableRename() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
        exec("ALTER TABLE foo1 RENAME TO foo2");

        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
        exec("ALTER TABLE foo3 RENAME TO foo4");
    }

    @Test
    public void testAlterTableRenameFail() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
        whFs.mkdirs(getTablePath("default", "foo1"), perm500); //revoke write
        execFail("ALTER TABLE foo1 RENAME TO foo2");

        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s'", tablePath);
        whFs.mkdirs(tablePath, perm500); //revoke write
        execFail("ALTER TABLE foo3 RENAME TO foo4");
    }

    @Test
    public void testAlterTableRelocate() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        exec("ALTER TABLE foo1 SET LOCATION '%s'", tablePath.makeQualified(whFs));

        tablePath = new Path(whPath, new Random().nextInt() + "/mytable2");
        exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s'",
                tablePath.makeQualified(whFs));
        tablePath = new Path(whPath, new Random().nextInt() + "/mytable2");
        exec("ALTER TABLE foo3 SET LOCATION '%s'", tablePath.makeQualified(whFs));
    }

    @Test
    public void testAlterTableRelocateFail() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) STORED AS RCFILE");
        Path tablePath = new Path(whPath, new Random().nextInt() + "/mytable");
        whFs.mkdirs(tablePath, perm500); //revoke write
        execFail("ALTER TABLE foo1 SET LOCATION '%s'", tablePath.makeQualified(whFs));

        //dont have access to new table loc
        tablePath = new Path(whPath, new Random().nextInt() + "/mytable2");
        exec("CREATE EXTERNAL TABLE foo3 (foo INT) STORED AS RCFILE LOCATION '%s'",
                tablePath.makeQualified(whFs));
        tablePath = new Path(whPath, new Random().nextInt() + "/mytable2");
        whFs.mkdirs(tablePath, perm500); //revoke write
        execFail("ALTER TABLE foo3 SET LOCATION '%s'", tablePath.makeQualified(whFs));

        //have access to new table loc, but not old table loc
        tablePath = new Path(whPath, new Random().nextInt() + "/mytable3");
        exec("CREATE EXTERNAL TABLE foo4 (foo INT) STORED AS RCFILE LOCATION '%s'",
                tablePath.makeQualified(whFs));
        whFs.mkdirs(tablePath, perm500); //revoke write
        tablePath = new Path(whPath, new Random().nextInt() + "/mytable3");
        execFail("ALTER TABLE foo4 SET LOCATION '%s'", tablePath.makeQualified(whFs));
    }

    @Test
    public void testAlterTable() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
        exec("ALTER TABLE foo1 SET TBLPROPERTIES ('foo'='bar')");
        exec("ALTER TABLE foo1 SET SERDEPROPERTIES ('foo'='bar')");
        exec("ALTER TABLE foo1 ADD COLUMNS (foo2 INT)");
    }

    @Test
    public void testAddDropPartition() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10')");
        exec("ALTER TABLE foo1 ADD IF NOT EXISTS PARTITION (b='2010-10-10')");
        String relPath = new Random().nextInt() + "/mypart";
        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-11') LOCATION '%s'", relPath);

        exec("ALTER TABLE foo1 PARTITION (b='2010-10-10') SET FILEFORMAT RCFILE");

        exec("ALTER TABLE foo1 PARTITION (b='2010-10-10') SET FILEFORMAT INPUTFORMAT "
                + "'org.apache.hadoop.hive.ql.io.RCFileInputFormat' OUTPUTFORMAT "
                + "'org.apache.hadoop.hive.ql.io.RCFileOutputFormat' inputdriver "
                + "'mydriver' outputdriver 'yourdriver'");

        exec("ALTER TABLE foo1 DROP PARTITION (b='2010-10-10')");
        exec("ALTER TABLE foo1 DROP PARTITION (b='2010-10-11')");
    }

    @Test
    public void testAddPartitionFail1() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
        whFs.mkdirs(getTablePath("default", "foo1"), perm500);
        execFail("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10')");
    }

    @Test
    public void testAddPartitionFail2() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
        String relPath = new Random().nextInt() + "/mypart";
        Path partPath = new Path(getTablePath("default", "foo1"), relPath);
        whFs.mkdirs(partPath, perm500);
        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10') LOCATION '%s'", partPath);
    }

    @Test
    public void testDropPartitionFail1() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10')");
        whFs.mkdirs(getPartPath("b=2010-10-10", "default", "foo1"), perm500);
        execFail("ALTER TABLE foo1 DROP PARTITION (b='2010-10-10')");
    }

    @Test
    public void testDropPartitionFail2() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS TEXTFILE");
        String relPath = new Random().nextInt() + "/mypart";
        Path partPath = new Path(getTablePath("default", "foo1"), relPath);
        whFs.mkdirs(partPath, perm700);
        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-10') LOCATION '%s'", partPath);
        whFs.mkdirs(partPath, perm500); //revoke write
        execFail("ALTER TABLE foo1 DROP PARTITION (b='2010-10-10')");
    }

    @Test
    public void testAlterTableFail() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (boo STRING) STORED AS TEXTFILE");
        whFs.mkdirs(getTablePath("default", "foo1"), perm500); //revoke write
        execFail("ALTER TABLE foo1 SET TBLPROPERTIES ('foo'='bar')");
        execFail("ALTER TABLE foo1 SET SERDEPROPERTIES ('foo'='bar')");
        execFail("ALTER TABLE foo1 ADD COLUMNS (foo2 INT)");
    }

    @Test
    public void testShowTables() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (boo STRING) STORED AS TEXTFILE");
        exec("SHOW PARTITIONS foo1");

        whFs.mkdirs(getTablePath("default", "foo1"), perm300); //revoke read
        execFail("SHOW PARTITIONS foo1");
    }

    @Test
    public void testAlterTablePartRename() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS RCFILE");
        Path loc = new Path(whPath, new Random().nextInt() + "/mypart");
        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-16') LOCATION '%s'", loc);
        exec("ALTER TABLE foo1 PARTITION (b='2010-10-16') RENAME TO PARTITION (b='2010-10-17')");
    }

    @Test
    public void testAlterTablePartRenameFail() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS RCFILE");
        Path loc = new Path(whPath, new Random().nextInt() + "/mypart");
        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-16') LOCATION '%s'", loc);
        whFs.setPermission(loc, perm500); //revoke w
        execFail("ALTER TABLE foo1 PARTITION (b='2010-10-16') RENAME TO PARTITION (b='2010-10-17')");
    }

    @Test
    public void testAlterTablePartRelocate() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS RCFILE");
        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-16')");
        Path partPath = new Path(whPath, new Random().nextInt() + "/mypart");
        exec("ALTER TABLE foo1 PARTITION (b='2010-10-16') SET LOCATION '%s'", partPath.makeQualified(whFs));
    }

    @Test
    public void testAlterTablePartRelocateFail() throws Exception {
        exec("CREATE TABLE foo1 (foo INT) PARTITIONED BY (b STRING) STORED AS RCFILE");

        Path oldLoc = new Path(whPath, new Random().nextInt() + "/mypart");
        Path newLoc = new Path(whPath, new Random().nextInt() + "/mypart2");

        exec("ALTER TABLE foo1 ADD PARTITION (b='2010-10-16') LOCATION '%s'", oldLoc);
        whFs.mkdirs(oldLoc, perm500);
        execFail("ALTER TABLE foo1 PARTITION (b='2010-10-16') SET LOCATION '%s'", newLoc.makeQualified(whFs));
        whFs.mkdirs(oldLoc, perm700);
        whFs.mkdirs(newLoc, perm500);
        execFail("ALTER TABLE foo1 PARTITION (b='2010-10-16') SET LOCATION '%s'", newLoc.makeQualified(whFs));
    }

}
