blob: ce24ff2351598e3e1ac16e3790c9e989c33cfc6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.snapshot;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.snapshot.SnapshotIterator;
import org.apache.geode.internal.ExitCode;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.snapshot.SnapshotPacket.SnapshotRecord;
import org.apache.geode.pdx.internal.EnumInfo;
import org.apache.geode.pdx.internal.PdxType;
import org.apache.geode.pdx.internal.TypeRegistry;
/**
* Provides support for reading and writing snapshot files.
*/
public class GFSnapshot {
/**
* Writes cache entries to the snapshot.
* <p>
* The writer handed out by {@code create(File, String, InternalCache)} forwards every call to a
* {@code GFSnapshotExporter}: entries are serialized in the order they are supplied, and
* {@link #snapshotComplete()} closes the exporter.
*/
public interface SnapshotWriter {
/**
* Appends a cache entry to the snapshot.
*
* @param entry the cache entry
* @throws IOException if the entry cannot be written
*/
void snapshotEntry(SnapshotRecord entry) throws IOException;
/**
* Invoked to indicate that all entries have been written to the snapshot.
*
* @throws IOException if the snapshot cannot be completed
*/
void snapshotComplete() throws IOException;
}
/**
* the snapshot format version 1; {@code GFSnapshotImporter} rejects files carrying this version
* as unsupported
*/
public static final int SNAP_VER_1 = 1;
/**
* the snapshot format version 2; the version written by {@code GFSnapshotExporter} and the only
* one {@code GFSnapshotImporter} reads
*/
public static final int SNAP_VER_2 = 2;
/**
* the snapshot file format: three marker bytes that follow the version byte in the file header
* (the ASCII characters "GFS")
*/
@Immutable
private static final byte[] SNAP_FMT = {0x47, 0x46, 0x53};
/** Not instantiable: this class only exposes static members and nested types. */
private GFSnapshot() {}
/**
 * Command-line entry point that dumps a snapshot file to standard output: the format version,
 * the region name, the PDX types and enums stored in the snapshot, and finally every key/value
 * pair in file order.
 *
 * @param args exactly one element, the path of the snapshot file to inspect
 * @throws Exception if the snapshot cannot be read or an entry cannot be deserialized
 */
public static void main(String[] args) throws Exception {
  if (args.length != 1) {
    System.out.println("Usage: GFSnapshot <file>");
    ExitCode.FATAL.doSystemExit();
  }

  final File snapshotFile = new File(args[0]);
  // no type registry is supplied, so the snapshot's PDX metadata is only listed, not imported
  final GFSnapshotImporter importer = new GFSnapshotImporter(snapshotFile, null);
  try {
    System.out.println("Snapshot format is version " + importer.getVersion());
    System.out.println("Snapshot region is " + importer.getRegionName());

    final ExportedRegistry registry = importer.getPdxTypes();

    final Map<Integer, PdxType> pdxTypes = registry.types();
    System.out.println("Found " + pdxTypes.size() + " PDX types:");
    pdxTypes.forEach((id, type) -> System.out.println("\t" + id + " = " + type));

    final Map<Integer, EnumInfo> pdxEnums = registry.enums();
    System.out.println("Found " + pdxEnums.size() + " PDX enums: ");
    pdxEnums.forEach((id, info) -> System.out.println("\t" + id + " = " + info));

    System.out.println();

    // readSnapshotRecord() returns null once the last entry has been consumed
    for (SnapshotRecord rec = importer.readSnapshotRecord(); rec != null; rec =
        importer.readSnapshotRecord()) {
      System.out.println(rec.getKeyObject() + " = " + rec.getValueObject());
    }
  } finally {
    importer.close();
  }
}
/**
 * Opens a new snapshot file and returns a writer through which the caller supplies the entries.
 * The exporter backing the writer emits the file header as soon as it is constructed; the
 * snapshot is finished and the file closed when {@link SnapshotWriter#snapshotComplete()} is
 * invoked.
 *
 * @param snapshot the snapshot file
 * @param region the region name
 * @param cache the cache handed to the exporter, which reads its PDX registry when the snapshot
 *        is completed
 * @return the callback to allow the invoker to provide the snapshot entries
 * @throws IOException error writing the snapshot file
 */
public static SnapshotWriter create(File snapshot, String region, InternalCache cache)
    throws IOException {
  final GFSnapshotExporter exporter = new GFSnapshotExporter(snapshot, region, cache);

  // Adapter that forwards each writer callback to the exporter created above.
  final class ExporterWriter implements SnapshotWriter {
    @Override
    public void snapshotEntry(SnapshotRecord entry) throws IOException {
      exporter.writeSnapshotEntry(entry);
    }

    @Override
    public void snapshotComplete() throws IOException {
      exporter.close();
    }
  }

  return new ExporterWriter();
}
/**
 * Reads a snapshot file.
 * <p>
 * The returned iterator deserializes entries lazily, one record per advance. When the end of
 * the snapshot is reached the underlying file is closed automatically; from then on
 * {@code hasNext()} keeps returning {@code false} and {@code next()} throws
 * {@link NoSuchElementException}. Callers that stop early must call {@code close()} themselves.
 *
 * @param <K> the key type
 * @param <V> the value type
 * @param snapshot the snapshot file
 * @param typeRegistry the registry that receives the PDX types and enums stored in the
 *        snapshot; may be {@code null}, in which case nothing is imported
 * @return the snapshot iterator
 *
 * @throws IOException error reading the snapshot file
 * @throws ClassNotFoundException unable to deserialize entry
 */
public static <K, V> SnapshotIterator<K, V> read(final File snapshot, TypeRegistry typeRegistry)
    throws IOException, ClassNotFoundException {
  return new SnapshotIterator<K, V>() {
    private final GFSnapshotImporter in = new GFSnapshotImporter(snapshot, typeRegistry);

    /** true while {@code next} holds an entry that has not been handed out yet */
    private boolean foundNext;

    /** true once the terminating record has been read and the importer closed */
    private boolean exhausted;

    private Entry<K, V> next;

    @Override
    public boolean hasNext() throws IOException, ClassNotFoundException {
      return foundNext || moveNext();
    }

    @Override
    public Entry<K, V> next() throws IOException, ClassNotFoundException {
      if (!foundNext && !moveNext()) {
        throw new NoSuchElementException();
      }

      Entry<K, V> result = next;
      foundNext = false;
      next = null;
      return result;
    }

    @Override
    public void close() throws IOException {
      in.close();
    }

    /**
     * Reads and deserializes the next record, if any.
     *
     * @return true if an entry is now buffered in {@code next}, false at end of snapshot
     */
    private boolean moveNext() throws IOException, ClassNotFoundException {
      if (exhausted) {
        // the importer is already closed; reading again would fail with "stream closed"
        return false;
      }

      SnapshotRecord record = in.readSnapshotRecord();
      if (record == null) {
        // mark first so the iterator stays consistent even if close() throws
        exhausted = true;
        close();
        return false;
      }

      final K key = record.getKeyObject();
      final V value = record.getValueObject();
      next = new Entry<K, V>() {
        @Override
        public K getKey() {
          return key;
        }

        @Override
        public V getValue() {
          return value;
        }

        @Override
        public V setValue(V newValue) {
          throw new UnsupportedOperationException();
        }
      };

      // only flag success after deserialization worked; a failure above must not leave the
      // iterator claiming it has a (null) entry buffered
      foundNext = true;
      return true;
    }
  };
}
/**
 * Writes a snapshot file.
 * <p>
 * File layout produced by this class:
 * <ul>
 * <li>byte 0: the snapshot version ({@code SNAP_VER_2})</li>
 * <li>bytes 1-3: the format marker ({@code SNAP_FMT})</li>
 * <li>bytes 4-11: the file position of the PDX registry, {@code -1} until {@link #close()}
 * has patched it</li>
 * <li>the region name, written with {@link DataOutputStream#writeUTF(String)}</li>
 * <li>the serialized entries, terminated by a {@code null} byte array</li>
 * <li>the exported PDX registry</li>
 * </ul>
 */
static class GFSnapshotExporter {
  /** offset of the PDX registry position in the header: 1 version byte + 3 format bytes */
  private static final long REGISTRY_POSITION_OFFSET = 4;

