blob: e04bee466e75b0775a800c6000f75fcdb35c53db [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using Org.Apache.REEF.IO.FileSystem;
using Org.Apache.REEF.IO.FileSystem.Hadoop;
using Org.Apache.REEF.IO.PartitionedData;
using Org.Apache.REEF.IO.PartitionedData.FileSystem;
using Org.Apache.REEF.IO.PartitionedData.FileSystem.Parameters;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
namespace Org.Apache.REEF.IO.TestClient
{
// TODO[JIRA REEF-815]: once we move to NUnit, those tests should be moved to the Test project
/// <summary>
/// Manually driven check that uploads small files through the Hadoop file system,
/// builds a partitioned input data set over them and reads the bytes back.
/// </summary>
internal class HadoopFileInputPartitionTest
{
    private static readonly Logger Logger = Logger.GetLogger(typeof(HadoopFileInputPartitionTest));

    /// <summary>
    /// Uploads two byte files, creates an IPartitionedInputDataSet over both remote paths
    /// and enumerates every partition through <see cref="ByteSerializer"/>, counting the bytes read.
    /// </summary>
    /// <returns>
    /// true when the number of bytes read back equals the number of bytes uploaded; false otherwise.
    /// </returns>
    internal static bool TestWithByteDeserializer()
    {
        byte[] firstPayload = { 111, 112, 113 };
        byte[] secondPayload = { 114, 115, 116, 117 };

        // Assumes every uploaded byte is yielded exactly once across all partitions.
        int expectedCount = firstPayload.Length + secondPayload.Length;

        string remoteFilePath1 = MakeRemoteTestFile(firstPayload);
        string remoteFilePath2 = MakeRemoteTestFile(secondPayload);

        // The deserializer binding is handed to the partition configuration as a serialized string.
        var serializerConf = TangFactory.GetTang().NewConfigurationBuilder()
            .BindImplementation<IFileDeSerializer<IEnumerable<byte>>, ByteSerializer>(GenericType<IFileDeSerializer<IEnumerable<byte>>>.Class,
                GenericType<ByteSerializer>.Class)
            .Build();
        var serializerConfString = (new AvroConfigurationSerializer()).ToString(serializerConf);

        // NOTE(review): presumably each FilePathForPartitions entry becomes its own partition - confirm.
        var dataSet = TangFactory.GetTang()
            .NewInjector(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.ConfigurationModule
                .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions, remoteFilePath1)
                .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FilePathForPartitions, remoteFilePath2)
                .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.FileSerializerConfig, serializerConfString)
                .Set(FileSystemInputPartitionConfiguration<IEnumerable<byte>>.CopyToLocal, "true")
                .Build(),
                HadoopFileSystemConfiguration.ConfigurationModule.Build())
            .GetInstance<IPartitionedInputDataSet>();
        Logger.Log(Level.Info, "IPartitionedDataSet created.");

        int count = 0;
        foreach (var partitionDescriptor in dataSet)
        {
            var partition = TangFactory.GetTang()
                .NewInjector(partitionDescriptor.GetPartitionConfiguration(), GetHadoopFileSystemConfiguration())
                .GetInstance<IInputPartition<IEnumerable<byte>>>();
            Logger.Log(Level.Info, "GetInstance of partition.");

            // "as IDisposable" yields null for non-disposable partitions, which "using" tolerates.
            using (partition as IDisposable)
            {
                Logger.Log(Level.Info, "get partition instance.");
                var e = partition.GetPartitionHandle();
                foreach (var v in e)
                {
                    Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "Data read {0}: ", v));
                    count++;
                }
            }
        }

        Logger.Log(Level.Info, "Total count returend: " + count);

        // Report the outcome instead of unconditionally succeeding, so a short or
        // duplicated read is visible to the caller.
        if (count != expectedCount)
        {
            Logger.Log(Level.Error, string.Format(CultureInfo.CurrentCulture,
                "Expected {0} bytes but read {1}.", expectedCount, count));
            return false;
        }

        return true;
    }

    /// <summary>
    /// Writes the bytes to a local temp file and copies that file to a uniquely named
    /// path under /tmp on the injected IFileSystem. The local temp file is removed afterwards.
    /// </summary>
    /// <param name="bytes">Content of the file to create.</param>
    /// <returns>The remote path (not the URI) of the uploaded file.</returns>
    internal static string MakeRemoteTestFile(byte[] bytes)
    {
        IFileSystem fileSystem =
            TangFactory.GetTang()
                .NewInjector(HadoopFileSystemConfiguration.ConfigurationModule.Build())
                .GetInstance<IFileSystem>();

        string localFile = MakeLocalTempFile(bytes);
        try
        {
            // Timestamp keeps the name readable; the GUID keeps it unique even when two
            // files are requested within the same millisecond or by concurrent runs.
            string remoteFileName = "/tmp/TestHadoopFilePartition-" +
                DateTime.Now.ToString("yyyyMMddHHmmssfff", CultureInfo.InvariantCulture) +
                "-" + Guid.NewGuid().ToString("N");

            var remoteUri = fileSystem.CreateUriForPath(remoteFileName);
            Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "remoteUri {0}: ", remoteUri));

            fileSystem.CopyFromLocal(localFile, remoteUri);
            Logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, "File CopyFromLocal {0}: ", localFile));

            return remoteFileName;
        }
        finally
        {
            // Path.GetTempFileName creates a real file; remove it so repeated runs do not
            // accumulate temp files. Assumes CopyFromLocal has finished with the file when it returns.
            File.Delete(localFile);
        }
    }

    /// <summary>
    /// Creates (or replaces) the file at the given path with exactly the given bytes.
    /// </summary>
    /// <param name="filePath">Path of the local file to write.</param>
    /// <param name="bytes">Content to write; must not be null.</param>
    /// <exception cref="ArgumentNullException">Thrown when bytes is null.</exception>
    internal static void MakeLocalTestFile(string filePath, byte[] bytes)
    {
        // Fail before touching the file system rather than leaving an empty file behind.
        if (bytes == null)
        {
            throw new ArgumentNullException("bytes");
        }

        if (File.Exists(filePath))
        {
            File.Delete(filePath);
        }

        using (var s = File.Create(filePath))
        {
            s.Write(bytes, 0, bytes.Length);
        }
    }

    /// <summary>
    /// Creates a new local temp file holding the given bytes.
    /// </summary>
    /// <param name="bytes">Content to write.</param>
    /// <returns>Full path of the temp file; the caller is responsible for deleting it.</returns>
    internal static string MakeLocalTempFile(byte[] bytes)
    {
        var result = Path.GetTempFileName();
        MakeLocalTestFile(result, bytes);
        return result;
    }

    /// <summary>
    /// Builds a configuration that binds IFileSystem directly to HadoopFileSystem.
    /// </summary>
    private static IConfiguration GetHadoopFileSystemConfiguration()
    {
        return TangFactory.GetTang().NewConfigurationBuilder()
            .BindImplementation(typeof(IFileSystem), typeof(HadoopFileSystem))
            .Build();
    }
}
/// <summary>
/// File deserializer that exposes the content of a set of files as a flat sequence of bytes.
/// </summary>
internal class ByteSerializer : IFileDeSerializer<IEnumerable<byte>>
{
    [Inject]
    private ByteSerializer()
    {
    }

    /// <summary>
    /// Reads the bytes of every file in the set and returns them one by one.
    /// Files are opened lazily, as the returned sequence is enumerated.
    /// </summary>
    /// <param name="filePaths">Paths of the local files to read.</param>
    /// <returns>All bytes of each file, file after file, in the set's enumeration order.</returns>
    public IEnumerable<byte> Deserialize(ISet<string> filePaths)
    {
        foreach (var f in filePaths)
        {
            // Read-only access is sufficient and also works for files that cannot be opened for writing.
            using (FileStream stream = File.OpenRead(f))
            {
                // Stream.ReadByte signals end of stream with -1. BinaryReader.PeekChar is not
                // used because it decodes characters, so raw binary content can make it throw
                // or report the end of the data too early.
                int next;
                while ((next = stream.ReadByte()) != -1)
                {
                    yield return (byte)next;
                }
            }
        }
    }
}
}