/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.server.util;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.impl.BatchWriterImpl;
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.Credentials;
import org.apache.accumulo.core.client.impl.ScannerImpl;
import org.apache.accumulo.core.client.impl.Writer;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
/**
 * Provides a reference to the metadata table for updates by tablet servers.
 */
public class MetadataTableUtil {
private static final Text EMPTY_TEXT = new Text();
private static final byte[] EMPTY_BYTES = new byte[0];
private static Map<Credentials,Writer> root_tables = new HashMap<>();
private static Map<Credentials,Writer> metadata_tables = new HashMap<>();
private static final Logger log = LoggerFactory.getLogger(MetadataTableUtil.class);
private MetadataTableUtil() {}
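/**
 * Returns a cached {@link Writer} for the metadata table, creating and caching one per set of
 * credentials so repeated callers under the same principal share a writer.
 */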
public synchronized static Writer getMetadataTable(ClientContext context) {
Credentials credentials = context.getCredentials();
Writer metadataTable = metadata_tables.get(credentials);
if (metadataTable == null) {
metadataTable = new Writer(context, MetadataTable.ID);
metadata_tables.put(credentials, metadataTable);
}
return metadataTable;
}
public synchronized static Writer getRootTable(ClientContext context) {
Credentials credentials = context.getCredentials();
Writer rootTable = root_tables.get(credentials);
if (rootTable == null) {
rootTable = new Writer(context, RootTable.ID);
root_tables.put(credentials, rootTable);
}
return rootTable;
}
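/**
 * Records the tablet server's ZooKeeper lock ID in the mutation; the metadata constraints use
 * this column to verify the writer still holds its lock when the mutation is applied.
 */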
public static void putLockID(ZooLock zooLock, Mutation m) {
TabletsSection.ServerColumnFamily.LOCK_COLUMN.put(m, new Value(zooLock.getLockID().serialize(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + "/")
.getBytes(UTF_8)));
}
private static void update(ClientContext context, Mutation m, KeyExtent extent) {
update(context, null, m, extent);
}
public static void update(ClientContext context, ZooLock zooLock, Mutation m, KeyExtent extent) {
Writer t = extent.isMeta() ? getRootTable(context) : getMetadataTable(context);
update(t, zooLock, m);
}
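/**
 * Applies the mutation, retrying once per second on transient failures. A constraint violation
 * is never retried; it is rethrown as a RuntimeException because retrying would be futile (see
 * ACCUMULO-3096).
 */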
public static void update(Writer t, ZooLock zooLock, Mutation m) {
if (zooLock != null)
putLockID(zooLock, m);
while (true) {
try {
t.update(m);
return;
} catch (AccumuloException e) {
log.error("{}", e.getMessage(), e);
} catch (AccumuloSecurityException e) {
log.error("{}", e.getMessage(), e);
} catch (ConstraintViolationException e) {
log.error("{}", e.getMessage(), e);
// retrying when a ConstraintViolationException occurs is probably futile and can cause problems, see ACCUMULO-3096
throw new RuntimeException(e);
} catch (TableNotFoundException e) {
log.error("{}", e.getMessage(), e);
}
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
public static void updateTabletFlushID(KeyExtent extent, long flushID, ClientContext context, ZooLock zooLock) {
if (!extent.isRootTablet()) {
Mutation m = new Mutation(extent.getMetadataEntry());
TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m, new Value((flushID + "").getBytes(UTF_8)));
update(context, zooLock, m, extent);
}
}
public static void updateTabletCompactID(KeyExtent extent, long compactID, ClientContext context, ZooLock zooLock) {
if (!extent.isRootTablet()) {
Mutation m = new Mutation(extent.getMetadataEntry());
TabletsSection.ServerColumnFamily.COMPACT_COLUMN.put(m, new Value((compactID + "").getBytes(UTF_8)));
update(context, zooLock, m, extent);
}
}
public static void updateTabletDataFile(long tid, KeyExtent extent, Map<FileRef,DataFileValue> estSizes, String time, ClientContext context, ZooLock zooLock) {
Mutation m = new Mutation(extent.getMetadataEntry());
byte[] tidBytes = Long.toString(tid).getBytes(UTF_8);
for (Entry<FileRef,DataFileValue> entry : estSizes.entrySet()) {
Text file = entry.getKey().meta();
m.put(DataFileColumnFamily.NAME, file, new Value(entry.getValue().encode()));
m.put(TabletsSection.BulkFileColumnFamily.NAME, file, new Value(tidBytes));
}
TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value(time.getBytes(UTF_8)));
update(context, zooLock, m, extent);
}
public static void updateTabletDir(KeyExtent extent, String newDir, ClientContext context, ZooLock lock) {
Mutation m = new Mutation(extent.getMetadataEntry());
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(newDir.getBytes(UTF_8)));
update(context, lock, m, extent);
}
public static void addTablet(KeyExtent extent, String path, ClientContext context, char timeType, ZooLock lock) {
Mutation m = extent.getPrevRowUpdateMutation();
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(path.getBytes(UTF_8)));
TabletsSection.ServerColumnFamily.TIME_COLUMN.put(m, new Value((timeType + "0").getBytes(UTF_8)));
update(context, lock, m, extent);
}
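/**
 * Updates a tablet's write-ahead log, data file, and directory entries. The root tablet keeps
 * its log entries in ZooKeeper and therefore only accepts log changes; all other tablets are
 * updated with a single metadata mutation so the change is applied atomically.
 */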
public static void updateTabletVolumes(KeyExtent extent, List<LogEntry> logsToRemove, List<LogEntry> logsToAdd, List<FileRef> filesToRemove,
SortedMap<FileRef,DataFileValue> filesToAdd, String newDir, ZooLock zooLock, AccumuloServerContext context) {
if (extent.isRootTablet()) {
if (newDir != null)
throw new IllegalArgumentException("newDir not expected for " + extent);
if (filesToRemove.size() != 0 || filesToAdd.size() != 0)
throw new IllegalArgumentException("files not expected for " + extent);
// add before removing in case of process death
for (LogEntry logEntry : logsToAdd)
addRootLogEntry(context, zooLock, logEntry);
removeUnusedWALEntries(context, extent, logsToRemove, zooLock);
} else {
Mutation m = new Mutation(extent.getMetadataEntry());
for (LogEntry logEntry : logsToRemove)
m.putDelete(logEntry.getColumnFamily(), logEntry.getColumnQualifier());
for (LogEntry logEntry : logsToAdd)
m.put(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.getValue());
for (FileRef fileRef : filesToRemove)
m.putDelete(DataFileColumnFamily.NAME, fileRef.meta());
for (Entry<FileRef,DataFileValue> entry : filesToAdd.entrySet())
m.put(DataFileColumnFamily.NAME, entry.getKey().meta(), new Value(entry.getValue().encode()));
if (newDir != null)
ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(newDir.getBytes(UTF_8)));
update(context, m, extent);
}
}
private interface ZooOperation {
void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException;
}
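/**
 * Runs the given ZooKeeper operation, retrying once per second on failure. The operation is
 * only applied if the caller still holds its ZooKeeper lock.
 */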
private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) {
while (true) {
try {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
if (zoo.isLockHeld(zooLock.getLockID())) {
op.run(zoo);
}
break;
} catch (Exception e) {
log.error("Unexpected exception {}", e.getMessage(), e);
}
sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) {
retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
@Override
public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
String root = getZookeeperLogLocation();
rw.putPersistentData(root + "/" + entry.getUniqueID(), entry.toBytes(), NodeExistsPolicy.OVERWRITE);
}
});
}
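/**
 * Scans the metadata table for the given extent and returns its data files along with their
 * recorded sizes and entry counts.
 */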
public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException {
TreeMap<FileRef,DataFileValue> sizes = new TreeMap<>();
try (Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
Text row = extent.getMetadataEntry();
VolumeManager fs = VolumeManagerImpl.get();
Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text(""));
endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
mdScanner.setRange(new Range(new Key(row), endKey));
for (Entry<Key,Value> entry : mdScanner) {
if (!entry.getKey().getRow().equals(row))
break;
DataFileValue dfv = new DataFileValue(entry.getValue().get());
sizes.put(new FileRef(fs, entry.getKey()), dfv);
}
return sizes;
}
}
public static void rollBackSplit(Text metadataEntry, Text oldPrevEndRow, ClientContext context, ZooLock zooLock) {
KeyExtent ke = new KeyExtent(metadataEntry, oldPrevEndRow);
Mutation m = ke.getPrevRowUpdateMutation();
TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.putDelete(m);
TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.putDelete(m);
update(context, zooLock, m, new KeyExtent(metadataEntry, (Text) null));
}
public static void splitTablet(KeyExtent extent, Text oldPrevEndRow, double splitRatio, ClientContext context, ZooLock zooLock) {
Mutation m = extent.getPrevRowUpdateMutation();
TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value(Double.toString(splitRatio).getBytes(UTF_8)));
TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.put(m, KeyExtent.encodePrevEndRow(oldPrevEndRow));
ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
update(context, zooLock, m, extent);
}
public static void finishSplit(Text metadataEntry, Map<FileRef,DataFileValue> datafileSizes, List<FileRef> highDatafilesToRemove,
final ClientContext context, ZooLock zooLock) {
Mutation m = new Mutation(metadataEntry);
TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.putDelete(m);
TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.putDelete(m);
ChoppedColumnFamily.CHOPPED_COLUMN.putDelete(m);
for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
m.put(DataFileColumnFamily.NAME, entry.getKey().meta(), new Value(entry.getValue().encode()));
}
for (FileRef pathToRemove : highDatafilesToRemove) {
m.putDelete(DataFileColumnFamily.NAME, pathToRemove.meta());
}
update(context, zooLock, m, new KeyExtent(metadataEntry, (Text) null));
}
public static void finishSplit(KeyExtent extent, Map<FileRef,DataFileValue> datafileSizes, List<FileRef> highDatafilesToRemove, ClientContext context,
ZooLock zooLock) {
finishSplit(extent.getMetadataEntry(), datafileSizes, highDatafilesToRemove, context, zooLock);
}
public static void addDeleteEntries(KeyExtent extent, Set<FileRef> datafilesToDelete, ClientContext context) throws IOException {
String tableId = extent.getTableId();
// TODO could use a batch writer, but it would need to handle failure and retry like update does - ACCUMULO-1294
for (FileRef pathToRemove : datafilesToDelete) {
update(context, createDeleteMutation(tableId, pathToRemove.path().toString()), extent);
}
}
public static void addDeleteEntry(AccumuloServerContext context, String tableId, String path) throws IOException {
update(context, createDeleteMutation(tableId, path), new KeyExtent(tableId, null, null));
}
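/**
 * Creates a mutation that writes a delete marker for the given file, first resolving the path
 * against the table's volumes so the garbage collector can later find and remove the file.
 */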
public static Mutation createDeleteMutation(String tableId, String pathToRemove) throws IOException {
Path path = VolumeManagerImpl.get().getFullPath(tableId, pathToRemove);
Mutation delFlag = new Mutation(new Text(MetadataSchema.DeletesSection.getRowPrefix() + path.toString()));
delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
return delFlag;
}
public static void removeScanFiles(KeyExtent extent, Set<FileRef> scanFiles, ClientContext context, ZooLock zooLock) {
Mutation m = new Mutation(extent.getMetadataEntry());
for (FileRef pathToRemove : scanFiles)
m.putDelete(ScanFileColumnFamily.NAME, pathToRemove.meta());
update(context, zooLock, m, extent);
}
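/**
 * Partitions a tablet's data files between the low and high tablets of a split. A file known to
 * lie entirely on one side of midRow is assigned wholly to that side; otherwise its size and
 * entry count are apportioned by splitRatio. For example, a file with 100 entries split at
 * ratio 0.3 contributes floor(100 * 0.3) = 30 entries to the low tablet and
 * ceil(100 * 0.7) = 70 to the high tablet.
 */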
public static void splitDatafiles(String tableId, Text midRow, double splitRatio, Map<FileRef,FileUtil.FileInfo> firstAndLastRows,
SortedMap<FileRef,DataFileValue> datafiles, SortedMap<FileRef,DataFileValue> lowDatafileSizes, SortedMap<FileRef,DataFileValue> highDatafileSizes,
List<FileRef> highDatafilesToRemove) {
for (Entry<FileRef,DataFileValue> entry : datafiles.entrySet()) {
Text firstRow = null;
Text lastRow = null;
boolean rowsKnown = false;
FileUtil.FileInfo mfi = firstAndLastRows.get(entry.getKey());
if (mfi != null) {
firstRow = mfi.getFirstRow();
lastRow = mfi.getLastRow();
rowsKnown = true;
}
if (rowsKnown && firstRow.compareTo(midRow) > 0) {
// only in high
long highSize = entry.getValue().getSize();
long highEntries = entry.getValue().getNumEntries();
highDatafileSizes.put(entry.getKey(), new DataFileValue(highSize, highEntries, entry.getValue().getTime()));
} else if (rowsKnown && lastRow.compareTo(midRow) <= 0) {
// only in low
long lowSize = entry.getValue().getSize();
long lowEntries = entry.getValue().getNumEntries();
lowDatafileSizes.put(entry.getKey(), new DataFileValue(lowSize, lowEntries, entry.getValue().getTime()));
highDatafilesToRemove.add(entry.getKey());
} else {
long lowSize = (long) Math.floor((entry.getValue().getSize() * splitRatio));
long lowEntries = (long) Math.floor((entry.getValue().getNumEntries() * splitRatio));
lowDatafileSizes.put(entry.getKey(), new DataFileValue(lowSize, lowEntries, entry.getValue().getTime()));
long highSize = (long) Math.ceil((entry.getValue().getSize() * (1.0 - splitRatio)));
long highEntries = (long) Math.ceil((entry.getValue().getNumEntries() * (1.0 - splitRatio)));
highDatafileSizes.put(entry.getKey(), new DataFileValue(highSize, highEntries, entry.getValue().getTime()));
}
}
}
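/**
 * Deletes all metadata entries for a table. When insertDeletes is true, file and directory
 * delete markers are written and flushed before the tablet rows are removed, so a failure part
 * way through does not orphan files.
 */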
public static void deleteTable(String tableId, boolean insertDeletes, ClientContext context, ZooLock lock) throws AccumuloException, IOException {
try (Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY);
BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000)
.setMaxLatency(120000L, TimeUnit.MILLISECONDS).setMaxWriteThreads(2))) {
// scan metadata for our table and delete everything we find
Mutation m = null;
ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
// insert deletes before deleting data from metadata... this makes the code fault tolerant
if (insertDeletes) {
ms.fetchColumnFamily(DataFileColumnFamily.NAME);
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms);
for (Entry<Key,Value> cell : ms) {
Key key = cell.getKey();
if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
}
if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString()));
}
}
bw.flush();
ms.clearColumns();
}
for (Entry<Key,Value> cell : ms) {
Key key = cell.getKey();
if (m == null) {
m = new Mutation(key.getRow());
if (lock != null)
putLockID(lock, m);
}
if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) {
bw.addMutation(m);
m = new Mutation(key.getRow());
if (lock != null)
putLockID(lock, m);
}
m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
}
if (m != null)
bw.addMutation(m);
}
}
static String getZookeeperLogLocation() {
return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
}
public static void setRootTabletDir(String dir) throws IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
try {
zoo.putPersistentData(zpath, dir.getBytes(UTF_8), -1, NodeExistsPolicy.OVERWRITE);
} catch (KeeperException e) {
throw new IOException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
public static String getRootTabletDir() throws IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH;
try {
return new String(zoo.getData(zpath, null), UTF_8);
} catch (KeeperException e) {
throw new IOException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
public static Pair<List<LogEntry>,SortedMap<FileRef,DataFileValue>> getFileAndLogEntries(ClientContext context, KeyExtent extent) throws KeeperException,
InterruptedException, IOException {
ArrayList<LogEntry> result = new ArrayList<>();
TreeMap<FileRef,DataFileValue> sizes = new TreeMap<>();
VolumeManager fs = VolumeManagerImpl.get();
if (extent.isRootTablet()) {
getRootLogEntries(result);
Path rootDir = new Path(getRootTabletDir());
FileStatus[] files = fs.listStatus(rootDir);
for (FileStatus fileStatus : files) {
if (fileStatus.getPath().toString().endsWith("_tmp")) {
continue;
}
DataFileValue dfv = new DataFileValue(0, 0);
sizes.put(new FileRef(fileStatus.getPath().toString(), fileStatus.getPath()), dfv);
}
} else {
String systemTableToCheck = extent.isMeta() ? RootTable.ID : MetadataTable.ID;
try (Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY)) {
scanner.fetchColumnFamily(LogColumnFamily.NAME);
scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
scanner.setRange(extent.toMetadataRange());
for (Entry<Key,Value> entry : scanner) {
if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) {
throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry());
}
if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
} else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
DataFileValue dfv = new DataFileValue(entry.getValue().get());
sizes.put(new FileRef(fs, entry.getKey()), dfv);
} else {
throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
}
}
}
}
return new Pair<>(result, sizes);
}
public static List<LogEntry> getLogEntries(ClientContext context, KeyExtent extent) throws IOException, KeeperException, InterruptedException {
log.info("Scanning log entries for " + extent);
ArrayList<LogEntry> result = new ArrayList<>();
if (extent.equals(RootTable.EXTENT)) {
log.info("Getting logs for root tablet from zookeeper");
getRootLogEntries(result);
} else {
log.info("Scanning metadata for logs used for tablet " + extent);
Scanner scanner = getTabletLogScanner(context, extent);
Text pattern = extent.getMetadataEntry();
for (Entry<Key,Value> entry : scanner) {
Text row = entry.getKey().getRow();
if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) {
if (row.equals(pattern)) {
result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue()));
}
}
}
}
log.info("Returning logs " + result + " for extent " + extent);
return result;
}
static void getRootLogEntries(final ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
String root = getZookeeperLogLocation();
// there's a little race between getting the children and fetching
// the data. The log can be removed in between.
while (true) {
result.clear();
for (String child : zoo.getChildren(root)) {
try {
LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null));
// upgrade from !0;!0<< -> +r<<
e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename);
result.add(e);
} catch (KeeperException.NoNodeException ex) {
continue;
}
}
break;
}
}
private static Scanner getTabletLogScanner(ClientContext context, KeyExtent extent) {
String tableId = MetadataTable.ID;
if (extent.isMeta())
tableId = RootTable.ID;
Scanner scanner = new ScannerImpl(context, tableId, Authorizations.EMPTY);
scanner.fetchColumnFamily(LogColumnFamily.NAME);
Text start = extent.getMetadataEntry();
Key endKey = new Key(start, LogColumnFamily.NAME);
endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
scanner.setRange(new Range(new Key(start), endKey));
return scanner;
}
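/**
 * Iterates over every write-ahead log entry in the system: first the root tablet's logs from
 * ZooKeeper, then the metadata tablets' logs from the root table, then the remaining logs from
 * the metadata table.
 */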
private static class LogEntryIterator implements Iterator<LogEntry> {
Iterator<LogEntry> zookeeperEntries = null;
Iterator<LogEntry> rootTableEntries = null;
Iterator<Entry<Key,Value>> metadataEntries = null;
LogEntryIterator(ClientContext context) throws IOException, KeeperException, InterruptedException {
zookeeperEntries = getLogEntries(context, RootTable.EXTENT).iterator();
rootTableEntries = getLogEntries(context, new KeyExtent(MetadataTable.ID, null, null)).iterator();
try {
Scanner scanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
log.info("Setting range to " + MetadataSchema.TabletsSection.getRange());
scanner.setRange(MetadataSchema.TabletsSection.getRange());
scanner.fetchColumnFamily(LogColumnFamily.NAME);
metadataEntries = scanner.iterator();
} catch (Exception ex) {
throw new IOException(ex);
}
}
@Override
public boolean hasNext() {
return zookeeperEntries.hasNext() || rootTableEntries.hasNext() || metadataEntries.hasNext();
}
@Override
public LogEntry next() {
if (zookeeperEntries.hasNext()) {
return zookeeperEntries.next();
}
if (rootTableEntries.hasNext()) {
return rootTableEntries.next();
}
Entry<Key,Value> entry = metadataEntries.next();
return LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
public static Iterator<LogEntry> getLogEntries(ClientContext context) throws IOException, KeeperException, InterruptedException {
return new LogEntryIterator(context);
}
public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) {
if (extent.isRootTablet()) {
retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
@Override
public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
String root = getZookeeperLogLocation();
for (LogEntry entry : entries) {
String path = root + "/" + entry.getUniqueID();
log.debug("Removing " + path + " from zookeeper");
rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
}
}
});
} else {
Mutation m = new Mutation(extent.getMetadataEntry());
for (LogEntry entry : entries) {
m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier());
}
update(context, zooLock, m, extent);
}
}
private static void getFiles(Set<String> files, Map<Key,Value> tablet, String srcTableId) {
for (Entry<Key,Value> entry : tablet.entrySet()) {
if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
String cf = entry.getKey().getColumnQualifier().toString();
if (srcTableId != null && !cf.startsWith("../") && !cf.contains(":")) {
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
}
files.add(cf);
}
}
}
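/**
 * Builds the metadata mutation for one cloned tablet. Data file references that are not already
 * relative ("../") or absolute (a URI with a scheme) are rewritten as relative references into
 * the source table, the source tablet's current location is recorded as the clone's last
 * location, and all other columns are copied verbatim.
 */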
private static Mutation createCloneMutation(String srcTableId, String tableId, Map<Key,Value> tablet) {
KeyExtent ke = new KeyExtent(tablet.keySet().iterator().next().getRow(), (Text) null);
Mutation m = new Mutation(KeyExtent.getMetadataEntry(tableId, ke.getEndRow()));
for (Entry<Key,Value> entry : tablet.entrySet()) {
if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
String cf = entry.getKey().getColumnQualifier().toString();
if (!cf.startsWith("../") && !cf.contains(":"))
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue());
} else if (entry.getKey().getColumnFamily().equals(TabletsSection.CurrentLocationColumnFamily.NAME)) {
m.put(TabletsSection.LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue());
} else if (entry.getKey().getColumnFamily().equals(TabletsSection.LastLocationColumnFamily.NAME)) {
// skip
} else {
m.put(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier(), entry.getValue());
}
}
return m;
}
private static Scanner createCloneScanner(String tableName, String tableId, Connector conn) throws TableNotFoundException {
Scanner mscanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY));
mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
mscanner.fetchColumnFamily(DataFileColumnFamily.NAME);
mscanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
mscanner.fetchColumnFamily(TabletsSection.LastLocationColumnFamily.NAME);
mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(mscanner);
TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(mscanner);
return mscanner;
}
@VisibleForTesting
public static void initializeClone(String tableName, String srcTableId, String tableId, Connector conn, BatchWriter bw) throws TableNotFoundException,
MutationsRejectedException {
TabletIterator ti;
if (srcTableId.equals(MetadataTable.ID))
ti = new TabletIterator(createCloneScanner(tableName, srcTableId, conn), new Range(), true, true);
else
ti = new TabletIterator(createCloneScanner(tableName, srcTableId, conn), new KeyExtent(srcTableId, null, null).toMetadataRange(), true, true);
if (!ti.hasNext())
throw new RuntimeException("Table deleted during clone? srcTableId = " + srcTableId);
while (ti.hasNext())
bw.addMutation(createCloneMutation(srcTableId, tableId, ti.next()));
bw.flush();
}
private static int compareEndRows(Text endRow1, Text endRow2) {
return new KeyExtent("0", endRow1, null).compareTo(new KeyExtent("0", endRow2, null));
}
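/**
 * Verifies that each cloned tablet still references files that exist in the source table. A
 * tablet whose source files changed during the copy (e.g. were compacted and garbage collected)
 * is deleted and re-cloned from the current source tablets; a consistent tablet is marked with
 * a cloned entry. Returns the number of tablets rewritten so callers can loop until it reaches
 * zero.
 */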
@VisibleForTesting
public static int checkClone(String tableName, String srcTableId, String tableId, Connector conn, BatchWriter bw) throws TableNotFoundException,
MutationsRejectedException {
TabletIterator srcIter = new TabletIterator(createCloneScanner(tableName, srcTableId, conn), new KeyExtent(srcTableId, null, null).toMetadataRange(), true,
true);
TabletIterator cloneIter = new TabletIterator(createCloneScanner(tableName, tableId, conn), new KeyExtent(tableId, null, null).toMetadataRange(), true,
true);
if (!cloneIter.hasNext() || !srcIter.hasNext())
throw new RuntimeException("Table deleted during clone? srcTableId = " + srcTableId + " tableId = " + tableId);
int rewrites = 0;
while (cloneIter.hasNext()) {
Map<Key,Value> cloneTablet = cloneIter.next();
Text cloneEndRow = new KeyExtent(cloneTablet.keySet().iterator().next().getRow(), (Text) null).getEndRow();
HashSet<String> cloneFiles = new HashSet<>();
boolean cloneSuccessful = false;
for (Entry<Key,Value> entry : cloneTablet.entrySet()) {
if (entry.getKey().getColumnFamily().equals(ClonedColumnFamily.NAME)) {
cloneSuccessful = true;
break;
}
}
if (!cloneSuccessful)
getFiles(cloneFiles, cloneTablet, null);
List<Map<Key,Value>> srcTablets = new ArrayList<>();
Map<Key,Value> srcTablet = srcIter.next();
srcTablets.add(srcTablet);
Text srcEndRow = new KeyExtent(srcTablet.keySet().iterator().next().getRow(), (Text) null).getEndRow();
int cmp = compareEndRows(cloneEndRow, srcEndRow);
if (cmp < 0)
throw new TabletIterator.TabletDeletedException("Tablets deleted from src during clone : " + cloneEndRow + " " + srcEndRow);
HashSet<String> srcFiles = new HashSet<>();
if (!cloneSuccessful)
getFiles(srcFiles, srcTablet, srcTableId);
while (cmp > 0) {
srcTablet = srcIter.next();
srcTablets.add(srcTablet);
srcEndRow = new KeyExtent(srcTablet.keySet().iterator().next().getRow(), (Text) null).getEndRow();
cmp = compareEndRows(cloneEndRow, srcEndRow);
if (cmp < 0)
throw new TabletIterator.TabletDeletedException("Tablets deleted from src during clone : " + cloneEndRow + " " + srcEndRow);
if (!cloneSuccessful)
getFiles(srcFiles, srcTablet, srcTableId);
}
if (cloneSuccessful)
continue;
if (!srcFiles.containsAll(cloneFiles)) {
// delete existing cloned tablet entry
Mutation m = new Mutation(cloneTablet.keySet().iterator().next().getRow());
for (Entry<Key,Value> entry : cloneTablet.entrySet()) {
Key k = entry.getKey();
m.putDelete(k.getColumnFamily(), k.getColumnQualifier(), k.getTimestamp());
}
bw.addMutation(m);
for (Map<Key,Value> st : srcTablets)
bw.addMutation(createCloneMutation(srcTableId, tableId, st));
rewrites++;
} else {
// write out marker that this tablet was successfully cloned
Mutation m = new Mutation(cloneTablet.keySet().iterator().next().getRow());
m.put(ClonedColumnFamily.NAME, new Text(""), new Value("OK".getBytes(UTF_8)));
bw.addMutation(m);
}
}
bw.flush();
return rewrites;
}
public static void cloneTable(ClientContext context, String srcTableId, String tableId, VolumeManager volumeManager) throws Exception {
Connector conn = context.getConnector();
try (BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
while (true) {
try {
initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
// the following loop looks for changes to the files that occurred during the copy... if files were dereferenced then they could have been GCed
while (true) {
int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw);
if (rewrites == 0)
break;
}
bw.flush();
break;
} catch (TabletIterator.TabletDeletedException tde) {
// tablets were merged in the src table
bw.flush();
// delete what we have cloned and try again
deleteTable(tableId, false, context, null);
log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again");
sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
}
// delete the clone markers and create directory entries
Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
mscanner.fetchColumnFamily(ClonedColumnFamily.NAME);
int dirCount = 0;
for (Entry<Key,Value> entry : mscanner) {
Key k = entry.getKey();
Mutation m = new Mutation(k.getRow());
m.putDelete(k.getColumnFamily(), k.getColumnQualifier());
String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId
+ Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES));
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8)));
bw.addMutation(m);
}
}
}
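/**
 * Marks the tablet as chopped, recording that its files contain no data falling outside the
 * tablet's range; merge operations use this marker.
 */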
public static void chopped(AccumuloServerContext context, KeyExtent extent, ZooLock zooLock) {
Mutation m = new Mutation(extent.getMetadataEntry());
ChoppedColumnFamily.CHOPPED_COLUMN.put(m, new Value("chopped".getBytes(UTF_8)));
update(context, zooLock, m, extent);
}
public static void removeBulkLoadEntries(Connector conn, String tableId, long tid) throws Exception {
try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) {
mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
for (Entry<Key,Value> entry : mscanner) {
log.debug("Looking at entry " + entry + " with tid " + tid);
if (Long.parseLong(entry.getValue().toString()) == tid) {
log.debug("deleting entry " + entry);
Mutation m = new Mutation(entry.getKey().getRow());
m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
bw.addMutation(m);
}
}
}
}
public static List<FileRef> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException {
List<FileRef> result = new ArrayList<>();
try (Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY))) {
VolumeManager fs = VolumeManagerImpl.get();
mscanner.setRange(extent.toMetadataRange());
mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
for (Entry<Key,Value> entry : mscanner) {
if (Long.parseLong(entry.getValue().toString()) == tid) {
result.add(new FileRef(fs, entry.getKey()));
}
}
return result;
} catch (TableNotFoundException ex) {
// unlikely
throw new RuntimeException("The metadata table has vanished!", ex);
}
}
public static Map<Long,? extends Collection<FileRef>> getBulkFilesLoaded(ClientContext context, KeyExtent extent) throws IOException {
Text metadataRow = extent.getMetadataEntry();
Map<Long,List<FileRef>> result = new HashMap<>();
VolumeManager fs = VolumeManagerImpl.get();
try (Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY)) {
scanner.setRange(new Range(metadataRow));
scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
for (Entry<Key,Value> entry : scanner) {
Long tid = Long.parseLong(entry.getValue().toString());
List<FileRef> lst = result.get(tid);
if (lst == null) {
result.put(tid, lst = new ArrayList<>());
}
lst.add(new FileRef(fs, entry.getKey()));
}
}
return result;
}
public static void addBulkLoadInProgressFlag(AccumuloServerContext context, String path) {
Mutation m = new Mutation(MetadataSchema.BlipSection.getRowPrefix() + path);
m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
// new KeyExtent is only added to force update to write to the metadata table, not the root table
// because bulk loads aren't supported to the metadata table
update(context, m, new KeyExtent("anythingNotMetadata", null, null));
}
public static void removeBulkLoadInProgressFlag(AccumuloServerContext context, String path) {
Mutation m = new Mutation(MetadataSchema.BlipSection.getRowPrefix() + path);
m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
// new KeyExtent is only added to force update to write to the metadata table, not the root table
// because bulk loads aren't supported to the metadata table
update(context, m, new KeyExtent("anythingNotMetadata", null, null));
}
/**
* During an upgrade from 1.6 to 1.7, we need to add the replication table
*/
public static void createReplicationTable(ClientContext context) throws IOException {
String dir = VolumeManagerImpl.get().choose(Optional.of(ReplicationTable.ID), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ ReplicationTable.ID + Constants.DEFAULT_TABLET_LOCATION;
Mutation m = new Mutation(new Text(KeyExtent.getMetadataEntry(ReplicationTable.ID, null)));
m.put(DIRECTORY_COLUMN.getColumnFamily(), DIRECTORY_COLUMN.getColumnQualifier(), 0, new Value(dir.getBytes(UTF_8)));
m.put(TIME_COLUMN.getColumnFamily(), TIME_COLUMN.getColumnQualifier(), 0, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8)));
m.put(PREV_ROW_COLUMN.getColumnFamily(), PREV_ROW_COLUMN.getColumnQualifier(), 0, KeyExtent.encodePrevEndRow(null));
update(getMetadataTable(context), null, m);
}
/**
* During an upgrade we need to move deletion requests for files under the !METADATA table to the root tablet.
*/
public static void moveMetaDeleteMarkers(ClientContext context) {
String oldDeletesPrefix = "!!~del";
Range oldDeletesRange = new Range(oldDeletesPrefix, true, "!!~dem", false);
// move old delete markers to new location, to standardize table schema between all metadata tables
try (Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY)) {
scanner.setRange(oldDeletesRange);
for (Entry<Key,Value> entry : scanner) {
String row = entry.getKey().getRow().toString();
if (row.startsWith(oldDeletesPrefix)) {
moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix);
} else {
break;
}
}
}
}
public static void moveMetaDeleteMarkersFrom14(ClientContext context) {
// new KeyExtent is only added to force update to write to the metadata table, not the root table
KeyExtent notMetadata = new KeyExtent("anythingNotMetadata", null, null);
// move delete markers from the normal delete keyspace to the root tablet delete keyspace if the files are for the !METADATA table
try (Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) {
scanner.setRange(MetadataSchema.DeletesSection.getRange());
for (Entry<Key,Value> entry : scanner) {
String row = entry.getKey().getRow().toString();
if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) {
moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix());
} else {
break;
}
}
}
}
private static void moveDeleteEntry(ClientContext context, KeyExtent oldExtent, Entry<Key,Value> entry, String rowID, String prefix) {
String filename = rowID.substring(prefix.length());
// add the new entry first
log.info("Moving " + filename + " marker in " + RootTable.NAME);
Mutation m = new Mutation(MetadataSchema.DeletesSection.getRowPrefix() + filename);
m.put(EMPTY_BYTES, EMPTY_BYTES, EMPTY_BYTES);
update(context, m, RootTable.EXTENT);
// then remove the old entry
m = new Mutation(entry.getKey().getRow());
m.putDelete(EMPTY_BYTES, EMPTY_BYTES);
update(context, m, oldExtent);
}
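/**
 * Groups a flat sorted map of metadata key/values by tablet row, optionally filtering to the
 * given columns, and returns one column-to-value map per tablet.
 */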
public static SortedMap<Text,SortedMap<ColumnFQ,Value>> getTabletEntries(SortedMap<Key,Value> tabletKeyValues, List<ColumnFQ> columns) {
TreeMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries = new TreeMap<>();
HashSet<ColumnFQ> colSet = null;
if (columns != null) {
colSet = new HashSet<>(columns);
}
for (Entry<Key,Value> entry : tabletKeyValues.entrySet()) {
if (columns != null && !colSet.contains(new ColumnFQ(entry.getKey()))) {
continue;
}
Text row = entry.getKey().getRow();
SortedMap<ColumnFQ,Value> colVals = tabletEntries.get(row);
if (colVals == null) {
colVals = new TreeMap<>();
tabletEntries.put(row, colVals);
}
colVals.put(new ColumnFQ(entry.getKey()), entry.getValue());
}
return tabletEntries;
}
}