blob: 1f3ef2ad0e9f80c09c33f5aae40bd3c1f41b4acb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal;
import static org.apache.geode.distributed.ConfigurationProperties.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Properties;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.net.SocketCreator;
/**
* MigrationClient is used to retrieve all of the data for a region from a MigrationServer. First
* start a MigrationServer using one version of GemFire, then connect to it using a MigrationClient
* with another version of GemFire.
* <p>
* Command line arguments are<br>
* &nbsp;&nbsp;region name (required)<br>
* &nbsp;&nbsp;cache-xml-file-name (required)<br>
* &nbsp;&nbsp;server port (defaults to 10533)<br>
* &nbsp;&nbsp;server address (defaults to local host)
* <p>
* The region should be defined in the cache-xml file, and must also be defined in the server's
* cache-xml file.
* <p>
* Typically, the cache-xml file will be exactly the same as the one used by the MigrationServer
* with different disk-dirs settings. When Region entries are transferred from the server to the
* client, they are then stored in new files in these directories.
*
* @since GemFire 6.0.1
*/
public class MigrationClient {
  private static final boolean VERBOSE = MigrationServer.VERBOSE;

  // version for backward communications compatibility
  private static final int VERSION = 551;

  /* response code followed by a UTF string describing the error */
  private static final int CODE_ERROR = MigrationServer.CODE_ERROR;

  /* serialized key, serialized value */
  private static final int CODE_ENTRY = MigrationServer.CODE_ENTRY;

  /* response code that ends the transfer of a region */
  private static final int CODE_COMPLETED = MigrationServer.CODE_COMPLETED;

  /**
   * Command line entry point. Connects to a distributed system, creates the cache described by the
   * given cache-xml file and then pulls every entry of the named region from the MigrationServer.
   *
   * @param args region name, cache-xml file name, optional server port (defaults to 10533) and
   *        optional server address
   * @throws Exception if the distributed system, the cache or the transfer fails
   */
  public static void main(String[] args) throws Exception {
    // the region name and the cache-xml file name are both mandatory
    if (args.length < 2) {
      System.err
          .println("MigrationClient regionName cache-xml-file [server-port] [server-address]");
      return;
    }
    int argIdx = 0;
    String regionName = args[argIdx++];
    String cacheXmlFileName = args[argIdx++];

    int serverPort = 10533;
    if (args.length > argIdx) {
      String portArg = args[argIdx++];
      try {
        serverPort = Integer.parseInt(portArg);
      } catch (NumberFormatException e) {
        // report bad input the same way an unresolvable address is reported below
        System.err.println("Error - server port is not a valid integer: '" + portArg + '\'');
        System.exit(1);
        return;
      }
    }

    String bindAddressName = null;
    if (args.length > argIdx) {
      bindAddressName = args[argIdx++];
    }

    MigrationClient instance;
    try {
      instance = new MigrationClient(cacheXmlFileName, bindAddressName, serverPort);
    } catch (IllegalArgumentException e) {
      System.err.println(e.getMessage());
      System.exit(1);
      return;
    }
    instance.createDistributedSystem();
    instance.createCache();
    instance.getRegion(regionName);
  }

  private final InetAddress serverAddress;
  private final int port;
  private DistributedSystem distributedSystem;
  private File cacheXmlFile;
  private Cache cache;
  private Socket server;
  private int serverVersion;
  private DataInputStream dis;
  private DataOutputStream dos;

  /**
   * Create a MigrationClient to be used with a DistributedSystem and Cache that are created using
   * GemFire APIs
   *
   * @param bindAddressName the server's address; a null name is handed to
   *        {@link InetAddress#getByName(String)} unchanged
   * @param serverPort the server's port
   * @throws IllegalArgumentException if the address cannot be resolved
   */
  private MigrationClient(String bindAddressName, int serverPort) {
    this.port = serverPort;
    try {
      this.serverAddress = InetAddress.getByName(bindAddressName);
    } catch (IOException e) {
      // keep the resolution failure as the cause so that it is not lost to the caller
      throw new IllegalArgumentException(
          "Error - bind address cannot be resolved: '" + bindAddressName + '\'', e);
    }
  }

  /**
   * this is for use by main()
   *
   * @param cacheXmlFileName the name of the xml file describing the cache (must not be null)
   * @param bindAddressName the address of the server to connect to, or null
   * @param serverPort the port to connect to (must not be zero)
   */
  private MigrationClient(String cacheXmlFileName, String bindAddressName, int serverPort) {
    this(bindAddressName, serverPort);
    this.cacheXmlFile = new File(cacheXmlFileName);
    if (!this.cacheXmlFile.exists()) {
      // in 6.x this should be localizable
      System.err
          .println("Warning - file not found in local directory: '" + cacheXmlFileName + '\'');
    }
  }

