/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.accumulo.core.client.impl;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.util.AbstractMap;
import java.util.Base64;
import java.util.Base64.Decoder;
import java.util.Base64.Encoder;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.impl.Bulk.Files;
import org.apache.accumulo.core.client.impl.Bulk.Mapping;
import org.apache.accumulo.core.client.impl.Table.ID;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.hadoop.fs.Path;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonPrimitive;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;

/**
* Place for all bulk import serialization code. For the objects being serialized see {@link Bulk}
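*
* <p>
* A rough usage sketch (caller-side code, not part of this class): {@code fs} is assumed to be a
* Hadoop {@code FileSystem}, {@code mapping} a precomputed
* {@code SortedMap<KeyExtent,Bulk.Files>}, and {@code sourceDir}/{@code bulkDir}/{@code tableId}
* come from the bulk import operation.
*
* <pre>
* {@code
* // write the mapping next to the bulk files, letting the FileSystem create the output stream
* BulkSerialize.writeLoadMapping(mapping, sourceDir, p -> fs.create(p));
*
* // later, stream the mapping back, applying any renames recorded by BulkImportMove
* try (LoadMappingIterator lmi =
*     BulkSerialize.getUpdatedLoadMapping(bulkDir, tableId, p -> fs.open(p))) {
*   while (lmi.hasNext()) {
*     Map.Entry<KeyExtent,Bulk.Files> entry = lmi.next();
*     // use entry.getKey() and entry.getValue()
*   }
* }
* }
* </pre>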
*/
public class BulkSerialize {
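
/**
* Serializes and deserializes {@code byte[]} fields of the bulk objects as URL-safe Base64
* strings rather than JSON arrays of numbers.
*/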
private static class ByteArrayToBase64TypeAdapter
implements JsonSerializer<byte[]>, JsonDeserializer<byte[]> {
Decoder decoder = Base64.getUrlDecoder();
Encoder encoder = Base64.getUrlEncoder();
@Override
public byte[] deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
return decoder.decode(json.getAsString());
}
@Override
public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContext context) {
return new JsonPrimitive(encoder.encodeToString(src));
}
}
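
// Gson configured so every byte[] field in the bulk objects is handled by the Base64 adapter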
private static Gson createGson() {
return new GsonBuilder()
.registerTypeHierarchyAdapter(byte[].class, new ByteArrayToBase64TypeAdapter()).create();
}
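
/**
* Supplies the output stream to write to, typically backed by something like
* {@code FileSystem.create(Path)}.
*/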
public interface Output {
OutputStream create(Path path) throws IOException;
}
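
/**
* Supplies the input stream to read from, typically backed by something like
* {@code FileSystem.open(Path)}.
*/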
public interface Input {
InputStream open(Path path) throws IOException;
}
/**
* Serialize bulk load mapping to {@value Constants#BULK_LOAD_MAPPING}
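* in the source directory, as a JSON array with one {@link Bulk.Mapping} object per tablet.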
*/
public static void writeLoadMapping(SortedMap<KeyExtent,Bulk.Files> loadMapping, String sourceDir,
Output output) throws IOException {
final Path lmFile = new Path(sourceDir, Constants.BULK_LOAD_MAPPING);
try (OutputStream fsOut = output.create(lmFile);
JsonWriter writer = new JsonWriter(
new BufferedWriter(new OutputStreamWriter(fsOut, UTF_8)))) {
Gson gson = createGson();
writer.setIndent(" ");
writer.beginArray();
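// one JSON object per tablet, written in sorted KeyExtent order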
Set<Entry<KeyExtent,Files>> es = loadMapping.entrySet();
for (Entry<KeyExtent,Files> entry : es) {
Mapping mapping = new Bulk.Mapping(entry.getKey(), entry.getValue());
gson.toJson(mapping, Mapping.class, writer);
}
writer.endArray();
}
}
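
/**
* Iterates over the JSON array written by {@code writeLoadMapping}, deserializing one
* {@link Bulk.Mapping} at a time from the underlying {@link JsonReader}. The caller must close
* the iterator to release the reader.
*/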
public static class LoadMappingIterator
implements Iterator<Entry<KeyExtent,Bulk.Files>>, AutoCloseable {
private ID tableId;
private JsonReader reader;
private Gson gson = createGson();
private Map<String,String> renameMap;
private LoadMappingIterator(Table.ID tableId, JsonReader reader) {
this.tableId = tableId;
this.reader = reader;
}
private void setRenameMap(Map<String,String> renameMap) {
this.renameMap = renameMap;
}
@Override
public void close() throws IOException {
reader.close();
}
@Override
public boolean hasNext() {
try {
return reader.hasNext();
} catch (IOException e) {
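// Iterator.hasNext cannot declare a checked exception, so rethrow as unchecked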
throw new UncheckedIOException(e);
}
}
@Override
public Entry<KeyExtent,Files> next() {
Bulk.Mapping bm = gson.fromJson(reader, Bulk.Mapping.class);
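// if a rename map was set, translate file names to the names assigned by BulkImportMove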
if (renameMap != null) {
return new AbstractMap.SimpleEntry<>(bm.getKeyExtent(tableId),
bm.getFiles().mapNames(renameMap));
} else {
return new AbstractMap.SimpleEntry<>(bm.getKeyExtent(tableId), bm.getFiles());
}
}
}

/**
* Read the JSON array of {@link Bulk.Mapping} objects from {@value Constants#BULK_LOAD_MAPPING}
* into a {@link LoadMappingIterator}. The caller is responsible for closing the returned iterator.
*/
public static LoadMappingIterator readLoadMapping(String bulkDir, Table.ID tableId, Input input)
throws IOException {
final Path lmFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING);
JsonReader reader = new JsonReader(
new BufferedReader(new InputStreamReader(input.open(lmFile), UTF_8)));
reader.beginArray();
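// leave the array open; the returned iterator streams the entries and closes the reader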
return new LoadMappingIterator(tableId, reader);
}
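
/**
* Write the map of original file names to new file names to {@value Constants#BULK_RENAME_FILE}
* in the bulk directory.
*/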
public static void writeRenameMap(Map<String,String> oldToNewNameMap, String bulkDir,
Output output) throws IOException {
final Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
try (OutputStream fsOut = output.create(renamingFile);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fsOut, UTF_8))) {
Gson gson = new GsonBuilder().create();
gson.toJson(oldToNewNameMap, writer);
}
}
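
/**
* Read back the map written by {@code writeRenameMap} from {@value Constants#BULK_RENAME_FILE}.
*/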
public static Map<String,String> readRenameMap(String bulkDir, Input input) throws IOException {
final Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
Map<String,String> oldToNewNameMap;
Gson gson = createGson();
try (InputStream fis = input.open(renamingFile);
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, UTF_8))) {
oldToNewNameMap = gson.fromJson(reader, new TypeToken<Map<String,String>>() {}.getType());
}
return oldToNewNameMap;
}

/**
* Read both the load mapping and the rename map, and update the file names in the load mapping to
* the new names. This is needed because the load mapping file was written with the original file
* names, before the files were moved by BulkImportMove.
*/
public static LoadMappingIterator getUpdatedLoadMapping(String bulkDir, Table.ID tableId,
Input input) throws IOException {
Map<String,String> renames = readRenameMap(bulkDir, input);
LoadMappingIterator lmi = readLoadMapping(bulkDir, tableId, input);
lmi.setRenameMap(renames);
return lmi;
}
}