blob: 3787ef73bf93aca53ce2a41845e5b07cca8da65a [file] [log] [blame]
package org.apache.s4.core.ft;
import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import com.google.inject.name.Names;
/**
* Creates the /checkpointed znode if a successful checkpointing callback is received. Does it only once.
*
*/
public class FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule extends
FileSystemBackendCheckpointingModule {
@Override
protected void configure() {
super.configure();
bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath());
}
public static class DummyZKStorageCallbackFactory implements StorageCallbackFactory {
@Override
public StorageCallback createStorageCallback() {
return new StorageCallback() {
@Override
public void storageOperationResult(StorageResultCode resultCode, Object message) {
if (resultCode == StorageResultCode.SUCCESS) {
try {
ZooKeeper zkClient = CoreTestUtils.createZkClient();
zkClient.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (Exception e) {
e.printStackTrace();
}
}
};
};
}
}
}