blob: 43df76376434f8de28bec8bc9453f596662b173b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.tests.util.hbase;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.util.OperatingSystem;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
 * {@link HBaseResource} that downloads an HBase binary distribution and sets up a local HBase
 * cluster from it.
 *
 * <p>The distribution is extracted into a temporary folder, which is also configured as {@code
 * hbase.tmp.dir}. The cluster is started and stopped through the {@code start-hbase.sh} and {@code
 * stop-hbase.sh} scripts of the distribution, and all table operations are executed by piping
 * commands into {@code bin/hbase shell}.
 */
public class LocalStandaloneHBaseResource implements HBaseResource {

    private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneHBaseResource.class);

    /** Maximum number of times a start/stop script is executed before giving up. */
    private static final int MAX_RETRIES = 3;

    /**
     * Upper bound on the number of one-second pauses spent, per attempt, waiting for the cluster
     * to reach the expected state. The status check itself also takes time, so this is not a
     * strict wall-clock limit.
     */
    private static final int RETRY_INTERVAL_SECONDS = 30;

    private final TemporaryFolder tmp = new TemporaryFolder();

    private final DownloadCache downloadCache = DownloadCache.get();

    private final String hbaseVersion;

    /** Absolute path of the extracted HBase distribution; assigned in {@link #before()}. */
    private Path hbaseDir;

    /**
     * Creates the resource for the given HBase version. Windows is forbidden because the resource
     * relies on UNIX utilities and shell scripts.
     *
     * @param hbaseVersion version string used to build the download URL and the directory name
     */
    LocalStandaloneHBaseResource(String hbaseVersion) {
        OperatingSystemRestriction.forbid(
                String.format(
                        "The %s relies on UNIX utils and shell scripts.",
                        getClass().getSimpleName()),
                OperatingSystem.WINDOWS);
        this.hbaseVersion = hbaseVersion;
    }

    /** Returns the Apache archive URL of the binary tarball for the configured version. */
    private String getHBaseDownloadUrl() {
        return String.format(
                "https://archive.apache.org/dist/hbase/%1$s/hbase-%1$s-bin.tar.gz", hbaseVersion);
    }

    /**
     * Creates the temporary folders, fetches and extracts the HBase distribution and starts the
     * cluster.
     *
     * @throws Exception if any of the setup steps fails
     */
    @Override
    public void before() throws Exception {
        tmp.create();
        downloadCache.before();
        this.hbaseDir = tmp.newFolder("hbase-" + hbaseVersion).toPath().toAbsolutePath();
        setupHBaseDist();
        setupHBaseCluster();
    }

    /**
     * Obtains the HBase archive through the download cache, extracts it into {@link #hbaseDir}
     * and writes an {@code hbase-site.xml} that points {@code hbase.tmp.dir} at that directory.
     *
     * @throws IOException if downloading, extracting or writing the configuration fails
     */
    private void setupHBaseDist() throws IOException {
        final Path downloadDirectory = tmp.newFolder("getOrDownload").toPath();
        final Path hbaseArchive =
                downloadCache.getOrDownload(getHBaseDownloadUrl(), downloadDirectory);

        LOG.info("HBase location: {}", hbaseDir.toAbsolutePath());
        AutoClosableProcess.runBlocking(
                CommandLineWrapper.tar(hbaseArchive)
                        .extract()
                        .zipped()
                        .strip(1)
                        .targetDir(hbaseDir)
                        .build());

        LOG.info("Configure {} as hbase.tmp.dir", hbaseDir.toAbsolutePath());
        final String tmpDirConfig =
                "<configuration><property><name>hbase.tmp.dir</name><value>"
                        + hbaseDir
                        + "</value></property></configuration>";
        // The XML has no encoding declaration, so write it as UTF-8 explicitly instead of
        // relying on the platform default charset.
        Files.write(
                hbaseDir.resolve(Paths.get("conf", "hbase-site.xml")),
                tmpDirConfig.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Starts the cluster and waits until {@link #isHMasterRunning()} reports it as running.
     *
     * @throws IOException if the wait is interrupted
     */
    private void setupHBaseCluster() throws IOException {
        LOG.info("Starting HBase cluster...");
        runHBaseProcessWithRetry("start-hbase.sh", () -> !isHMasterRunning());
        LOG.info("Start HBase cluster success");
    }

    /**
     * Stops the cluster and releases the download cache and the temporary folder. The cleanup
     * steps run even if stopping the cluster throws, so the temporary folder is not leaked.
     */
    @Override
    public void afterTestSuccess() {
        try {
            shutdownResource();
        } finally {
            try {
                downloadCache.afterTestSuccess();
            } finally {
                tmp.delete();
            }
        }
    }

    /**
     * Stops the cluster. An {@link IOException} is logged and not propagated; success is only
     * logged when the stop actually completed.
     */
    private void shutdownResource() {
        LOG.info("Stopping HBase Cluster...");
        try {
            runHBaseProcessWithRetry("stop-hbase.sh", this::isHMasterAlive);
            LOG.info("Stop HBase Cluster success");
        } catch (IOException ioe) {
            LOG.warn("Error when shutting down HBase Cluster.", ioe);
        }
    }

    /**
     * Runs a script from the distribution's {@code bin} directory and waits for the cluster to
     * reach the expected state, re-running the script up to {@link #MAX_RETRIES} times.
     *
     * @param command name of the script inside {@code bin}
     * @param processStatusChecker returns {@code true} while the expected state has NOT been
     *     reached yet, i.e. while waiting must continue
     * @throws IOException if the thread is interrupted while waiting
     * @throws IllegalArgumentException if the expected state is not reached after all retries
     */
    private void runHBaseProcessWithRetry(String command, Supplier<Boolean> processStatusChecker)
            throws IOException {
        LOG.info("Execute {} for HBase Cluster", command);

        for (int i = 1; i <= MAX_RETRIES; i++) {
            try {
                AutoClosableProcess.runBlocking(
                        hbaseDir.resolve(Paths.get("bin", command)).toString());
            } catch (IOException ioe) {
                // A failing script is not fatal by itself; the status check below decides.
                LOG.warn("Get exception when execute {} ", command, ioe);
            }

            if (waitForExpectedStatus(command, processStatusChecker)) {
                return;
            }

            if (i == MAX_RETRIES) {
                LOG.error("Execute {} failed, retry times {}", command, i);
                throw new IllegalArgumentException(
                        String.format("Execute %s failed after retry %s times", command, i));
            }
            LOG.warn("Execute {} failed, retry times {}", command, i);
        }
    }

    /**
     * Polls the status checker, pausing one second between polls, until it reports that the
     * expected state has been reached or {@link #RETRY_INTERVAL_SECONDS} pauses have elapsed.
     *
     * @return {@code true} if the expected state was reached, {@code false} on timeout
     * @throws IOException if the thread is interrupted while pausing; the interrupt flag is
     *     restored before throwing
     */
    private boolean waitForExpectedStatus(String command, Supplier<Boolean> processStatusChecker)
            throws IOException {
        for (int waitSecond = 0; waitSecond <= RETRY_INTERVAL_SECONDS; waitSecond++) {
            // Success is decided by the checker itself, not by comparing the elapsed counter
            // against the limit, so a state change on the last poll is not misreported.
            if (!processStatusChecker.get()) {
                return true;
            }
            LOG.info("Waiting for HBase {} works", command);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(
                        String.format("Interrupted while waiting for %s to take effect", command),
                        e);
            }
        }
        return false;
    }

    /**
     * Checks whether the cluster is up by scanning {@code hbase:meta} and looking for a line that
     * mentions {@code hbase:namespace}.
     *
     * @return {@code true} if such a line was seen; {@code false} otherwise or if the shell could
     *     not be executed
     */
    private boolean isHMasterRunning() {
        try {
            final AtomicBoolean namespaceTableSeen = new AtomicBoolean(false);
            queryHBaseStatus(
                    line -> {
                        if (line.contains("hbase:namespace")) {
                            namespaceTableSeen.set(true);
                        }
                    });
            return namespaceTableSeen.get();
        } catch (IOException ioe) {
            LOG.debug("Querying the HBase status failed, treating HMaster as not running.", ioe);
            return false;
        }
    }

    /** Scans {@code hbase:meta} through the HBase shell and feeds every stdout line to the given processor. */
    private void queryHBaseStatus(final Consumer<String> stdoutProcessor) throws IOException {
        executeHBaseShell("scan 'hbase:meta'", stdoutProcessor);
    }

    /**
     * Checks whether an HMaster process still exists by looking for {@code HMaster} in the output
     * of {@code jps}.
     *
     * @return {@code true} if such a line was seen; {@code false} otherwise or if {@code jps}
     *     could not be executed
     */
    private boolean isHMasterAlive() {
        try {
            final AtomicBoolean hMasterProcessSeen = new AtomicBoolean(false);
            queryHBaseProcess(
                    line -> {
                        if (line.contains("HMaster")) {
                            hMasterProcessSeen.set(true);
                        }
                    });
            return hMasterProcessSeen.get();
        } catch (IOException ioe) {
            LOG.debug("Querying the process list failed, treating HMaster as not alive.", ioe);
            return false;
        }
    }

    /** Runs {@code jps} and feeds every stdout line to the given processor. */
    private void queryHBaseProcess(final Consumer<String> stdoutProcessor) throws IOException {
        AutoClosableProcess.create("jps").setStdoutProcessor(stdoutProcessor).runBlocking();
    }

    /**
     * Creates a table through the HBase shell.
     *
     * @param tableName name of the table to create
     * @param columnFamilies names of the column families, each emitted as {@code {NAME=>'cf'}}
     * @throws IOException if the HBase shell could not be executed
     */
    @Override
    public void createTable(String tableName, String... columnFamilies) throws IOException {
        final String familySpecs =
                Arrays.stream(columnFamilies)
                        .map(cf -> String.format("{NAME=>'%s'}", cf))
                        .collect(Collectors.joining(","));
        executeHBaseShell(String.format("create '%s',", tableName) + familySpecs);
    }

    /**
     * Scans a table through the HBase shell.
     *
     * @param tableName name of the table to scan
     * @return the shell output lines that contain {@code value=}, in the order they were received
     * @throws IOException if the HBase shell could not be executed
     */
    @Override
    public List<String> scanTable(String tableName) throws IOException {
        final List<String> result = new ArrayList<>();
        executeHBaseShell(
                String.format("scan '%s'", tableName),
                line -> {
                    if (line.contains("value=")) {
                        result.add(line);
                    }
                });
        return result;
    }

    /**
     * Writes a single cell through the HBase shell {@code put} command.
     *
     * @param tableName name of the target table
     * @param rowKey row key of the cell
     * @param columnFamily column family of the cell
     * @param columnQualifier column qualifier of the cell
     * @param value value to store
     * @throws IOException if the HBase shell could not be executed
     */
    @Override
    public void putData(
            String tableName,
            String rowKey,
            String columnFamily,
            String columnQualifier,
            String value)
            throws IOException {
        executeHBaseShell(
                String.format(
                        "put '%s','%s','%s:%s','%s'",
                        tableName, rowKey, columnFamily, columnQualifier, value));
    }

    /** Executes a command in the HBase shell and discards its stdout. */
    private void executeHBaseShell(String cmd) throws IOException {
        executeHBaseShell(cmd, line -> {});
    }

    /**
     * Executes a command in {@code bin/hbase shell} by passing it as standard input.
     *
     * @param cmd the shell command to execute
     * @param stdoutProcessor receives every stdout line of the shell
     * @throws IOException if the process could not be executed
     */
    private void executeHBaseShell(String cmd, Consumer<String> stdoutProcessor)
            throws IOException {
        AutoClosableProcess.create(hbaseDir.resolve(Paths.get("bin", "hbase")).toString(), "shell")
                .setStdoutProcessor(stdoutProcessor)
                .setStdInputs(cmd)
                .runBlocking();
    }
}