/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.accumulo.test.functional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;

import org.apache.accumulo.cluster.AccumuloCluster;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.CloneConfiguration;
import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Assume;
import org.junit.Test;

public class CloneTestIT extends AccumuloClusterHarness {

@Override
protected int defaultTimeoutSeconds() {
return 2 * 60;
}
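
/**
 * Clones a table and verifies property handling: properties set at clone time override the
 * source's values, excluded properties revert to their defaults, and all other source properties
 * are inherited by the clone.
 */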
@Test
public void testProps() throws Exception {
String[] tableNames = getUniqueNames(2);
String table1 = tableNames[0];
String table2 = tableNames[1];
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
c.tableOperations().create(table1);
c.tableOperations().setProperty(table1, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(),
"1M");
c.tableOperations().setProperty(table1,
Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey(), "2M");
c.tableOperations().setProperty(table1, Property.TABLE_FILE_MAX.getKey(), "23");
writeDataAndClone(c, table1, table2);
checkData(table2, c);
checkMetadata(table2, c);
HashMap<String,String> tableProps = new HashMap<>();
for (Entry<String,String> prop : c.tableOperations().getProperties(table2)) {
tableProps.put(prop.getKey(), prop.getValue());
}
assertEquals("500K", tableProps.get(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey()));
assertEquals(Property.TABLE_FILE_MAX.getDefaultValue(),
tableProps.get(Property.TABLE_FILE_MAX.getKey()));
assertEquals("2M", tableProps.get(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX.getKey()));
c.tableOperations().delete(table1);
c.tableOperations().delete(table2);
}
}
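
/**
 * Resolves the table's id and asserts its current state (e.g. ONLINE or OFFLINE).
 */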
private void assertTableState(String tableName, AccumuloClient c, TableState expected) {
String tableId = c.tableOperations().tableIdMap().get(tableName);
TableState tableState = Tables.getTableState((ClientContext) c, TableId.of(tableId));
assertEquals(expected, tableState);
}
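
/**
 * Verifies the clone contains exactly the four entries written before the clone was taken; the
 * row 009 mutation written afterwards must not appear.
 */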
private void checkData(String table2, AccumuloClient c) throws TableNotFoundException {
try (Scanner scanner = c.createScanner(table2, Authorizations.EMPTY)) {
HashMap<String,String> expected = new HashMap<>();
expected.put("001:x", "9");
expected.put("001:y", "7");
expected.put("008:x", "3");
expected.put("008:y", "4");
HashMap<String,String> actual = new HashMap<>();
for (Entry<Key,Value> entry : scanner) {
actual.put(entry.getKey().getRowData() + ":" + entry.getKey().getColumnQualifierData(),
entry.getValue().toString());
}
assertEquals(expected, actual);
}
}
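
/**
 * Scans the metadata table for the given table's entries, verifying that every referenced data
 * file exists in the cluster's file system and that each tablet directory name matches the
 * expected [tc]-xxxx pattern.
 */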
private void checkMetadata(String table, AccumuloClient client) throws Exception {
try (Scanner s = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
s.fetchColumnFamily(DataFileColumnFamily.NAME);
ServerColumnFamily.DIRECTORY_COLUMN.fetch(s);
String tableId = client.tableOperations().tableIdMap().get(table);
assertNotNull("Could not get table id for " + table, tableId);
s.setRange(Range.prefix(tableId));
Key k;
Text cf = new Text(), cq = new Text();
int itemsInspected = 0;
for (Entry<Key,Value> entry : s) {
itemsInspected++;
k = entry.getKey();
k.getColumnFamily(cf);
k.getColumnQualifier(cq);
if (cf.equals(DataFileColumnFamily.NAME)) {
Path p = new Path(cq.toString());
FileSystem fs = cluster.getFileSystem();
assertTrue("File does not exist: " + p, fs.exists(p));
} else if (cf.equals(ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily())) {
assertEquals("Saw unexpected cq",
ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), cq);
String dirName = entry.getValue().toString();
assertTrue("Bad dir name " + dirName, dirName.matches("[tc]-[0-9a-z]+"));
} else {
fail("Got unexpected key-value: " + entry);
throw new RuntimeException();
}
}
assertTrue("Expected to find metadata entries", itemsInspected > 0);
}
}
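
/**
 * Writes rows 001 and 008 to the given table and flushes the writer. The BatchWriter is returned
 * still open so callers can add more mutations; the caller is responsible for closing it.
 */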
private BatchWriter writeData(String table1, AccumuloClient c) throws Exception {
BatchWriter bw = c.createBatchWriter(table1);
Mutation m1 = new Mutation("001");
m1.put("data", "x", "9");
m1.put("data", "y", "7");
Mutation m2 = new Mutation("008");
m2.put("data", "x", "3");
m2.put("data", "y", "4");
bw.addMutation(m1);
bw.addMutation(m2);
bw.flush();
return bw;
}
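
/**
 * Writes initial data to table1, clones it to table2 (flushing first, overriding the compressed
 * block size to 500K and excluding TABLE_FILE_MAX), then writes row 009 to table1 only, so that
 * row should never appear in the clone.
 */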
private void writeDataAndClone(AccumuloClient c, String table1, String table2) throws Exception {
try (BatchWriter bw = writeData(table1, c)) {
Map<String,String> props = new HashMap<>();
props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "500K");
Set<String> exclude = new HashSet<>();
exclude.add(Property.TABLE_FILE_MAX.getKey());
c.tableOperations().clone(table1, table2, true, props, exclude);
assertTableState(table2, c, TableState.ONLINE);
Mutation m3 = new Mutation("009");
m3.put("data", "x", "1");
m3.put("data", "y", "2");
bw.addMutation(m3);
}
}
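
/**
 * Verifies that deleting a table removes its files from the file system, and that deleting the
 * source of a clone does not affect the clone's data, even after a full compaction.
 */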
@Test
public void testDeleteClone() throws Exception {
String[] tableNames = getUniqueNames(3);
String table1 = tableNames[0];
String table2 = tableNames[1];
String table3 = tableNames[2];
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
AccumuloCluster cluster = getCluster();
Assume.assumeTrue(cluster instanceof MiniAccumuloClusterImpl);
MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) cluster;
String rootPath = mac.getConfig().getDir().getAbsolutePath();
// verify that deleting a new table removes the files
c.tableOperations().create(table3);
writeData(table3, c).close();
c.tableOperations().flush(table3, null, null, true);
// check for files
FileSystem fs = getCluster().getFileSystem();
String id = c.tableOperations().tableIdMap().get(table3);
FileStatus[] status = fs.listStatus(new Path(rootPath + "/accumulo/tables/" + id));
assertTrue(status.length > 0);
// verify disk usage
List<DiskUsage> diskUsage = c.tableOperations().getDiskUsage(Collections.singleton(table3));
assertEquals(1, diskUsage.size());
assertTrue(diskUsage.get(0).getUsage() > 100);
// delete the table
c.tableOperations().delete(table3);
// verify it's gone from the file system
Path tablePath = new Path(rootPath + "/accumulo/tables/" + id);
if (fs.exists(tablePath)) {
status = fs.listStatus(tablePath);
assertTrue(status == null || status.length == 0);
}
c.tableOperations().create(table1);
writeDataAndClone(c, table1, table2);
// delete source table, should not affect clone
c.tableOperations().delete(table1);
checkData(table2, c);
c.tableOperations().compact(table2, null, null, true, true);
checkData(table2, c);
c.tableOperations().delete(table2);
}
}
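
/**
 * Clones a table with CloneConfiguration.builder().setKeepOffline(true) and verifies the clone is
 * created in the OFFLINE state rather than being brought online.
 */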
@Test
public void testOfflineClone() throws Exception {
String[] tableNames = getUniqueNames(2);
String table1 = tableNames[0];
String table2 = tableNames[1];
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
AccumuloCluster cluster = getCluster();
Assume.assumeTrue(cluster instanceof MiniAccumuloClusterImpl);
c.tableOperations().create(table1);
writeData(table1, c).close();
Map<String,String> props = new HashMap<>();
props.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "500K");
Set<String> exclude = new HashSet<>();
exclude.add(Property.TABLE_FILE_MAX.getKey());
c.tableOperations().clone(table1, table2, CloneConfiguration.builder().setFlush(true)
.setPropertiesToSet(props).setPropertiesToExclude(exclude).setKeepOffline(true).build());
assertTableState(table2, c, TableState.OFFLINE);
// delete tables
c.tableOperations().delete(table1);
c.tableOperations().delete(table2);
}
}
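
/**
 * Clones a table created with ten splits, then range-deletes rows (4, 8] from the clone and
 * verifies only rows 0 through 4 and row 9 remain.
 */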
@Test
public void testCloneWithSplits() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
List<Mutation> mutations = new ArrayList<>();
TreeSet<Text> splits = new TreeSet<>();
for (int i = 0; i < 10; i++) {
splits.add(new Text(Integer.toString(i)));
Mutation m = new Mutation(Integer.toString(i));
m.put("", "", "");
mutations.add(m);
}
String[] tables = getUniqueNames(2);
NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splits);
client.tableOperations().create(tables[0], ntc);
try (BatchWriter bw = client.createBatchWriter(tables[0])) {
bw.addMutations(mutations);
}
client.tableOperations().clone(tables[0], tables[1], true, null, null);
client.tableOperations().deleteRows(tables[1], new Text("4"), new Text("8"));
List<String> rows = Arrays.asList("0", "1", "2", "3", "4", "9");
List<String> actualRows = new ArrayList<>();
try (var scanner = client.createScanner(tables[1], Authorizations.EMPTY)) {
for (Entry<Key,Value> entry : scanner) {
actualRows.add(entry.getKey().getRow().toString());
}
}
assertEquals(rows, actualRows);
}
}
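
/**
 * Cloning the root table is not allowed; the operation should throw AccumuloException.
 */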
@Test(expected = AccumuloException.class)
public void testCloneRootTable() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
client.tableOperations().clone(RootTable.NAME, "rc1", true, null, null);
}
}
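
/**
 * Cloning the metadata table is likewise not allowed; the operation should throw
 * AccumuloException.
 */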
@Test(expected = AccumuloException.class)
public void testCloneMetadataTable() throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
client.tableOperations().clone(MetadataTable.NAME, "mc1", true, null, null);
}
}
}