blob: 5ff452a22d5f0b1687a704094844bdf4b6e9df77 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.heron.dlog;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.heron.common.basics.Pair;
/**
* A utility program to copy data between filesystem files and dlog streams.
*/
public final class Util {
private Util() {
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage : Util <src> <target>");
System.err.println("");
System.err.println("NOTE: <src>/<target> can be either a file path or a dlog stream");
Runtime.getRuntime().exit(-1);
return;
}
String srcPath = args[0];
String destPath = args[1];
Namespace srcNs = null;
Namespace destNs = null;
InputStream is = null;
OutputStream os = null;
try {
if (srcPath.startsWith("distributedlog")) {
URI srcUri = URI.create(srcPath);
Pair<URI, String> parentAndName = getParentURI(srcUri);
srcNs = openNamespace(parentAndName.first);
is = openInputStream(srcNs, parentAndName.second);
} else {
is = new FileInputStream(destPath);
}
if (destPath.startsWith("distributedlog")) {
URI destUri = URI.create(srcPath);
Pair<URI, String> parentAndName = getParentURI(destUri);
destNs = openNamespace(parentAndName.first);
os = openOutputStream(destNs, parentAndName.second);
} else {
os = new FileOutputStream(destPath);
}
copyStream(is, os);
} finally {
if (null != is) {
is.close();
}
if (null != os) {
os.close();
}
if (null != srcNs) {
srcNs.close();
}
if (null != destNs) {
destNs.close();
}
}
}
static void copyStream(InputStream in, OutputStream out) throws IOException {
int read = 0;
byte[] bytes = new byte[128 * 1024];
while ((read = in.read(bytes)) >= 0) {
if (0 == read) {
continue;
}
out.write(bytes, 0, read);
}
out.flush();
out.close();
}
private static Pair<URI, String> getParentURI(URI uri) throws URISyntaxException {
String path = uri.getPath();
File pathFile = new File(path);
String logName = pathFile.getName();
String parentName = pathFile.getParent();
return Pair.create(
new URI(
uri.getScheme(),
uri.getAuthority(),
parentName,
uri.getQuery(),
uri.getFragment()
), logName);
}
private static Namespace openNamespace(URI uri) throws IOException {
DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
distributedLogConfiguration.addProperty("bkc.allowShadedLedgerManagerFactoryClass", true);
return NamespaceBuilder.newBuilder()
.uri(uri)
.clientId("dlog-util")
.conf(distributedLogConfiguration)
.build();
}
private static OutputStream openOutputStream(Namespace namespace, String pkgName)
throws IOException {
DistributedLogManager dlm = namespace.openLog(pkgName);
AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter();
return new DLOutputStream(dlm, writer);
}
private static InputStream openInputStream(Namespace ns, String logName)
throws Exception {
DistributedLogManager dlm = ns.openLog(logName);
return new DLInputStream(dlm);
}
}