/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.junit.Test;
import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.thrift.test.Name;

import java.io.ByteArrayOutputStream;
import java.io.File;

import static org.junit.Assert.assertEquals;

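/**
 * Tests for {@link ParquetTupleScheme}: Thrift {@code Name} records are written to a
 * Parquet file and read back through Cascading flows, covering plain and glob source
 * paths as well as column projection.
 */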
public class TestParquetTupleScheme {
  final String parquetInputPath = "target/test/ParquetTupleIn/names-parquet-in";
  final String txtOutputPath = "target/test/ParquetTupleOut/names-txt-out";

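  // Reads the generated Parquet input through a plain directory path, a single-level
  // glob and a multi-level glob; each variant should copy the same three records.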
  @Test
  public void testReadPattern() throws Exception {
    String sourceFolder = parquetInputPath;
    testReadWrite(sourceFolder);

    String sourceGlobPattern = parquetInputPath + "/*";
    testReadWrite(sourceGlobPattern);

    String multiLevelGlobPattern = "target/test/ParquetTupleIn/**/*";
    testReadWrite(multiLevelGlobPattern);
  }

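  // Projects only the last_name column from the Parquet source and verifies that just
  // those values reach the text sink.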
  @Test
  public void testFieldProjection() throws Exception {
    createFileForRead();

    Path path = new Path(txtOutputPath);
    final FileSystem fs = path.getFileSystem(new Configuration());
    if (fs.exists(path)) fs.delete(path, true);

    Scheme sourceScheme = new ParquetTupleScheme(new Fields("last_name"));
    Tap source = new Hfs(sourceScheme, parquetInputPath);

    Scheme sinkScheme = new TextLine(new Fields("last_name"));
    Tap sink = new Hfs(sinkScheme, txtOutputPath);

    Pipe assembly = new Pipe("namecp");
    assembly = new Each(assembly, new ProjectedTupleFunction());
    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);

    flow.complete();
    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
    assertEquals("Practice\nHope\nHorse\n", result);
  }

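  // Shared helper (no @Test annotation of its own): writes the Parquet input, copies both
  // name columns through a flow rooted at the given source path, and checks the
  // tab-separated text output.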
  public void testReadWrite(String inputPath) throws Exception {
    createFileForRead();

    Path path = new Path(txtOutputPath);
    final FileSystem fs = path.getFileSystem(new Configuration());
    if (fs.exists(path)) fs.delete(path, true);

    Scheme sourceScheme = new ParquetTupleScheme(new Fields("first_name", "last_name"));
    Tap source = new Hfs(sourceScheme, inputPath);

    Scheme sinkScheme = new TextLine(new Fields("first", "last"));
    Tap sink = new Hfs(sinkScheme, txtOutputPath);

    Pipe assembly = new Pipe("namecp");
    assembly = new Each(assembly, new UnpackTupleFunction());
    Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly);

    flow.complete();
    String result = FileUtils.readFileToString(new File(txtOutputPath + "/part-00000"));
    assertEquals("Alice\tPractice\nBob\tHope\nCharlie\tHorse\n", result);
  }

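  // Serializes three Thrift Name records with the compact protocol and writes them into
  // names.parquet via ThriftToParquetFileWriter.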
  private void createFileForRead() throws Exception {
    final Path fileToCreate = new Path(parquetInputPath + "/names.parquet");

    final Configuration conf = new Configuration();
    final FileSystem fs = fileToCreate.getFileSystem(conf);
    if (fs.exists(fileToCreate)) fs.delete(fileToCreate, true);

    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate, ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory, Name.class);

    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));

    Name n1 = new Name();
    n1.setFirst_name("Alice");
    n1.setLast_name("Practice");
    Name n2 = new Name();
    n2.setFirst_name("Bob");
    n2.setLast_name("Hope");
    Name n3 = new Name();
    n3.setFirst_name("Charlie");
    n3.setLast_name("Horse");

    n1.write(protocol);
    w.write(new BytesWritable(baos.toByteArray()));
    baos.reset();
    n2.write(protocol);
    w.write(new BytesWritable(baos.toByteArray()));
    baos.reset();
    n3.write(protocol);
    w.write(new BytesWritable(baos.toByteArray()));
    w.close();
  }

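  // Copies first_name and last_name from each incoming tuple into a nested tuple; the
  // assertion in testReadWrite expects one tab-separated name per output line.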
  private static class UnpackTupleFunction extends BaseOperation implements Function {
    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
      TupleEntry arguments = functionCall.getArguments();
      Tuple result = new Tuple();

      Tuple name = new Tuple();
      name.addString(arguments.getString(0));
      name.addString(arguments.getString(1));

      result.add(name);
      functionCall.getOutputCollector().add(result);
    }
  }

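  // Same idea as UnpackTupleFunction, but the source scheme projects a single column,
  // so only one field is copied from the arguments.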
  private static class ProjectedTupleFunction extends BaseOperation implements Function {
    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
      TupleEntry arguments = functionCall.getArguments();
      Tuple result = new Tuple();

      Tuple name = new Tuple();
      name.addString(arguments.getString(0));
      // first_name is not requested by the source scheme, so there is no second field to copy

      result.add(name);
      functionCall.getOutputCollector().add(result);
    }
  }
}