blob: 5c8ac96b5374e3741f9f1b721c5af429ceda2780 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.data.management.copy;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.util.PathUtils;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CopyableFileTest {
@Test
public void testSerializeDeserialze() throws Exception {
CopyableFile copyableFile =
new CopyableFile(new FileStatus(10, false, 12, 100, 12345, new Path("/path")), new Path("/destination"),
new OwnerAndPermission("owner", "group", FsPermission.getDefault()),
Lists.newArrayList(new OwnerAndPermission("owner2", "group2", FsPermission.getDefault())),
"checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps
.<String, String>newHashMap(), "", null);
DatasetDescriptor dataset = new DatasetDescriptor("hive", "db.table");
PartitionDescriptor descriptor = new PartitionDescriptor("datepartition=2018/09/05", dataset);
copyableFile.setDestinationData(descriptor);
String s = CopyEntity.serialize(copyableFile);
CopyEntity de = CopyEntity.deserialize(s);
Assert.assertEquals(de, copyableFile);
}
@Test
public void testSerializeDeserialzeNulls() throws Exception {
CopyableFile copyableFile =
new CopyableFile(null, null, new OwnerAndPermission("owner", "group",
FsPermission.getDefault()), Lists.newArrayList(new OwnerAndPermission(null, "group2", FsPermission
.getDefault())), "checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0,
Maps.<String, String>newHashMap(), "", null);
String serialized = CopyEntity.serialize(copyableFile);
CopyEntity deserialized = CopyEntity.deserialize(serialized);
Assert.assertEquals(deserialized, copyableFile);
}
@Test
public void testSerializeDeserialzeList() throws Exception {
List<CopyEntity> copyEntities =
ImmutableList.<CopyEntity>of(CopyableFileUtils.getTestCopyableFile(), CopyableFileUtils.getTestCopyableFile(),
CopyableFileUtils.getTestCopyableFile());
String serialized = CopyEntity.serializeList(copyEntities);
List<CopyEntity> deserialized = CopyEntity.deserializeList(serialized);
Assert.assertEquals(deserialized, copyEntities);
}
@Test
public void testSetFsDatasets() throws URISyntaxException {
FileSystem originFs = mock(FileSystem.class);
String originFsUri = "hdfs://source.company.biz:2000";
String originPath = "/data/databases/source/profile";
when(originFs.getUri()).thenReturn(new URI(originFsUri));
when(originFs.getScheme()).thenReturn("hdfs");
FileSystem targetFs = mock(FileSystem.class);
String targetFsUri = "file:///";
String destinationPath = "/data/databases/destination/profile";
when(targetFs.getUri()).thenReturn(new URI(targetFsUri));
when(targetFs.getScheme()).thenReturn("file");
// Test when source file is not a directory
FileStatus origin = new FileStatus(0l, false, 0, 0l, 0l, new Path(originPath));
CopyableFile copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "", null);
copyableFile.setFsDatasets(originFs, targetFs);
DatasetDescriptor source = (DatasetDescriptor) copyableFile.getSourceData();
Assert.assertEquals(source.getName(), "/data/databases/source");
Assert.assertEquals(source.getPlatform(), "hdfs");
Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
DatasetDescriptor destination = (DatasetDescriptor) copyableFile.getDestinationData();
Assert.assertEquals(destination.getName(), "/data/databases/destination");
Assert.assertEquals(destination.getPlatform(), "file");
Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
// Test when source file is a directory
originPath = originFsUri + originPath;
destinationPath = targetFsUri + destinationPath;
origin = new FileStatus(0l, true, 0, 0l, 0l, new Path(originPath));
copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "", null);
copyableFile.setFsDatasets(originFs, targetFs);
source = (DatasetDescriptor) copyableFile.getSourceData();
Assert.assertEquals(source.getName(), "/data/databases/source/profile");
Assert.assertEquals(source.getPlatform(), "hdfs");
Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
destination = (DatasetDescriptor) copyableFile.getDestinationData();
Assert.assertEquals(destination.getName(), "/data/databases/destination/profile");
Assert.assertEquals(destination.getPlatform(), "file");
Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
}
@Test
public void testCopyableFileBuilderMinimumConfiguration()
throws IOException {
// Source
String datasetRootDir = "/data/databases/source";
Path datasetRoot = new Path(datasetRootDir);
FileSystem originFS = FileSystem.getLocal(new Configuration());
Path originFile = new Path(datasetRootDir, "copyableFile");
FileStatus origin = new FileStatus(0l, false, 0, 0l, System.currentTimeMillis(), originFile);
PreserveAttributes preserveAttributes = PreserveAttributes.fromMnemonicString("ugp");
// Target
String targetRoot = "/data/databases/destination";
Path relativePath = PathUtils.relativizePath(originFile, datasetRoot);
Path targetPath = new Path(targetRoot, relativePath);
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/publisher");
CopyConfiguration copyConfiguration =
CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), properties).preserve(preserveAttributes).build();
CopyableFile copyableFile = CopyableFile.builder(originFS, origin, datasetRoot, copyConfiguration)
.destination(targetPath)
.ancestorsOwnerAndPermission(Lists.<OwnerAndPermission>newArrayList()) // not testing ancestors
.build();
// Making sure all fields are populated correctly via CopyableFile builder
// Verify preserve attribute options
Assert.assertEquals(copyableFile.getPreserve().toMnemonicString(), preserveAttributes.toMnemonicString());
// Verify origin
Assert.assertEquals(copyableFile.getFileSet(), "");
Assert.assertEquals(copyableFile.getOrigin(), origin);
// Verify destination target, permissions and other attributes
Assert.assertEquals(copyableFile.getChecksum().length, 0);
Assert.assertEquals(copyableFile.getDestination().toString(), targetPath.toString());
Assert.assertEquals(copyableFile.getDestinationOwnerAndPermission().getGroup(), origin.getGroup());
Assert.assertEquals(copyableFile.getDestinationOwnerAndPermission().getOwner(), origin.getOwner());
Assert.assertEquals(copyableFile.getDestinationOwnerAndPermission().getFsPermission(),
origin.getPermission());
// Verify auto determined timestamp
Assert.assertEquals(copyableFile.getOriginTimestamp(), origin.getModificationTime());
Assert.assertEquals(copyableFile.getUpstreamTimestamp(), origin.getModificationTime());
}
@Test
public void testCopyableFileBuilderMaximumConfiguration()
throws IOException {
// Source
String datasetRootDir = "/data/databases/source";
Path datasetRoot = new Path(datasetRootDir);
FileSystem originFS = FileSystem.getLocal(new Configuration());
Path originFile = new Path(datasetRootDir, "copyableFile");
FileStatus origin = new FileStatus(0l, false, 0, 0l, System.currentTimeMillis(), originFile);
PreserveAttributes preserveAttributes = PreserveAttributes.fromMnemonicString("ugp");
// Target
String targetRoot = "/data/databases/destination";
Path relativePath = PathUtils.relativizePath(originFile, datasetRoot);
Path targetPath = new Path(targetRoot, relativePath);
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/publisher");
CopyConfiguration copyConfiguration =
CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), properties).preserve(preserveAttributes).build();
// Other attributes
String fileSet = "fileset";
byte[] checksum = new byte[1];
long originTimestamp = 23091986l;
long upstreamTimestamp = 23091986l;
OwnerAndPermission ownerAndPermission = new OwnerAndPermission("gobblin", "gobblin-dev", origin.getPermission());
CopyableFile copyableFile = CopyableFile.builder(originFS, origin, datasetRoot, copyConfiguration)
.fileSet(fileSet).checksum(checksum)
.originTimestamp(originTimestamp).upstreamTimestamp(upstreamTimestamp)
.destinationOwnerAndPermission(ownerAndPermission)
.origin(origin)
.preserve(preserveAttributes)
.destination(targetPath)
.ancestorsOwnerAndPermission(Lists.<OwnerAndPermission>newArrayList())
.build();
// Verify preserve attribute options
Assert.assertEquals(copyableFile.getPreserve().toMnemonicString(), preserveAttributes.toMnemonicString());
// Verify origin
Assert.assertEquals(copyableFile.getFileSet(), fileSet);
Assert.assertEquals(copyableFile.getOrigin(), origin);
// Verify destination target, permissions and other attributes
Assert.assertEquals(copyableFile.getChecksum().length, 1);
Assert.assertEquals(copyableFile.getDestination().toString(), targetPath.toString());
Assert.assertEquals(copyableFile.getDestinationOwnerAndPermission().getGroup(), ownerAndPermission.getGroup());
Assert.assertEquals(copyableFile.getDestinationOwnerAndPermission().getOwner(), ownerAndPermission.getOwner());
Assert.assertEquals(copyableFile.getDestinationOwnerAndPermission().getFsPermission(),
ownerAndPermission.getFsPermission());
// Verify auto determined timestamp
Assert.assertEquals(copyableFile.getOriginTimestamp(), originTimestamp);
Assert.assertEquals(copyableFile.getUpstreamTimestamp(), upstreamTimestamp);
}
@Test
public void testResolveOwnerAndPermission() throws Exception {
Path path = new Path("/test/path");
FileStatus fileStatus = new FileStatus(1, false, 0, 0, 0, 0, FsPermission.getDefault(), "owner", "group", path);
FileSystem fs = mock(FileSystem.class);
Mockito.doReturn(fileStatus).when(fs).getFileStatus(path);
Mockito.doReturn(path).when(fs).makeQualified(path);
Mockito.doReturn(new URI("hdfs://uri")).when(fs).getUri();
Properties properties = new Properties();
properties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/final/dir");
OwnerAndPermission ownerAndPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, path,
new CopyConfiguration.CopyConfigurationBuilder(fs, properties).build());
Assert.assertEquals(ownerAndPermission.getOwner(), null);
Assert.assertEquals(ownerAndPermission.getGroup(), null);
Assert.assertEquals(ownerAndPermission.getFsPermission(), null);
ownerAndPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, path,
new CopyConfiguration.CopyConfigurationBuilder(fs, properties).targetGroup(Optional.of("target")).build());
Assert.assertEquals(ownerAndPermission.getOwner(), null);
Assert.assertEquals(ownerAndPermission.getGroup(), "target");
Assert.assertEquals(ownerAndPermission.getFsPermission(), null);
ownerAndPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, path,
new CopyConfiguration.CopyConfigurationBuilder(fs, properties).targetGroup(Optional.of("target")).
preserve(PreserveAttributes.fromMnemonicString("ug")).build());
Assert.assertEquals(ownerAndPermission.getOwner(), "owner");
Assert.assertEquals(ownerAndPermission.getGroup(), "target");
Assert.assertEquals(ownerAndPermission.getFsPermission(), null);
ownerAndPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, path,
new CopyConfiguration.CopyConfigurationBuilder(fs, properties).preserve(PreserveAttributes.fromMnemonicString("ug")).build());
Assert.assertEquals(ownerAndPermission.getOwner(), "owner");
Assert.assertEquals(ownerAndPermission.getGroup(), "group");
Assert.assertEquals(ownerAndPermission.getFsPermission(), null);
ownerAndPermission = CopyableFile.resolveReplicatedOwnerAndPermission(fs, path,
new CopyConfiguration.CopyConfigurationBuilder(fs, properties).preserve(PreserveAttributes.fromMnemonicString("ugp")).build());
Assert.assertEquals(ownerAndPermission.getOwner(), "owner");
Assert.assertEquals(ownerAndPermission.getGroup(), "group");
Assert.assertEquals(ownerAndPermission.getFsPermission(), FsPermission.getDefault());
}
}