blob: 0da5a650a0ee818857bacf6d8d4c28851575298d [file] [log] [blame]
package org.apache.helix.filestore;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.NotificationContext;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
public class Replicator extends RoutingTableProvider {
private InstanceConfig currentMasterConfig;
private final InstanceConfig localInstanceConfig;
private final String partition;
private final String resourceName;
AtomicBoolean isReplicationInitiated;
AtomicBoolean isReplicationStarted;
RsyncInvoker rsyncInvoker;
private ChangeLogProcessor processor;
private FileSystemWatchService watchService;
private ChangeLogReader reader;
public void setRsyncInvoker(RsyncInvoker rsyncInvoker) {
this.rsyncInvoker = rsyncInvoker;
}
public Replicator(InstanceConfig localInstanceConfig, String resourceName, String partition) {
this.localInstanceConfig = localInstanceConfig;
this.resourceName = resourceName;
this.partition = partition;
isReplicationInitiated = new AtomicBoolean(false);
isReplicationStarted = new AtomicBoolean(false);
}
public void start() throws Exception {
// System.out.println("Starting replication for ");
isReplicationInitiated.set(true);
List<InstanceConfig> instances = getInstances(resourceName, partition, "MASTER");
if (instances.size() > 0) {
if (instances.size() == 1) {
InstanceConfig newMasterConfig = instances.get(0);
String master = newMasterConfig.getInstanceName();
if (currentMasterConfig == null
|| !master.equalsIgnoreCase(currentMasterConfig.getInstanceName())) {
System.out.println("Found new master:" + newMasterConfig.getInstanceName());
if (currentMasterConfig != null) {
stop();
}
currentMasterConfig = newMasterConfig;
startReplication(currentMasterConfig);
} else {
System.out.println("Already replicating from " + master);
}
} else {
System.out.println("Invalid number of masters found:" + instances);
}
} else {
System.out.println("No master found");
}
}
public void startReplication(InstanceConfig masterInstanceConfig) throws Exception {
String remoteHost = masterInstanceConfig.getHostName();
String remoteChangeLogDir = masterInstanceConfig.getRecord().getSimpleField("change_log_dir");
String remoteFilestoreDir = masterInstanceConfig.getRecord().getSimpleField("file_store_dir");
String localChangeLogDir = localInstanceConfig.getRecord().getSimpleField("change_log_dir");
String localFilestoreDir = localInstanceConfig.getRecord().getSimpleField("file_store_dir");
String localcheckpointDir = localInstanceConfig.getRecord().getSimpleField("check_point_dir");
// setup rsync for the change log directory
setupRsync(remoteHost, remoteChangeLogDir, localChangeLogDir);
reader = new ChangeLogReader(localChangeLogDir);
watchService = new FileSystemWatchService(localChangeLogDir, reader);
processor =
new ChangeLogProcessor(reader, remoteHost, remoteFilestoreDir, localFilestoreDir,
localcheckpointDir);
watchService.start();
processor.start();
isReplicationStarted.set(true);
}
private void setupRsync(String remoteHost, String remoteBaseDir, String localBaseDir)
throws Exception {
rsyncInvoker = new RsyncInvoker(remoteHost, remoteBaseDir, localBaseDir);
boolean started = rsyncInvoker.runInBackground();
if (started) {
System.out.println("Rsync thread started in background");
} else {
throw new Exception("Unable to start rsync thread");
}
}
public void stop() {
if (isReplicationStarted.get()) {
System.out.println("Stopping replication from current master:"
+ currentMasterConfig.getInstanceName());
rsyncInvoker.stop();
watchService.stop();
processor.stop();
}
isReplicationInitiated.set(false);
}
@Override
public void onExternalViewChange(List<ExternalView> viewList, NotificationContext context) {
super.onExternalViewChange(viewList, context);
if (isReplicationInitiated.get()) {
try {
start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
InstanceConfig localInstanceConfig = new InstanceConfig("localhost_12001");
ZNRecord record = localInstanceConfig.getRecord();
record.setSimpleField("change_log_dir", "data/localhost_12001/translog");
record.setSimpleField("file_store_dir", "data/localhost_12001/filestore");
record.setSimpleField("check_point_dir", "data/localhost_12001/checkpoint");
InstanceConfig masterInstanceConfig = new InstanceConfig("localhost_12001");
record = masterInstanceConfig.getRecord();
record.setSimpleField("change_log_dir", "data/localhost_12000/translog");
record.setSimpleField("file_store_dir", "data/localhost_12000/filestore");
record.setSimpleField("check_point_dir", "data/localhost_12000/checkpoint");
Replicator replicator = new Replicator(localInstanceConfig, "resource", "partition");
replicator.startReplication(masterInstanceConfig);
}
}