blob: 65b5a1848e98f6a20976fab4eb388e58e471593f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import junit.framework.TestCase;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigServer;
import org.apache.pig.StoreFunc;
import org.apache.pig.ReversibleLoadStoreFunc;
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
public class TestReversibleLoadStore extends PigExecTestCase {
static List<Tuple> _storedTuples = new ArrayList<Tuple>();
public void testNoReuse() throws Exception {
DummyLoadFunc.readCounterMap = null ;
DummyStoreFunc.writeCounter = 0 ;
File tmpFile = createTempFile() ;
PigServer pig = new PigServer(ExecType.LOCAL);
pig.registerQuery("A = LOAD 'file:" + Util.encodeEscape(tmpFile.getAbsolutePath().toString()) + "' USING "
+ DummyLoadFunc.class.getName() + "();");
String file1 = "/tmp/testPigOutput" ;
if (pig.existsFile(file1)) {
pig.deleteFile(file1) ;
}
pig.store("A", file1, DummyStoreFunc.class.getName() + "()");
String file2 = "/tmp/testPigOutput2" ;
if (pig.existsFile(file2)) {
pig.deleteFile(file2) ;
}
pig.store("A", file2, DummyStoreFunc.class.getName() + "()");
// for this test the plan will not be reused so:-
// - initial temp file has to be read 10 times
// - DummyLoadStoreFunc has to be written 10 times
assertEquals(10, DummyLoadFunc.readCounterMap.get("file:"+tmpFile.getAbsolutePath()).intValue()) ;
assertEquals(10, DummyStoreFunc.writeCounter) ;
pig.deleteFile(file1) ;
pig.deleteFile(file2) ;
}
public void testReuse() throws Exception {
DummyLoadStoreFunc.readCounterMap = null ;
DummyLoadStoreFunc.writeCounter = 0 ;
File tmpFile = createTempFile() ;
PigServer pig = new PigServer(ExecType.LOCAL);
pig.registerQuery("A = LOAD 'file:" + Util.encodeEscape(tmpFile.getAbsolutePath().toString()) + "' USING "
+ DummyLoadStoreFunc.class.getName() + "();");
String file1 = "/tmp/testPigOutput" ;
if (pig.existsFile(file1)) {
pig.deleteFile(file1) ;
}
pig.store("A", file1, DummyLoadStoreFunc.class.getName() + "()");
String file2 = "/tmp/testPigOutput2" ;
if (pig.existsFile(file2)) {
pig.deleteFile(file2) ;
}
pig.store("A", file2, DummyLoadStoreFunc.class.getName() + "()");
// for this test the plan will be reused so:-
// - initial temp file has to be read 5 times
// - the output of the first execution has to be read 5 times
// - DummyLoadStoreFunc has to be written 10 times
assertEquals(5, DummyLoadStoreFunc.readCounterMap.get("file:"+tmpFile.getAbsolutePath()).intValue()) ;
assertEquals(5, DummyLoadStoreFunc.readCounterMap.get("/tmp/testPigOutput").intValue()) ;
assertEquals(10, DummyLoadStoreFunc.writeCounter) ;
pig.deleteFile(file1) ;
pig.deleteFile(file2) ;
}
private File createTempFile() throws Exception {
File tmpFile = File.createTempFile("test", ".txt");
if (tmpFile.exists()) {
tmpFile.delete() ;
}
PrintWriter pw = new PrintWriter(tmpFile) ;
pw.println("1,11,111,1111") ;
pw.println("2,22,222,2222") ;
pw.println("3,33,333,3333") ;
pw.println("4,4,444,4444") ;
pw.println("5,55,555,5555") ;
pw.close() ;
tmpFile.deleteOnExit() ;
return tmpFile ;
}
public static class DummyLoadStoreFunc implements ReversibleLoadStoreFunc {
public static Map<String,Integer> readCounterMap = null ;
protected BufferedPositionedInputStream in = null;
private String fileName = null ;
public void bindTo(String inputfileName, BufferedPositionedInputStream is,
long offset, long end) throws IOException {
in = is ;
fileName = inputfileName ;
}
public Tuple getNext() throws IOException {
String line = in.readLine(Charset.forName("UTF8"), (byte) '\n') ;
if (line == null) {
return null ;
}
// else
if (readCounterMap == null) {
readCounterMap = new HashMap<String,Integer>() ;
}
if (readCounterMap.get(fileName) == null) {
readCounterMap.put(fileName, 1) ;
}
else {
readCounterMap.put(fileName, readCounterMap.get(fileName) + 1) ;
}
return new Tuple(line, ",");
}
public static int writeCounter = 0 ;
private PrintWriter pw = null ;
public void bindTo(OutputStream os) throws IOException {
pw = new PrintWriter(os) ;
}
public void finish() throws IOException {
pw.close() ;
}
public void putNext(Tuple tuple) throws IOException {
writeCounter++ ;
pw.println(tuple.toDelimitedString(","));
}
}
public static class DummyLoadFunc implements LoadFunc {
public static Map<String,Integer> readCounterMap = null ;
protected BufferedPositionedInputStream in = null;
private String fileName = null ;
public void bindTo(String inputfileName, BufferedPositionedInputStream is,
long offset, long end) throws IOException {
in = is ;
fileName = inputfileName ;
}
public Tuple getNext() throws IOException {
String line = in.readLine(Charset.forName("UTF8"), (byte) '\n') ;
if (line == null) {
return null ;
}
// else
if (readCounterMap == null) {
readCounterMap = new HashMap<String,Integer>() ;
}
if (readCounterMap.get(fileName) == null) {
readCounterMap.put(fileName, 1) ;
}
else {
readCounterMap.put(fileName, readCounterMap.get(fileName) + 1) ;
}
return new Tuple(line, ",");
}
}
public static class DummyStoreFunc implements StoreFunc {
public static int writeCounter = 0 ;
private PrintWriter pw = null ;
public void bindTo(OutputStream os) throws IOException {
pw = new PrintWriter(os) ;
}
public void finish() throws IOException {
pw.close() ;
}
public void putNext(Tuple tuple) throws IOException {
writeCounter++ ;
pw.println(tuple.toDelimitedString(","));
}
}
}