/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.ambari.infra.steps;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static org.apache.ambari.infra.OffsetDateTimeConverter.SOLR_DATETIME_FORMATTER;
import static org.apache.ambari.infra.TestUtil.doWithin;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.junit.Assert.assertThat;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.ambari.infra.InfraClient;
import org.apache.ambari.infra.JobExecutionInfo;
import org.apache.ambari.infra.S3Client;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.solr.client.solrj.SolrQuery;
import org.jbehave.core.annotations.Given;
import org.jbehave.core.annotations.Then;
import org.jbehave.core.annotations.When;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
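
/**
 * JBehave step definitions for the Ambari Infra export job acceptance tests:
 * the steps seed Solr with documents, start, stop, and restart jobs through
 * the Infra REST client, and verify the exported archives on S3, HDFS, and
 * the local filesystem.
 */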
public class ExportJobsSteps extends AbstractInfraSteps {
  private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class);

  private Set<String> documentIds = new HashSet<>();
  private Map<String, JobExecutionInfo> launchedJobs = new HashMap<>();
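
  /**
   * Adds {@code count} documents to Solr with logtime values spread over the
   * last {@code count / 10} minutes (assumes {@code count >= 10}) and records
   * the generated document ids for later verification.
   */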
@Given("$count documents in solr")
public void addDocuments(int count) {
OffsetDateTime intervalEnd = OffsetDateTime.now();
documentIds.clear();
for (int i = 0; i < count; ++i) {
documentIds.add(addDocument(intervalEnd.minusMinutes(i % (count / 10))).get("id").getValue().toString());
}
getSolr().commit();
}
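
  /**
   * Adds {@code count} documents to Solr with logtime values distributed
   * evenly between {@code startLogtime} and {@code endLogtime}, recording the
   * generated document ids.
   */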
@Given("$count documents in solr with logtime from $startLogtime to $endLogtime")
public void addDocuments(long count, OffsetDateTime startLogtime, OffsetDateTime endLogtime) {
Duration duration = Duration.between(startLogtime, endLogtime);
long increment = duration.toNanos() / count;
documentIds.clear();
for (int i = 0; i < count; ++i) {
documentIds.add(addDocument(startLogtime.plusNanos(increment * i)).get("id").getValue().toString());
}
getSolr().commit();
}
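
  /** Uploads a small placeholder object to S3 under the given key. */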
@Given("a file on s3 with key $key")
public void addFileToS3(String key) {
getS3client().putObject(key, "anything".getBytes());
}
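
  /** Starts the named job immediately with no extra parameters. */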
@When("start $jobName job")
public void startJob(String jobName) throws Exception {
startJob(jobName, null, 0);
}
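
  /**
   * Waits {@code waitSec} seconds, then starts the named job with the given
   * parameters and remembers its execution info for later restart/stop steps.
   */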
@When("start $jobName job with parameters $parameters after $waitSec seconds")
public void startJob(String jobName, String parameters, int waitSec) throws Exception {
Thread.sleep(waitSec * 1000);
try (InfraClient httpClient = getInfraClient()) {
JobExecutionInfo jobExecutionInfo = httpClient.startJob(jobName, parameters);
LOG.info("Job {} started: {}", jobName, jobExecutionInfo);
launchedJobs.put(jobName, jobExecutionInfo);
}
}
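
  /**
   * Restarts the previously launched {@code jobName} job, allowing up to
   * {@code waitSec} seconds for the restart call to succeed.
   */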
@When("restart $jobName job within $waitSec seconds")
public void restartJob(String jobName, int waitSec) {
doWithin(waitSec, "Restarting job " + jobName, () -> {
try (InfraClient httpClient = getInfraClient()) {
httpClient.restartJob(jobName, launchedJobs.get(jobName).getJobId());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
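
  /**
   * Waits until at least {@code count} objects whose keys contain {@code text}
   * exist in S3, then stops the tracked execution of {@code jobName}.
   */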
@When("stop job $jobName after at least $count file exists in s3 with filename containing text $text within $waitSec seconds")
public void stopJob(String jobName, int count, String text, int waitSec) throws Exception {
S3Client s3Client = getS3client();
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.listObjectKeys(text).size() > count);
try (InfraClient httpClient = getInfraClient()) {
httpClient.stopJob(launchedJobs.get(jobName).getExecutionId());
}
}
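
  /** Deletes the object with the given key from S3. */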
@When("delete file with key $key from s3")
public void deleteFileFromS3(String key) {
getS3client().deleteObject(key);
}
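
  /**
   * Waits up to {@code waitSec} seconds for any object to appear in S3, then
   * asserts that at least one object key contains {@code text}.
   */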
@Then("Check filenames contains the text $text on s3 server after $waitSec seconds")
public void checkS3After(String text, int waitSec) {
S3Client s3Client = getS3client();
doWithin(waitSec, "check uploaded files to s3", () -> !s3Client.listObjectKeys().isEmpty());
List<String> objectKeys = s3Client.listObjectKeys(text);
assertThat(objectKeys, hasItem(containsString(text)));
}
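
  /** Waits until exactly {@code count} objects whose keys contain {@code text} exist in S3. */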
@Then("Check $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
public void checkNumberOfFilesOnS3(long count, String text, int waitSec) {
S3Client s3Client = getS3client();
doWithin(waitSec, "check uploaded files to s3", () -> s3Client.listObjectKeys(text).size() == count);
}
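
  /** Waits until the number of matching S3 objects is at least one but less than {@code count}. */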
@Then("Less than $count files exists on s3 server with filenames containing the text $text after $waitSec seconds")
public void checkLessThanFileExistsOnS3(long count, String text, int waitSec) {
S3Client s3Client = getS3client();
doWithin(waitSec, "check uploaded files to s3", () -> between(
s3Client.listObjectKeys(text).size(), 1L, count - 1L));
}
private boolean between(long count, long from, long to) {
return from <= count && count <= to;
}
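
  /** Asserts that no object key in S3 contains {@code text}. */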
@Then("No file exists on s3 server with filenames containing the text $text")
public void fileNotExistOnS3(String text) {
S3Client s3Client = getS3client();
assertThat(s3Client.listObjectKeys().stream()
.anyMatch(objectKey -> objectKey.contains(text)), is(false));
}
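
  /**
   * Queries Solr for documents with logtime in the given range, fetching up to
   * {@code 2 * count} rows so surplus documents are detected, and asserts that
   * exactly {@code count} documents are returned.
   */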
@Then("solr contains $count documents between $startLogtime and $endLogtime")
public void documentCount(int count, OffsetDateTime startLogTime, OffsetDateTime endLogTime) {
SolrQuery query = new SolrQuery();
query.setRows(count * 2);
query.setQuery(String.format("logtime:[\"%s\" TO \"%s\"]", SOLR_DATETIME_FORMATTER.format(startLogTime), SOLR_DATETIME_FORMATTER.format(endLogTime)));
assertThat(getSolr().query(query).getResults().size(), is(count));
}
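
  /** Waits until a Solr query over the given logtime range returns no documents. */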
@Then("solr does not contain documents between $startLogtime and $endLogtime after $waitSec seconds")
public void isSolrEmpty(OffsetDateTime startLogTime, OffsetDateTime endLogTime, int waitSec) {
SolrQuery query = new SolrQuery();
query.setRows(1);
query.setQuery(String.format("logtime:[\"%s\" TO \"%s\"]", SOLR_DATETIME_FORMATTER.format(startLogTime), SOLR_DATETIME_FORMATTER.format(endLogTime)));
doWithin(waitSec, "check solr is empty", () -> isSolrEmpty(query));
}
private boolean isSolrEmpty(SolrQuery query) {
return getSolr().query(query).getResults().isEmpty();
}
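
  /**
   * Recursively lists {@code path} on HDFS and waits until exactly
   * {@code count} file names contain {@code text}.
   */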
@Then("Check $count files exists on hdfs with filenames containing the text $text in the folder $path after $waitSec seconds")
public void checkNumberOfFilesOnHdfs(int count, String text, String path, int waitSec) throws Exception {
try (FileSystem fileSystem = getHdfs()) {
doWithin(waitSec, "check uploaded files to hdfs", () -> {
try {
int fileCount = 0;
RemoteIterator<LocatedFileStatus> it = fileSystem.listFiles(new Path(path), true);
while (it.hasNext()) {
if (it.next().getPath().getName().contains(text))
++fileCount;
}
return fileCount == count;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
}
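
  /**
   * Resolves the destination directory by substituting the launched job's id
   * for the {@code ${jobId}} placeholder in {@code path}, waits for the
   * directory to appear, and asserts that exactly {@code count} of its files
   * have names containing {@code text}.
   */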
@Then("Check $count files exists on local filesystem with filenames containing the text $text in the folder $path for job $jobName")
public void checkNumberOfFilesOnLocalFilesystem(long count, String text, String path, String jobName) {
File destinationDirectory = new File(getLocalDataFolder(), path.replace("${jobId}", launchedJobs.get(jobName).getJobId()));
LOG.info("Destination directory path: {}", destinationDirectory.getAbsolutePath());
doWithin(5, "Destination directory exists", destinationDirectory::exists);
File[] files = requireNonNull(destinationDirectory.listFiles(),
String.format("Path %s is not a directory or an I/O error occurred!", destinationDirectory.getAbsolutePath()));
assertThat(Arrays.stream(files)
.filter(file -> file.getName().contains(text))
.count(), is(count));
}
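
  /** Jackson mapper used to parse each archived line as a JSON document. */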
  private static final ObjectMapper json = new ObjectMapper();
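
  /**
   * Reads back every BZip2-compressed archive whose key contains
   * {@code fileNamePart}, parses each line as a JSON document, and asserts
   * that the archives contain all inserted document ids and nothing else.
   */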
@Then("Check the files $fileNamePart contains the archived documents")
public void checkStoredDocumentIds(String fileNamePart) throws Exception {
S3Client s3Client = getS3client();
int size = documentIds.size();
Set<String> storedDocumentIds = new HashSet<>();
for (String objectKey : s3Client.listObjectKeys(fileNamePart)) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new BZip2CompressorInputStream(s3Client.getObject(objectKey)), UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
Map<String, Object> document = json.readValue(line, new TypeReference<HashMap<String, Object>>() {});
String id = document.get("id").toString();
storedDocumentIds.add(id);
documentIds.remove(id);
}
}
}
assertThat(documentIds.size(), is(0));
assertThat(storedDocumentIds.size(), is(size));
}
}