change
diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml index 9e585d4..ff9c435 100644 --- a/parquet-cascading/pom.xml +++ b/parquet-cascading/pom.xml
@@ -67,6 +67,18 @@ <artifactId>cascading-hadoop</artifactId> <version>2.2.0</version> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <version>1.7</version> + <scope>test</scope> + </dependency> </dependencies> <build>
diff --git a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java index 60ad35c..6c20ccb 100644 --- a/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java +++ b/parquet-cascading/src/test/java/parquet/cascading/TestParquetTBaseScheme.java
@@ -36,6 +36,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.thrift.protocol.TCompactProtocol; @@ -50,9 +53,11 @@ import java.io.File; import java.io.ByteArrayOutputStream; +import java.util.HashMap; +import java.util.Map; public class TestParquetTBaseScheme { - final String txtInputPath = "src/test/resources/names.txt"; + final String txtInputPath = "parquet-cascading/src/test/resources/names.txt"; final String parquetInputPath = "target/test/ParquetTBaseScheme/names-parquet-in"; final String parquetOutputPath = "target/test/ParquetTBaseScheme/names-parquet-out"; final String txtOutputPath = "target/test/ParquetTBaseScheme/names-txt-out"; @@ -71,8 +76,14 @@ Pipe assembly = new Pipe( "namecp" ); assembly = new Each(assembly, new PackThriftFunction()); - Flow flow = new HadoopFlowConnector().connect("namecp", source, sink, assembly); + Map<Object, Object> props = new HashMap<Object, Object>(); +// props.put("mapred.output.compression.codec","org.apache.hadoop.io.compress.DefaultCodec"); + props.put("mapred.output.compress", true); + HadoopFlowConnector hadoopFlowConnector = new HadoopFlowConnector(props); + Flow flow = hadoopFlowConnector.connect("namecp", source, sink, assembly); + JobConf config = (JobConf) flow.getConfig(); +// FileOutputFormat.setCompressOutput(config, true); flow.complete(); }
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java index cb907e3..ae7af8b 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
@@ -63,7 +63,7 @@ codec = ParquetOutputFormat.getCompression(conf); } else if (getCompressOutput(conf)) { // from hadoop config // find the right codec - Class<?> codecClass = getOutputCompressorClass(conf, DefaultCodec.class); + Class<?> codecClass = getOutputCompressorClass(conf, org.apache.hadoop.io.compress.GzipCodec.class); if (INFO) LOG.info("Compression set through hadoop codec: " + codecClass.getName()); codec = CompressionCodecName.fromCompressionCodec(codecClass); } else {
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java index 1ce4853..e6fc0be 100644 --- a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java +++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftWriteSupport.java
@@ -23,6 +23,7 @@ import com.twitter.elephantbird.pig.util.ThriftToPig; +import parquet.Log; import parquet.hadoop.BadConfigurationException; import parquet.hadoop.api.WriteSupport; import parquet.io.ColumnIOFactory; @@ -39,6 +40,7 @@ public class ThriftWriteSupport<T extends TBase<?,?>> extends WriteSupport<T> { public static final String PARQUET_THRIFT_CLASS = "parquet.thrift.class"; + private static final Log LOG = Log.getLog(ThriftWriteSupport.class); public static <U extends TBase<?,?>> void setThriftClass(Configuration configuration, Class<U> thriftClass) { configuration.set(PARQUET_THRIFT_CLASS, thriftClass.getName()); @@ -84,10 +86,22 @@ this.schema = thriftSchemaConverter.convert(thriftClass); final Map<String, String> extraMetaData = new ThriftMetaData(thriftClass.getName(), thriftStruct).toExtraMetaData(); // adding the Pig schema as it would have been mapped from thrift - new PigMetaData(new ThriftToPig<S>(thriftClass).toSchema()).addToMetaData(extraMetaData); + if (isPigLoaded()){ + new PigMetaData(new ThriftToPig<S>(thriftClass).toSchema()).addToMetaData(extraMetaData); + } writeContext = new WriteContext(schema, extraMetaData); } + private boolean isPigLoaded() { + try { + Class.forName("org.apache.pig.impl.logicalLayer.schema.Schema"); + return true; + } catch (ClassNotFoundException e) { + LOG.info("Pig is not loaded, pig metadata will not be written"); + return false; + } + } + @Override public WriteContext init(Configuration configuration) { if (writeContext == null) { @@ -111,4 +125,5 @@ } } + }
diff --git a/parquet-thrift/src/test/java/parquet/thrift/projection/PathGlobPatternTest.java b/parquet-thrift/src/test/java/parquet/thrift/projection/PathGlobPatternTest.java index dd0f86b..f471206 100644 --- a/parquet-thrift/src/test/java/parquet/thrift/projection/PathGlobPatternTest.java +++ b/parquet-thrift/src/test/java/parquet/thrift/projection/PathGlobPatternTest.java
@@ -37,7 +37,9 @@ assertTrue(g.matches("a/asd/b")); assertTrue(g.matches("a/asd/ss/b")); - + g = new PathGlobPattern("**"); + assertTrue(g.matches("a/asd/b")); + assertTrue(g.matches("a/asd/ss/b")); } @Test