blob: 34e3d6b10908b176bb808cb74cf1d7b33c342569 [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.lib.io.fs;
import java.io.*;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.datatorrent.api.AttributeMap;
import com.datatorrent.api.AttributeMap.Attribute;
import com.datatorrent.api.Context.Counters;
import com.datatorrent.api.Context.OperatorContext;
/**
* Functional Test for {@link HdfsExactlyOnceOutputOperator}
*/
public class HdfsExactlyOnceOutputOperatorTest
{
@Before
public void setup()
{
deleteFile("target/0");
deleteFile("target/1");
deleteFile("target/2");
}
@Test
@SuppressWarnings("unchecked")
public void testOperator()
{
HdfsExactlyOnceOutputOperator oper = new HdfsExactlyOnceOutputOperator();
oper.setFilePath("target");
oper.setup(new DummyContext(0));
oper.beginWindow(0);
oper.input.process("window 0");
oper.input.process("window 0");
oper.endWindow();
oper.beginWindow(1);
oper.input.process("window 1");
oper.teardown();
Assert.assertEquals("The number of lines in file target/0", 2, readFile("target/0", "window 0"));
Assert.assertEquals("The number of lines in file target/1.tmp", 1, readFile("target/1.tmp", "window 1"));
Assert.assertEquals("Checking the file target/1", false, checkFile("target/1"));
oper.setup(new DummyContext(0));
oper.beginWindow(0);
oper.input.process("window_new 0");
oper.input.process("window_new 0");
oper.endWindow();
oper.beginWindow(1);
oper.input.process("window_new 1");
oper.input.process("window_new 1");
oper.endWindow();
oper.beginWindow(2);
oper.input.process("window_new 2");
oper.input.process("window_new 2");
oper.endWindow();
oper.teardown();
Assert.assertEquals("The number of lines in file target/0", 2, readFile("target/0", "window 0"));
Assert.assertEquals("The number of lines in file target/1", 2, readFile("target/1", "window_new 1"));
Assert.assertEquals("The number of lines in file target/2", 2, readFile("target/2", "window_new 2"));
Assert.assertEquals("Checking the file target/0", true, checkFile("target/0"));
Assert.assertEquals("Checking the file target/1", true, checkFile("target/1"));
Assert.assertEquals("Checking the file target/2", true, checkFile("target/2"));
}
@After
public void tearDown()
{
deleteFile("target/0");
deleteFile("target/1");
deleteFile("target/2");
}
private int readFile(String path, final String val)
{
BufferedReader br = null;
try {
FileInputStream fstream = new FileInputStream(path);
DataInputStream in = new DataInputStream(fstream);
br = new BufferedReader(new InputStreamReader(in));
String strLine;
int count = 0;
while ((strLine = br.readLine()) != null) {
Assert.assertEquals("Comparing the values", val, strLine);
count++;
}
return count;
}
catch (Exception e) {
return -1;
}
finally {
if (br != null) {
try {
br.close();
}
catch (IOException e) {
}
}
}
}
private boolean checkFile(String path)
{
File file = new File(path);
if (file.exists()) {
return true;
}
return false;
}
private void deleteFile(String path)
{
File file = new File(path);
if (file.exists()) {
file.delete();
}
}
public static class DummyContext implements OperatorContext
{
int id;
String applicationPath;
String applicationId;
AttributeMap attributes;
public DummyContext(int id)
{
this.id = id;
}
public DummyContext(int id, @Nonnull AttributeMap map)
{
this.id = id;
this.attributes = map;
}
@Override
public int getId()
{
return id;
}
@Override
public void setCounters(Counters stats)
{
}
@Override
@SuppressWarnings("unchecked")
public <T> T getValue(Attribute<T> key)
{
T value = attributes.get(key);
if (value != null) {
return value;
}
return null;
}
@Override
public AttributeMap getAttributes()
{
return attributes;
}
}
}