/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.master.tableOps;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
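
/**
 * Fate operation that writes the metadata entries for an imported table. It reads the serialized
 * metadata from the export zip, rewrites each entry for the new table id, renames data files using
 * mappings.txt, assigns every tablet a fresh directory, and then hands off to
 * {@link MoveExportedFiles}.
 */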
class PopulateMetadataTable extends MasterRepo {
private static final Logger log = LoggerFactory.getLogger(PopulateMetadataTable.class);
private static final long serialVersionUID = 1L;
private ImportedTableInfo tableInfo;
PopulateMetadataTable(ImportedTableInfo ti) {
this.tableInfo = ti;
}
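
  /**
   * Reads the mappings.txt file from the import directory; each line has the form
   * {@code oldFileName:newFileName}, and the parsed pairs are returned as a map.
   */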
  static Map<String,String> readMappingFile(VolumeManager fs, ImportedTableInfo tableInfo) throws Exception {
    try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt")), UTF_8))) {
      Map<String,String> map = new HashMap<>();
      String line;
      while ((line = in.readLine()) != null) {
        String[] sa = line.split(":", 2);
        map.put(sa[0], sa[1]);
      }
      return map;
    }
  }
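
  /**
   * Streams Key/Value pairs out of the exported metadata file, remapping data file names via
   * {@link #readMappingFile} and assigning each tablet a new directory, then writes one mutation
   * per tablet row to the metadata table.
   */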
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
BatchWriter mbw = null;
ZipInputStream zis = null;
try {
VolumeManager fs = master.getFileSystem();
mbw = master.getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
zis = new ZipInputStream(fs.open(path));
Map<String,String> fileNameMappings = readMappingFile(fs, tableInfo);
log.info("importDir is " + tableInfo.importDir);
// This is a directory already prefixed with proper volume information e.g. hdfs://localhost:8020/path/to/accumulo/tables/...
final String bulkDir = tableInfo.importDir;
final String[] tableDirs = ServerConstants.getTablesDirs();
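      // Candidate table directories, one per configured volume; getClonedTabletDir() chooses among them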
ZipEntry zipEntry;
while ((zipEntry = zis.getNextEntry()) != null) {
if (zipEntry.getName().equals(Constants.EXPORT_METADATA_FILE)) {
DataInputStream in = new DataInputStream(new BufferedInputStream(zis));
Key key = new Key();
Value val = new Value();
Mutation m = null;
Text currentRow = null;
int dirCount = 0;
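          // The exported metadata file is a stream of serialized Key/Value pairs, ending with the
          // prevRow entry of the last tablet (endRow == null)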
while (true) {
key.readFields(in);
val.readFields(in);
Text endRow = new KeyExtent(key.getRow(), (Text) null).getEndRow();
Text metadataRow = new KeyExtent(tableInfo.tableId, endRow, null).getMetadataEntry();
Text cq;
if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
String oldName = new Path(key.getColumnQualifier().toString()).getName();
String newName = fileNameMappings.get(oldName);
if (newName == null) {
throw new AcceptableThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT,
TableOperationExceptionType.OTHER, "File " + oldName + " does not exist in import dir");
}
cq = new Text(bulkDir + "/" + newName);
} else {
cq = key.getColumnQualifier();
}
            if (m == null || !currentRow.equals(metadataRow)) {
              // Flush the previous tablet's mutation before starting a new one
              if (m != null) {
                mbw.addMutation(m);
              }

              // Make a unique directory inside the table's dir. Cannot import multiple tables into one table, so don't need to use unique allocator
              String tabletDir = new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES), UTF_8);

              // Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX
              String absolutePath = getClonedTabletDir(master, tableDirs, tabletDir);

              m = new Mutation(metadataRow);
              TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(absolutePath.getBytes(UTF_8)));
              currentRow = metadataRow;
            }
m.put(key.getColumnFamily(), cq, val);
if (endRow == null && TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
mbw.addMutation(m);
              break; // it's the last column in the last row
}
}
break;
}
}
return new MoveExportedFiles(tableInfo);
} catch (IOException ioe) {
log.warn("{}", ioe.getMessage(), ioe);
throw new AcceptableThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
"Error reading " + path + " " + ioe.getMessage());
} finally {
if (zis != null) {
try {
zis.close();
} catch (IOException ioe) {
log.warn("Failed to close zip file ", ioe);
}
}
if (mbw != null) {
mbw.close();
}
}
}
  /**
   * Given the candidate table directories (across multiple volumes), choose a volume and construct an absolute path using the unique tablet directory name within it.
   *
   * @return An absolute, unique path for the imported tablet's directory
   */
protected String getClonedTabletDir(Master master, String[] tableDirs, String tabletDir) {
// We can try to spread out the tablet dirs across all volumes
String tableDir = master.getFileSystem().choose(Optional.of(tableInfo.tableId), tableDirs);
// Build up a full hdfs://localhost:8020/accumulo/tables/$id/c-XXXXXXX
return tableDir + "/" + tableInfo.tableId + "/" + tabletDir;
}
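
  /**
   * Rolls back this step by deleting any metadata entries already written for the imported table.
   */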
@Override
public void undo(long tid, Master environment) throws Exception {
MetadataTableUtil.deleteTable(tableInfo.tableId, false, environment, environment.getMasterLock());
}
}