  /**
   * Create a distributed system. If this method is not invoked before running the MigrationClient,
   * an existing distributed system must exist for the client to use.
   *
   * @throws Exception if there are any problems
   */
  private void createDistributedSystem() throws Exception {
    Properties dsProps = new Properties();
    // if no discovery information has been explicitly given, use a loner ds
    if (System.getProperty(DistributionConfig.GEMFIRE_PREFIX + MCAST_PORT) == null
        && System.getProperty(DistributionConfig.GEMFIRE_PREFIX + LOCATORS) == null) {
      dsProps.put(MCAST_PORT, "0");
    }
    dsProps.put(LOG_FILE, "migrationClient.log");
    if (this.cacheXmlFile != null) {
      // use the path exactly as it was given (and checked in the constructor); getName() would
      // drop any directory component and make the cache load a different file
      dsProps.put(CACHE_XML_FILE, this.cacheXmlFile.getPath());
    }
    this.distributedSystem = DistributedSystem.connect(dsProps);
  }

  /**
   * create the cache to be used by this migration client
   *
   * @throws Exception if there are any problems
   */
  private void createCache() throws Exception {
    if (this.distributedSystem == null) {
      this.distributedSystem = InternalDistributedSystem.getConnectedInstance();
    }
    this.cache = CacheFactory.create(this.distributedSystem);
  }

  /**
   * Fall back to the already connected distributed system and the existing cache instance for
   * whichever of the two has not been set on this client yet.
   */
  private void initDSAndCache() {
    if (this.distributedSystem == null) {
      this.distributedSystem = InternalDistributedSystem.getConnectedInstance();
    }
    if (this.cache == null) {
      this.cache = GemFireCacheImpl.getInstance();
    }
  }

  /**
   * Requests the named region from the server and puts every entry that the server sends into the
   * local region of the same name. The connection to the server is closed before returning.
   *
   * @param regionName the name of the region to transfer
   * @return the local region, populated with the received entries
   * @throws IOException if the server reports an error, has a different protocol version, sends an
   *         unknown response code or closes the connection prematurely
   * @throws ClassNotFoundException if a received key or value cannot be deserialized
   * @throws IllegalStateException if no cache is available
   * @throws IllegalArgumentException if the region is not defined in the local cache
   */
  public Region getRegion(String regionName) throws IOException, ClassNotFoundException {
    initDSAndCache();
    if (this.cache == null) {
      throw new IllegalStateException("No cache is available to hold region '" + regionName + '\'');
    }
    Region<Object, Object> region = this.cache.getRegion(regionName);
    if (region == null) {
      // fail before contacting the server instead of with a NullPointerException on the first put
      throw new IllegalArgumentException(
          "Region '" + regionName + "' is not defined in the local cache");
    }
    try {
      connectToServer();
      if (this.serverVersion != VERSION) {
        System.out.println("Don't know how to deal with version " + this.serverVersion);
        throw new IOException("Server has incompatible version of MigrationServer");
      }
      this.dos.writeShort(MigrationServer.ClientRequest.REGION_REQUEST);
      this.dos.writeUTF(regionName);
      this.dos.flush();
      receiveEntries(region);
    } finally {
      if (this.server != null && !this.server.isClosed()) {
        this.server.close();
      }
    }
    return region;
  }

  /**
   * Reads responses from the server until the completion code arrives, storing each received entry
   * in the given region.
   */
  private void receiveEntries(Region<Object, Object> region)
      throws IOException, ClassNotFoundException {
    while (true) {
      int responseCode;
      try {
        responseCode = this.dis.readShort();
      } catch (EOFException e) {
        throw new IOException("Server socket was closed while receiving entries", e);
      }
      switch (responseCode) {
        case CODE_COMPLETED:
          return;
        case CODE_ERROR: {
          String errorString = this.dis.readUTF();
          System.err.println("Server responded with error: '" + errorString + '\'');
          throw new IOException(errorString);
        }
        case CODE_ENTRY: {
          // NOTE(review): this is Java-native deserialization of whatever the peer sends, so the
          // client should only ever be pointed at a trusted MigrationServer.
          // A fresh ObjectInputStream is opened for the key and for the value, as before.
          Object key = new ObjectInputStream(this.server.getInputStream()).readObject();
          Object value = new ObjectInputStream(this.server.getInputStream()).readObject();
          if (VERBOSE) {
            System.out.println("received " + key);
          }
          region.put(key, value);
          break;
        }
        default:
          // an unknown code means the stream can no longer be interpreted; stop instead of
          // continuing to read misaligned data
          throw new IOException("Unexpected response code from server: " + responseCode);
      }
    }
  }

  /**
   * Opens the socket to the server, sends this client's protocol version and reads the server's
   * version into {@link #serverVersion}.
   *
   * @throws IOException if the connection cannot be established or the handshake fails
   */
  private void connectToServer() throws IOException {
    this.server = new Socket();
    SocketAddress addr;
    if (this.serverAddress != null) {
      addr = new InetSocketAddress(this.serverAddress, this.port);
    } else {
      // defensive only: the constructor either assigns an address or throws
      addr = new InetSocketAddress(SocketCreator.getLocalHost(), this.port);
    }
    if (VERBOSE) {
      System.out.println("connecting to " + addr);
    }
    this.server.connect(addr);
    this.dos = new DataOutputStream(this.server.getOutputStream());
    this.dos.writeShort(VERSION);
    // make sure the version is on the wire before waiting for the server's reply
    this.dos.flush();
    this.dis = new DataInputStream(this.server.getInputStream());
    this.serverVersion = this.dis.readShort();
  }
}