  /** the file channel, used for random access */
  private final FileChannel fc;

  /** the output stream */
  private final DataOutputStream dos;

  private final InternalCache cache;

  /**
   * Creates the snapshot file and writes its header.
   *
   * @param out the snapshot file to create
   * @param region the region name recorded in the header
   * @param cache the cache whose PDX registry is exported by {@link #close()}
   * @throws IOException unable to create the file or write the header
   */
  public GFSnapshotExporter(File out, String region, InternalCache cache) throws IOException {
    this.cache = cache;

    FileOutputStream fos = new FileOutputStream(out);
    fc = fos.getChannel();
    dos = new DataOutputStream(new BufferedOutputStream(fos));

    try {
      // write snapshot version
      dos.writeByte(SNAP_VER_2);

      // write format type
      dos.write(SNAP_FMT);

      // write temporary pdx location in bytes 4-11
      dos.writeLong(-1);

      // write region name
      dos.writeUTF(region);
    } catch (IOException | RuntimeException e) {
      // the caller never receives this instance, so nobody else can release the file handle
      try {
        dos.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  /**
   * Writes an entry in the snapshot.
   *
   * @param entry the snapshot entry
   * @throws IOException unable to write entry
   */
  public void writeSnapshotEntry(SnapshotRecord entry) throws IOException {
    InternalDataSerializer.invokeToData(entry, dos);
  }

  /**
   * Finishes the snapshot: writes the entry terminator and the PDX registry, patches the
   * registry position into the header, and closes the file. The file is closed even when one
   * of these steps fails.
   *
   * @throws IOException unable to complete the snapshot
   */
  public void close() throws IOException {
    // try-with-resources guarantees the stream is released on every path and keeps the
    // original failure as the primary exception if closing fails as well
    try (DataOutputStream closing = dos) {
      // write entry terminator entry
      DataSerializer.writeByteArray(null, dos);

      // grab the pdx start location; flush first so the channel position is accurate
      dos.flush();
      long registryPosition = fc.position();

      // write pdx types
      try {
        new ExportedRegistry(cache.getPdxRegistry()).toData(dos);
      } catch (CacheClosedException ignored) {
        // ignore pdx types
        new ExportedRegistry().toData(dos);
      }

      // write the pdx position: with the buffer drained, moving the channel makes the next
      // flushed bytes overwrite the placeholder in the header
      dos.flush();
      fc.position(REGISTRY_POSITION_OFFSET);
      dos.writeLong(registryPosition);
    }
  }
}
/**
 * Reads a snapshot file.
 * <p>
 * The constructor reads the file in two steps. It first opens an unbuffered stream to parse the
 * header and, by repositioning the file channel, the PDX registry stored after the entries. It
 * then opens a second, buffered stream positioned at the first entry, from which
 * {@link #readSnapshotRecord()} reads sequentially.
 */
static class GFSnapshotImporter {
  /** the snapshot file version */
  private final byte version;

  /** the region name */
  private final String region;

  /** the internal pdx registry (not the system-wide pdx registry) */
  private final ExportedRegistry pdx;

  /** the input stream */
  private final DataInputStream dis;

  /**
   * Opens a snapshot file, validates its header and loads its PDX registry.
   *
   * @param in the snapshot file
   * @param typeRegistry the registry that receives the snapshot's PDX types and enums; may be
   *        {@code null}, in which case nothing is imported
   * @throws IOException the file cannot be read, or its version or format is not supported
   * @throws ClassNotFoundException unable to deserialize the PDX registry
   */
  public GFSnapshotImporter(File in, TypeRegistry typeRegistry)
      throws IOException, ClassNotFoundException {
    pdx = new ExportedRegistry();

    // read header and pdx registry
    long entryPosition;

    FileInputStream fis = new FileInputStream(in);
    FileChannel fc = fis.getChannel();
    // must stay unbuffered: the code below relies on the channel position matching exactly
    // the bytes consumed so far, and on repositioning the channel to redirect later reads
    DataInputStream tmp = new DataInputStream(fis);
    try {
      // read the snapshot file header
      version = tmp.readByte();
      if (version == SNAP_VER_1) {
        throw new IOException(
            String.format("Unsupported snapshot version: %s", SNAP_VER_1)
                + ": " + in);

      } else if (version == SNAP_VER_2) {
        // read format
        byte[] format = new byte[3];
        tmp.readFully(format);
        if (!Arrays.equals(format, SNAP_FMT)) {
          throw new IOException(String.format("Unrecognized snapshot file type: %s",
              Arrays.toString(format)) + ": " + in);
        }

        // read pdx location
        long registryPosition = tmp.readLong();

        // read region
        region = tmp.readUTF();
        entryPosition = fc.position();

        // read pdx; -1 is the placeholder the exporter writes until the snapshot is completed
        if (registryPosition != -1) {
          fc.position(registryPosition);
          pdx.fromData(tmp);
        }
      } else {
        throw new IOException(
            String.format("Unrecognized snapshot file version: %s", version)
                + ": " + in);
      }
    } finally {
      tmp.close();
    }

    // check compatibility with the existing pdx types so we don't have to
    // do any translation...preexisting types or concurrent put ops may cause
    // this check to fail
    checkPdxTypeCompatibility(typeRegistry);
    checkPdxEnumCompatibility(typeRegistry);

    // open new stream with buffering for reading entries
    DataInputStream entries =
        new DataInputStream(new BufferedInputStream(new FileInputStream(in)));
    try {
      skipFully(entries, entryPosition);
    } catch (IOException | RuntimeException e) {
      // construction failed, so no caller will ever be able to close this stream
      try {
        entries.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
    dis = entries;
  }

  /**
   * Returns the snapshot file version.
   *
   * @return the version
   */
  public byte getVersion() {
    return version;
  }

  /**
   * Returns the original pathname of the region used to create the snapshot.
   *
   * @return the region name (full pathname)
   */
  public String getRegionName() {
    return region;
  }

  /**
   * Returns the pdx types defined in the snapshot file.
   *
   * @return the pdx types
   */
  public ExportedRegistry getPdxTypes() {
    return pdx;
  }

  /**
   * Reads a snapshot entry. If the last entry has been read, a null value will be returned.
   *
   * @return the entry or null
   * @throws IOException unable to read entry
   * @throws ClassNotFoundException unable to create entry
   */
  public SnapshotRecord readSnapshotRecord() throws IOException, ClassNotFoundException {
    // a null key array is the terminator written after the last entry
    byte[] key = DataSerializer.readByteArray(dis);
    if (key == null) {
      return null;
    }

    byte[] value = DataSerializer.readByteArray(dis);
    return new SnapshotRecord(key, value);
  }

  /**
   * Closes the entry stream.
   *
   * @throws IOException unable to close the stream
   */
  public void close() throws IOException {
    dis.close();
  }

  /**
   * Advances the stream by exactly {@code count} bytes. A single {@code skip} call may skip
   * fewer bytes than requested, so the call is repeated until the full distance is covered.
   *
   * @param stream the stream to advance
   * @param count the number of bytes to skip
   * @throws IOException the stream ended before {@code count} bytes were skipped
   */
  private static void skipFully(DataInputStream stream, long count) throws IOException {
    long remaining = count;
    while (remaining > 0) {
      long skipped = stream.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
      } else if (stream.read() >= 0) {
        // skip() made no progress without being at end of stream; consume one byte instead
        remaining--;
      } else {
        throw new IOException("Unexpected end of snapshot file with " + remaining
            + " of " + count + " header bytes left to skip");
      }
    }
  }

  /** Registers every PDX type found in the snapshot with the given registry, if any. */
  private void checkPdxTypeCompatibility(TypeRegistry tr) {
    if (tr == null) {
      return;
    }

    for (Map.Entry<Integer, PdxType> entry : pdx.types().entrySet()) {
      tr.addImportedType(entry.getKey(), entry.getValue());
    }
  }

  /** Registers every PDX enum found in the snapshot with the given registry, if any. */
  private void checkPdxEnumCompatibility(TypeRegistry tr) {
    if (tr == null) {
      return;
    }

    for (Map.Entry<Integer, EnumInfo> entry : pdx.enums().entrySet()) {
      tr.addImportedEnum(entry.getKey(), entry.getValue());
    }
  }
}
}