/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
 */

package org.apache.accumulo.master.tableOps.tableExport;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TabletFileUtil;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.master.tableOps.Utils;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
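
/**
 * A step of the export table FATE operation that writes the export artifacts into the export
 * directory: a zip file ({@link Constants#EXPORT_FILE}) containing the export info, the table
 * configuration, and the table's metadata entries, plus a distcp.txt file listing every file that
 * must be copied to complete the export.
 */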
class WriteExportFiles extends MasterRepo {

  private static final long serialVersionUID = 1L;

  private final ExportInfo tableInfo;

  WriteExportFiles(ExportInfo tableInfo) {
this.tableInfo = tableInfo;
}
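
  /**
   * Verifies the table is offline. The table state is cached, so on a mismatch the cache is
   * cleared and the state is checked once more before failing the operation.
   */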
private void checkOffline(ClientContext context) throws Exception {
if (Tables.getTableState(context, tableInfo.tableID) != TableState.OFFLINE) {
Tables.clearCache(context);
if (Tables.getTableState(context, tableInfo.tableID) != TableState.OFFLINE) {
throw new AcceptableThriftTableOperationException(tableInfo.tableID.canonical(),
tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
"Table is not offline");
}
}
}
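
  /**
   * Reserves the namespace and table, then confirms the table's tablets are fully offline: if any
   * tablet still has a current or future location the check is retried, and if any tablet still
   * has write-ahead log entries the export fails, since data in those logs would not be captured
   * by copying the table's files.
   */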
@Override
public long isReady(long tid, Master master) throws Exception {
long reserved = Utils.reserveNamespace(master, tableInfo.namespaceID, tid, false, true,
TableOperation.EXPORT)
+ Utils.reserveTable(master, tableInfo.tableID, tid, false, true, TableOperation.EXPORT);
if (reserved > 0)
return reserved;
AccumuloClient client = master.getContext();
checkOffline(master.getContext());
Scanner metaScanner = client.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
metaScanner.setRange(new KeyExtent(tableInfo.tableID, null, null).toMetadataRange());
// scan for locations
metaScanner.fetchColumnFamily(TabletsSection.CurrentLocationColumnFamily.NAME);
metaScanner.fetchColumnFamily(TabletsSection.FutureLocationColumnFamily.NAME);
if (metaScanner.iterator().hasNext()) {
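      // tablets are still assigned (or being assigned); have FATE retry in 500 milliseconds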
return 500;
}
    // Use the same range to check for write-ahead logs that we used to check for hosted (or
    // future hosted) tablets. This is done as a separate scan, after the location check, because
    // write-ahead logs are okay only when there is no location.
metaScanner.clearColumns();
metaScanner.fetchColumnFamily(LogColumnFamily.NAME);
if (metaScanner.iterator().hasNext()) {
throw new AcceptableThriftTableOperationException(tableInfo.tableID.canonical(),
tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
"Write ahead logs found for table");
}
return 0;
}
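
  /**
   * Writes the export files, then releases the namespace, table, and export directory
   * reservations. Returns null to indicate this is the final step of the FATE operation.
   */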
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
try {
exportTable(master.getFileSystem(), master.getContext(), tableInfo.tableName,
tableInfo.tableID, tableInfo.exportDir);
} catch (IOException ioe) {
throw new AcceptableThriftTableOperationException(tableInfo.tableID.canonical(),
tableInfo.tableName, TableOperation.EXPORT, TableOperationExceptionType.OTHER,
"Failed to create export files " + ioe.getMessage());
}
Utils.unreserveNamespace(master, tableInfo.namespaceID, tid, false);
Utils.unreserveTable(master, tableInfo.tableID, tid, false);
Utils.unreserveHdfsDirectory(master, new Path(tableInfo.exportDir).toString(), tid);
return null;
}
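
  /**
   * Releases the reservations taken in {@link #isReady(long, Master)} if the operation fails.
   */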
@Override
public void undo(long tid, Master env) {
Utils.unreserveNamespace(env, tableInfo.namespaceID, tid, false);
Utils.unreserveTable(env, tableInfo.tableID, tid, false);
}
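
  /**
   * Writes the export metadata zip and the distcp file list for the given table into
   * {@code exportDir}. The zip contains three entries: the export info file, the table
   * configuration file, and the table's metadata entries.
   *
   * <p>
   * A minimal usage sketch, mirroring the call made from {@link #call(long, Master)} (the table
   * must already be offline and reserved):
   *
   * <pre>
   * exportTable(master.getFileSystem(), master.getContext(), tableInfo.tableName,
   *     tableInfo.tableID, tableInfo.exportDir);
   * </pre>
   */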
public static void exportTable(VolumeManager fs, ServerContext context, String tableName,
TableId tableID, String exportDir) throws Exception {
fs.mkdirs(new Path(exportDir));
Path exportMetaFilePath = fs.getVolumeByPath(new Path(exportDir)).getFileSystem()
.makeQualified(new Path(exportDir, Constants.EXPORT_FILE));
FSDataOutputStream fileOut = fs.create(exportMetaFilePath, false);
ZipOutputStream zipOut = new ZipOutputStream(fileOut);
BufferedOutputStream bufOut = new BufferedOutputStream(zipOut);
DataOutputStream dataOut = new DataOutputStream(bufOut);
try (OutputStreamWriter osw = new OutputStreamWriter(dataOut, UTF_8)) {
zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_INFO_FILE));
osw.append(ExportTable.EXPORT_VERSION_PROP + ":" + ExportTable.VERSION + "\n");
osw.append("srcInstanceName:" + context.getInstanceName() + "\n");
osw.append("srcInstanceID:" + context.getInstanceID() + "\n");
osw.append("srcZookeepers:" + context.getZooKeepers() + "\n");
osw.append("srcTableName:" + tableName + "\n");
osw.append("srcTableID:" + tableID.canonical() + "\n");
osw.append(ExportTable.DATA_VERSION_PROP + ":" + ServerConstants.DATA_VERSION + "\n");
osw.append("srcCodeVersion:" + Constants.VERSION + "\n");
osw.flush();
dataOut.flush();
exportConfig(context, tableID, zipOut, dataOut);
dataOut.flush();
Map<String,String> uniqueFiles = exportMetadata(fs, context, tableID, zipOut, dataOut);
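      // close the zip before writing the distcp plan, so the completed export file can itself be
      // listed as a file to copy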
dataOut.close();
dataOut = null;
createDistcpFile(fs, exportDir, exportMetaFilePath, uniqueFiles);
} finally {
if (dataOut != null)
dataOut.close();
}
}
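
  /**
   * Writes distcp.txt into the export directory, listing one path per line: each unique table
   * file plus the export metadata file itself. The file is suitable as an input list for
   * {@code hadoop distcp -f}.
   */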
  private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath,
      Map<String,String> uniqueFiles) throws IOException {
    try (BufferedWriter distcpOut = new BufferedWriter(
        new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false), UTF_8))) {
      for (String file : uniqueFiles.values()) {
        distcpOut.append(file);
        distcpOut.newLine();
      }
      distcpOut.append(exportMetaFilePath.toString());
      distcpOut.newLine();
    }
}
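
  /**
   * Copies the table's metadata entries (file, previous row, and time columns) into the zip and
   * returns a map from file name to full path, used to build the distcp list. Fails if two
   * different paths share the same file name, which should only be possible for files created by
   * Accumulo 1.3 and earlier.
   */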
private static Map<String,String> exportMetadata(VolumeManager fs, ServerContext context,
TableId tableID, ZipOutputStream zipOut, DataOutputStream dataOut)
throws IOException, TableNotFoundException {
zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
Map<String,String> uniqueFiles = new HashMap<>();
Scanner metaScanner = context.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(metaScanner);
TabletsSection.ServerColumnFamily.TIME_COLUMN.fetch(metaScanner);
metaScanner.setRange(new KeyExtent(tableID, null, null).toMetadataRange());
for (Entry<Key,Value> entry : metaScanner) {
entry.getKey().write(dataOut);
entry.getValue().write(dataOut);
if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) {
String path = TabletFileUtil.validate(entry.getKey().getColumnQualifierData().toString());
String[] tokens = path.split("/");
if (tokens.length < 1) {
throw new RuntimeException("Illegal path " + path);
}
String filename = tokens[tokens.length - 1];
String existingPath = uniqueFiles.get(filename);
if (existingPath == null) {
uniqueFiles.put(filename, path);
} else if (!existingPath.equals(path)) {
          // ensure file names are unique; duplicates should only occur for tables with file names
          // generated by Accumulo 1.3 and earlier
throw new IOException("Cannot export table with nonunique file names " + filename
+ ". Major compact table.");
}
}
}
return uniqueFiles;
}
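
  /**
   * Writes the table configuration entry into the zip, including only table properties whose
   * values differ from the default configuration and from the site and system configuration.
   */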
private static void exportConfig(ServerContext context, TableId tableID, ZipOutputStream zipOut,
DataOutputStream dataOut) throws AccumuloException, AccumuloSecurityException, IOException {
DefaultConfiguration defaultConfig = DefaultConfiguration.getInstance();
Map<String,String> siteConfig = context.instanceOperations().getSiteConfiguration();
Map<String,String> systemConfig = context.instanceOperations().getSystemConfiguration();
TableConfiguration tableConfig = context.getServerConfFactory().getTableConfiguration(tableID);
OutputStreamWriter osw = new OutputStreamWriter(dataOut, UTF_8);
    // only output properties that differ from the defaults and from higher-level configurations
zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_TABLE_CONFIG_FILE));
for (Entry<String,String> prop : tableConfig) {
if (prop.getKey().startsWith(Property.TABLE_PREFIX.getKey())) {
Property key = Property.getPropertyByKey(prop.getKey());
if (key == null || !defaultConfig.get(key).equals(prop.getValue())) {
if (!prop.getValue().equals(siteConfig.get(prop.getKey()))
&& !prop.getValue().equals(systemConfig.get(prop.getKey()))) {
osw.append(prop.getKey() + "=" + prop.getValue() + "\n");
}
}
}
}
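    // flush, but do not close; closing the writer would close the underlying zip stream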
osw.flush();
}
}