blob: 3bd75e5750c75d5e72b5de2441b8209c44f4ddae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.IOException;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.util.JsonSerialization;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.assertDirEntryMatch;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.assertFileEntryMatch;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData.marshallPath;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData.unmarshallPath;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test Reading/writing manifest files.
*/
public class TestTaskManifestFileIO extends AbstractManifestCommitterTest {
private TaskManifest source;
private ManifestCommitterTestSupport.JobAndTaskIDsForTests taskIDs;
private String taskAttempt00;
private Path testPath;
private Path taPath;
@Override
public void setup() throws Exception {
super.setup();
taskIDs = new ManifestCommitterTestSupport.JobAndTaskIDsForTests(2, 2);
source = new TaskManifest();
taskAttempt00 = taskIDs.getTaskAttempt(0, 0);
source.setTaskAttemptID(taskAttempt00);
testPath = methodPath();
taPath = new Path(testPath, " " + taskAttempt00);
source.setTaskAttemptDir(marshallPath(taPath));
}
/**
* Test marshalling, paying attention to paths with spaces in them
* as they've been a source of trouble in the S3A committers.
*/
@Test
public void testJsonRoundTrip() throws Throwable {
describe("Save manifest file to string and back");
Path subdirS = new Path(taPath, "subdir");
Path subdirD = new Path(testPath, "subdir");
source.addDirectory(DirEntry.dirEntry(subdirD, 0, 0));
// a file
Path subfileS = new Path(subdirS, "file");
Path subfileD = new Path(subdirD, "file");
long len = 256L;
FileEntry subFileEntry = new FileEntry(subfileS,
subfileD, len, "etag");
source.addFileToCommit(subFileEntry);
JsonSerialization<TaskManifest> serializer
= TaskManifest.serializer();
String json = serializer.toJson(source);
LOG.info("serialized form\n{}", json);
TaskManifest deser = serializer.fromJson(json);
deser.validate();
Assertions.assertThat(deser.getTaskAttemptID())
.describedAs("Task attempt ID")
.isEqualTo(taskAttempt00);
Assertions.assertThat(unmarshallPath(deser.getTaskAttemptDir()))
.describedAs("Task attempt Dir %s",
deser.getTaskAttemptDir())
.isEqualTo(taPath);
Assertions.assertThat(deser.getDestDirectories())
.hasSize(1)
.allSatisfy(d -> assertDirEntryMatch(d, subdirD, 0));
Assertions.assertThat(deser.getFilesToCommit())
.hasSize(1)
.allSatisfy(d -> assertFileEntryMatch(d, subfileS, subfileD, len));
final FileEntry entry = deser.getFilesToCommit().get(0);
assertFileEntryMatch(entry, subfileS, subfileD, len);
Assertions.assertThat(entry.getEtag())
.describedAs("etag of %s", entry)
.isEqualTo("etag");
}
/**
* The manifest validation logic has a safety check that only one
* file can rename to the same destination, and that the entries
* are valid.
*/
@Test
public void testValidateRejectsTwoCommitsToSameDest() throws Throwable {
Path subdirS = new Path(taPath, "subdir");
Path subdirD = new Path(testPath, "subdir");
source.addDirectory(DirEntry.dirEntry(subdirD, 0, 0));
// a file
Path subfileS = new Path(subdirS, "file");
Path subfileS2 = new Path(subdirS, "file2");
Path subfileD = new Path(subdirD, "file");
long len = 256L;
source.addFileToCommit(
new FileEntry(subfileS, subfileD, len, "tag1"));
source.addFileToCommit(
new FileEntry(subfileS2, subfileD, len, "tag2"));
assertValidationFailureOnRoundTrip(source);
}
/**
* The manifest validation logic has a safety check that only one
* file can rename to the same destination, and that the entries
* are valid.
*/
@Test
public void testValidateRejectsIncompleteFileEntry() throws Throwable {
source.addFileToCommit(
new FileEntry(taPath, null, 0, null));
assertValidationFailureOnRoundTrip(source);
}
/**
* negative lengths are not allowed.
*/
@Test
public void testValidateRejectsInvalidFileLength() throws Throwable {
source.addFileToCommit(
new FileEntry(taPath, testPath, -1, null));
assertValidationFailureOnRoundTrip(source);
}
@Test
public void testRejectIncompatibleVersion() throws Throwable {
source.setVersion(5);
assertValidationFailureOnRoundTrip(source);
}
@Test
public void testRejectIncompatibleType() throws Throwable {
source.setType("Incompatible type");
assertValidationFailureOnRoundTrip(source);
}
private void assertValidationFailureOnRoundTrip(
final TaskManifest manifest) throws Exception {
JsonSerialization<TaskManifest> serializer
= TaskManifest.serializer();
String json = serializer.toJson(manifest);
LOG.info("serialized form\n{}", json);
TaskManifest deser = serializer.fromJson(json);
intercept(IOException.class, deser::validate);
}
}