| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.test.functional; |
| |
| import static org.junit.Assert.assertEquals; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeSet; |
| |
| import org.apache.accumulo.core.Constants; |
| import org.apache.accumulo.core.client.Accumulo; |
| import org.apache.accumulo.core.client.AccumuloClient; |
| import org.apache.accumulo.core.client.AccumuloException; |
| import org.apache.accumulo.core.client.AccumuloSecurityException; |
| import org.apache.accumulo.core.client.BatchDeleter; |
| import org.apache.accumulo.core.client.BatchWriter; |
| import org.apache.accumulo.core.client.BatchWriterConfig; |
| import org.apache.accumulo.core.client.IsolatedScanner; |
| import org.apache.accumulo.core.client.MutationsRejectedException; |
| import org.apache.accumulo.core.client.RowIterator; |
| import org.apache.accumulo.core.client.Scanner; |
| import org.apache.accumulo.core.client.TableExistsException; |
| import org.apache.accumulo.core.client.TableNotFoundException; |
| import org.apache.accumulo.core.client.admin.NewTableConfiguration; |
| import org.apache.accumulo.core.clientImpl.ClientContext; |
| import org.apache.accumulo.core.clientImpl.ClientInfo; |
| import org.apache.accumulo.core.clientImpl.Tables; |
| import org.apache.accumulo.core.data.Key; |
| import org.apache.accumulo.core.data.Mutation; |
| import org.apache.accumulo.core.data.TableId; |
| import org.apache.accumulo.core.data.Value; |
| import org.apache.accumulo.core.dataImpl.KeyExtent; |
| import org.apache.accumulo.core.manager.state.tables.TableState; |
| import org.apache.accumulo.core.master.thrift.ManagerState; |
| import org.apache.accumulo.core.metadata.MetadataTable; |
| import org.apache.accumulo.core.metadata.TServerInstance; |
| import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; |
| import org.apache.accumulo.core.security.Authorizations; |
| import org.apache.accumulo.fate.util.UtilWaitThread; |
| import org.apache.accumulo.fate.zookeeper.ZooCache; |
| import org.apache.accumulo.fate.zookeeper.ZooLock; |
| import org.apache.accumulo.fate.zookeeper.ZooUtil; |
| import org.apache.accumulo.harness.AccumuloClusterHarness; |
| import org.apache.accumulo.server.manager.state.CurrentState; |
| import org.apache.accumulo.server.manager.state.MergeInfo; |
| import org.apache.accumulo.server.manager.state.MetaDataTableScanner; |
| import org.apache.accumulo.server.manager.state.TabletStateChangeIterator; |
| import org.apache.hadoop.io.Text; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.collect.Sets; |
| |
| /** |
| * Test to ensure that the {@link TabletStateChangeIterator} properly skips over tablet information |
| * in the metadata table when there is no work to be done on the tablet (see ACCUMULO-3580) |
| */ |
| public class TabletStateChangeIteratorIT extends AccumuloClusterHarness { |
| private final static Logger log = LoggerFactory.getLogger(TabletStateChangeIteratorIT.class); |
| |
| @Override |
| public int defaultTimeoutSeconds() { |
| return 3 * 60; |
| } |
| |
| @Test |
| public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException, |
| TableNotFoundException { |
| |
| try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { |
| |
| String[] tables = getUniqueNames(6); |
| final String t1 = tables[0]; |
| final String t2 = tables[1]; |
| final String t3 = tables[2]; |
| final String metaCopy1 = tables[3]; |
| final String metaCopy2 = tables[4]; |
| final String metaCopy3 = tables[5]; |
| |
| // create some metadata |
| createTable(client, t1, true); |
| createTable(client, t2, false); |
| createTable(client, t3, true); |
| |
| // examine a clone of the metadata table, so we can manipulate it |
| copyTable(client, MetadataTable.NAME, metaCopy1); |
| |
| State state = new State(client); |
| int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, state); |
| while (tabletsInFlux > 0) { |
| log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1); |
| UtilWaitThread.sleep(500); |
| copyTable(client, MetadataTable.NAME, metaCopy1); |
| tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, state); |
| } |
| assertEquals("No tables should need attention", 0, |
| findTabletsNeedingAttention(client, metaCopy1, state)); |
| |
| // The metadata table stabilized and metaCopy1 contains a copy suitable for testing. Before |
| // metaCopy1 is modified, copy it for subsequent test. |
| copyTable(client, metaCopy1, metaCopy2); |
| copyTable(client, metaCopy1, metaCopy3); |
| |
| // test the assigned case (no location) |
| removeLocation(client, metaCopy1, t3); |
| assertEquals("Should have two tablets without a loc", 2, |
| findTabletsNeedingAttention(client, metaCopy1, state)); |
| |
| // test the cases where the assignment is to a dead tserver |
| reassignLocation(client, metaCopy2, t3); |
| assertEquals("Should have one tablet that needs to be unassigned", 1, |
| findTabletsNeedingAttention(client, metaCopy2, state)); |
| |
| // test the cases where there is ongoing merges |
| state = new State(client) { |
| @Override |
| public Collection<MergeInfo> merges() { |
| TableId tableIdToModify = TableId.of(client.tableOperations().tableIdMap().get(t3)); |
| return Collections.singletonList( |
| new MergeInfo(new KeyExtent(tableIdToModify, null, null), MergeInfo.Operation.MERGE)); |
| } |
| }; |
| assertEquals("Should have 2 tablets that need to be chopped or unassigned", 1, |
| findTabletsNeedingAttention(client, metaCopy2, state)); |
| |
| // test the bad tablet location state case (inconsistent metadata) |
| state = new State(client); |
| addDuplicateLocation(client, metaCopy3, t3); |
| assertEquals("Should have 1 tablet that needs a metadata repair", 1, |
| findTabletsNeedingAttention(client, metaCopy3, state)); |
| |
| // clean up |
| dropTables(client, t1, t2, t3, metaCopy1, metaCopy2, metaCopy3); |
| } |
| } |
| |
| private void addDuplicateLocation(AccumuloClient client, String table, String tableNameToModify) |
| throws TableNotFoundException, MutationsRejectedException { |
| TableId tableIdToModify = |
| TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); |
| Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow()); |
| m.put(CurrentLocationColumnFamily.NAME, new Text("1234567"), new Value("fake:9005")); |
| try (BatchWriter bw = client.createBatchWriter(table)) { |
| bw.addMutation(m); |
| } |
| } |
| |
| private void reassignLocation(AccumuloClient client, String table, String tableNameToModify) |
| throws TableNotFoundException, MutationsRejectedException { |
| TableId tableIdToModify = |
| TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); |
| try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { |
| scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange()); |
| scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); |
| Entry<Key,Value> entry = scanner.iterator().next(); |
| Mutation m = new Mutation(entry.getKey().getRow()); |
| m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier(), |
| entry.getKey().getTimestamp()); |
| m.put(entry.getKey().getColumnFamily(), new Text("1234567"), |
| entry.getKey().getTimestamp() + 1, new Value("fake:9005")); |
| try (BatchWriter bw = client.createBatchWriter(table)) { |
| bw.addMutation(m); |
| } |
| } |
| } |
| |
| private void removeLocation(AccumuloClient client, String table, String tableNameToModify) |
| throws TableNotFoundException, MutationsRejectedException { |
| TableId tableIdToModify = |
| TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); |
| BatchDeleter deleter = |
| client.createBatchDeleter(table, Authorizations.EMPTY, 1, new BatchWriterConfig()); |
| deleter |
| .setRanges(Collections.singleton(new KeyExtent(tableIdToModify, null, null).toMetaRange())); |
| deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME); |
| deleter.delete(); |
| deleter.close(); |
| } |
| |
| private int findTabletsNeedingAttention(AccumuloClient client, String table, State state) |
| throws TableNotFoundException { |
| int results = 0; |
| List<Key> resultList = new ArrayList<>(); |
| try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { |
| MetaDataTableScanner.configureScanner(scanner, state); |
| log.debug("Current state = {}", state); |
| scanner.updateScanIteratorOption("tabletChange", "debug", "1"); |
| for (Entry<Key,Value> e : scanner) { |
| if (e != null) { |
| results++; |
| resultList.add(e.getKey()); |
| } |
| } |
| } |
| log.debug("Tablets in flux: {}", resultList); |
| return results; |
| } |
| |
| private void createTable(AccumuloClient client, String t, boolean online) |
| throws AccumuloSecurityException, AccumuloException, TableNotFoundException, |
| TableExistsException { |
| SortedSet<Text> partitionKeys = new TreeSet<>(); |
| partitionKeys.add(new Text("some split")); |
| NewTableConfiguration ntc = new NewTableConfiguration().withSplits(partitionKeys); |
| client.tableOperations().create(t, ntc); |
| client.tableOperations().online(t, true); |
| if (!online) { |
| client.tableOperations().offline(t, true); |
| } |
| } |
| |
| /** |
| * Create a copy of the source table by first gathering all the rows of the source in a list of |
| * mutations. Then create the copy of the table and apply the mutations to the copy. |
| */ |
| private void copyTable(AccumuloClient client, String source, String copy) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException, |
| TableExistsException { |
| try { |
| dropTables(client, copy); |
| } catch (TableNotFoundException ex) { |
| // ignored |
| } |
| |
| log.info("Gathering rows to copy {} ", source); |
| List<Mutation> mutations = new ArrayList<>(); |
| |
| try (Scanner scanner = client.createScanner(source, Authorizations.EMPTY)) { |
| RowIterator rows = new RowIterator(new IsolatedScanner(scanner)); |
| |
| while (rows.hasNext()) { |
| Iterator<Entry<Key,Value>> row = rows.next(); |
| Mutation m = null; |
| |
| while (row.hasNext()) { |
| Entry<Key,Value> entry = row.next(); |
| Key k = entry.getKey(); |
| if (m == null) |
| m = new Mutation(k.getRow()); |
| |
| m.put(k.getColumnFamily(), k.getColumnQualifier(), k.getColumnVisibilityParsed(), |
| k.getTimestamp(), entry.getValue()); |
| } |
| |
| mutations.add(m); |
| } |
| } |
| |
| // metadata should be stable with only 7 rows (1 replication + 2 for each table) |
| log.debug("Gathered {} rows to create copy {}", mutations.size(), copy); |
| assertEquals("Metadata should have 7 rows (1 repl + 2 for each table)", 7, mutations.size()); |
| client.tableOperations().create(copy); |
| |
| try (BatchWriter writer = client.createBatchWriter(copy, new BatchWriterConfig())) { |
| for (Mutation m : mutations) { |
| writer.addMutation(m); |
| } |
| } |
| |
| log.info("Finished creating copy " + copy); |
| } |
| |
| private void dropTables(AccumuloClient client, String... tables) |
| throws AccumuloException, AccumuloSecurityException, TableNotFoundException { |
| for (String t : tables) { |
| client.tableOperations().delete(t); |
| } |
| } |
| |
| private static class State implements CurrentState { |
| |
| final AccumuloClient client; |
| |
| State(AccumuloClient client) { |
| this.client = client; |
| } |
| |
| private Set<TServerInstance> tservers; |
| private Set<TableId> onlineTables; |
| |
| @Override |
| public Set<TServerInstance> onlineTabletServers() { |
| HashSet<TServerInstance> tservers = new HashSet<>(); |
| for (String tserver : client.instanceOperations().getTabletServers()) { |
| try { |
| String zPath = ZooUtil.getRoot(client.instanceOperations().getInstanceID()) |
| + Constants.ZTSERVERS + "/" + tserver; |
| ClientInfo info = getClientInfo(); |
| long sessionId = ZooLock.getSessionId( |
| new ZooCache(info.getZooKeepers(), info.getZooKeepersSessionTimeOut()), zPath); |
| tservers.add(new TServerInstance(tserver, sessionId)); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| this.tservers = Collections.unmodifiableSet(tservers); |
| return tservers; |
| } |
| |
| @Override |
| public Set<TableId> onlineTables() { |
| ClientContext context = (ClientContext) client; |
| Set<TableId> onlineTables = Tables.getIdToNameMap(context).keySet(); |
| this.onlineTables = Sets.filter(onlineTables, |
| tableId -> Tables.getTableState(context, tableId) == TableState.ONLINE); |
| return this.onlineTables; |
| } |
| |
| @Override |
| public Collection<MergeInfo> merges() { |
| return Collections.emptySet(); |
| } |
| |
| @Override |
| public Set<KeyExtent> migrationsSnapshot() { |
| return Collections.emptySet(); |
| } |
| |
| @Override |
| public Set<TServerInstance> shutdownServers() { |
| return Collections.emptySet(); |
| } |
| |
| @Override |
| public ManagerState getManagerState() { |
| return ManagerState.NORMAL; |
| } |
| |
| @Override |
| public String toString() { |
| return "tservers: " + tservers + " onlineTables: " + onlineTables; |
| } |
| } |
| |
| } |