TAJO-2179: Add a regular expression scanner and appender.
Closes #1046
diff --git a/CHANGES b/CHANGES
index 956e0d4..536c728 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,6 +4,8 @@
NEW FEATURES
+ TAJO-2179: Add a regular expression scanner and appender. (jinho)
+
TAJO-2165: Add 'ALTER TABLE UNSET PROPERTY' statement to Tajo DDL.
(Lee Dongjin via jihoon)
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 7d844fa..055581b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -742,14 +742,16 @@
if (dataFormat.equalsIgnoreCase(BuiltinStorages.TEXT)) {
options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
options.set(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT);
- } else if (dataFormat.equalsIgnoreCase("JSON")) {
- options.set(StorageConstants.TEXT_SERDE_CLASS, "org.apache.tajo.storage.json.JsonLineSerDe");
- } else if (dataFormat.equalsIgnoreCase("RCFILE")) {
+ } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.JSON)) {
+ options.set(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_JSON_SERDE_CLASS);
+ } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.REGEX)) {
+ options.set(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_REGEX_SERDE_CLASS);
+ } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.RCFILE)) {
options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
- } else if (dataFormat.equalsIgnoreCase("SEQUENCEFILE")) {
+ } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) {
options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
- } else if (dataFormat.equalsIgnoreCase("PARQUET")) {
+ } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.PARQUET)) {
options.set(BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE);
options.set(PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE);
options.set(COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME);
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index 755b5d0..987c0bb 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -233,6 +234,12 @@
options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
}
+ } else if (BuiltinStorages.REGEX.equals(dataFormat)) {
+ options.set(StorageConstants.TEXT_REGEX, properties.getProperty(RegexSerDe.INPUT_REGEX));
+ options.set(StorageConstants.TEXT_REGEX_CASE_INSENSITIVE,
+ properties.getProperty(RegexSerDe.INPUT_REGEX_CASE_SENSITIVE));
+ options.set(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING,
+ properties.getProperty("output.format.string"));
}
// set data size
@@ -574,6 +581,25 @@
table.putToParameters(OrcConf.COMPRESS.getAttribute(),
tableDesc.getMeta().getProperty(OrcConf.COMPRESS.getAttribute()));
}
+ } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.REGEX)) {
+
+ sd.setInputFormat(TextInputFormat.class.getName());
+ sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getName());
+ sd.getSerdeInfo().setSerializationLib(RegexSerDe.class.getName());
+
+ if (tableDesc.getMeta().containsProperty(StorageConstants.TEXT_NULL)) {
+ table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT,
+ StringEscapeUtils.unescapeJava(tableDesc.getMeta().getProperty(StorageConstants.TEXT_NULL)));
+ table.getParameters().remove(StorageConstants.TEXT_NULL);
+ }
+
+ sd.getSerdeInfo().putToParameters(RegexSerDe.INPUT_REGEX,
+ tableDesc.getMeta().getProperty(StorageConstants.TEXT_REGEX));
+ sd.getSerdeInfo().putToParameters(RegexSerDe.INPUT_REGEX_CASE_SENSITIVE,
+ tableDesc.getMeta().getProperty(StorageConstants.TEXT_REGEX_CASE_INSENSITIVE, "false"));
+ sd.getSerdeInfo().putToParameters("output.format.string",
+ tableDesc.getMeta().getProperty(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING));
+
} else {
throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HivecatalogStore");
}
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
index faefd28..9cb665e 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
@@ -142,6 +143,8 @@
return BuiltinStorages.AVRO;
} else if (OrcSerde.class.getName().equals(serde)) {
return BuiltinStorages.ORC;
+ } else if (RegexSerDe.class.getName().equals(serde)) {
+ return BuiltinStorages.REGEX;
} else {
throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
}
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
index 9372bb0..6608bc0 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.hadoop.hive.serde2.RegexSerDe;
+import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.tajo.BuiltinStorages;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionDesc;
@@ -829,4 +831,35 @@
table1.getMeta().getProperty(StorageConstants.TEXT_DELIMITER));
store.dropTable(DB_NAME, tableName);
}
+
+ @Test
+ public void testTableUsingRegex() throws Exception {
+ TableMeta meta = new TableMeta(BuiltinStorages.REGEX, new KeyValueSet());
+ meta.putProperty(StorageConstants.TEXT_REGEX, "([^ ]*)");
+ meta.putProperty(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING, "%1$s");
+
+ org.apache.tajo.catalog.Schema schema = SchemaBuilder.builder()
+ .add("c_custkey", TajoDataTypes.Type.TEXT)
+ .build();
+
+ TableDesc table = new TableDesc(IdentifierUtil.buildFQName(DB_NAME, CUSTOMER), schema, meta,
+ new Path(warehousePath, new Path(DB_NAME, CUSTOMER)).toUri());
+ store.createTable(table.getProto());
+ assertTrue(store.existTable(DB_NAME, CUSTOMER));
+
+ org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, CUSTOMER);
+ assertEquals(TextInputFormat.class.getName(), hiveTable.getSd().getInputFormat());
+ assertEquals(HiveIgnoreKeyTextOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+ assertEquals(RegexSerDe.class.getName(), hiveTable.getSerializationLib());
+
+ TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
+ assertEquals(table.getName(), table1.getName());
+ assertEquals(table.getUri(), table1.getUri());
+ assertEquals(table.getSchema().size(), table1.getSchema().size());
+ for (int i = 0; i < table.getSchema().size(); i++) {
+ assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
+ }
+
+ store.dropTable(DB_NAME, CUSTOMER);
+ }
}
diff --git a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
index 180bd4f..910cd0d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
+++ b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java
@@ -21,6 +21,7 @@
public class BuiltinStorages {
public static final String TEXT = "TEXT";
public static final String JSON = "JSON";
+ public static final String REGEX = "REGEX";
public static final String RAW = "RAW";
public static final String DRAW = "DRAW";
public static final String RCFILE = "RCFILE";
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index fc48baa..a1df29b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -50,6 +50,13 @@
public static final String TEXT_SKIP_HEADER_LINE = "text.skip.headerlines";
+ public static final String DEFAULT_JSON_SERDE_CLASS = "org.apache.tajo.storage.json.JsonLineSerDe";
+ public static final String DEFAULT_REGEX_SERDE_CLASS = "org.apache.tajo.storage.regex.RegexLineSerDe";
+
+ public static final String TEXT_REGEX = "text.regex";
+ public static final String TEXT_REGEX_CASE_INSENSITIVE = "text.regex.case.insensitive";
+ public static final String TEXT_REGEX_OUTPUT_FORMAT_STRING = "text.regex.output.format.string";
+
/**
* It's the maximum number of parsing error torrence.
*
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index 976fb24..7fd57cd 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -39,7 +39,7 @@
<!--- Registered Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
- <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json,kafka</value>
+ <value>text,json,regex,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json,kafka</value>
</property>
<!--- Fragment Class Configurations -->
@@ -96,6 +96,11 @@
</property>
<property>
+ <name>tajo.storage.scanner-handler.regex.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
</property>
@@ -153,7 +158,7 @@
<!--- Appender Handler -->
<property>
<name>tajo.storage.appender-handler</name>
- <value>text,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
+ <value>text,json,regex,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
</property>
<property>
@@ -167,6 +172,11 @@
</property>
<property>
+ <name>tajo.storage.appender-handler.regex.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+ </property>
+
+ <property>
<name>tajo.storage.appender-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
</property>
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineDeserializer.java
new file mode 100644
index 0000000..b425a2d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineDeserializer.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.regex;
+
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.exception.InvalidTablePropertyException;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.text.TextFieldSerializerDeserializer;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineParsingError;
+import org.apache.tajo.storage.text.TextLineSerDe;
+
+import java.io.IOException;
+import java.nio.charset.CharsetDecoder;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RegexLineDeserializer extends TextLineDeserializer {
+ private static final Log LOG = LogFactory.getLog(RegexLineDeserializer.class);
+
+ private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
+ private FieldSerializerDeserializer fieldSerDer;
+ private ByteBuf nullChars;
+
+ private int[] targetColumnIndexes;
+ private String inputRegex;
+ private Pattern inputPattern;
+ // Number of rows not matching the regex
+ private long unmatchedRows = 0;
+ private long nextUnmatchedRows = 1;
+ // Number of rows that match the regex but have missing groups.
+ private long partialMatchedRows = 0;
+ private long nextPartialMatchedRows = 1;
+
+ public RegexLineDeserializer(Schema schema, TableMeta meta, Column[] projected) {
+ super(schema, meta);
+ targetColumnIndexes = PlannerUtil.getTargetIds(schema, projected);
+ }
+
+ @Override
+ public void init() {
+ fieldSerDer = new TextFieldSerializerDeserializer(meta);
+ fieldSerDer.init(schema);
+
+ // Read the configuration parameters
+ inputRegex = meta.getProperty(StorageConstants.TEXT_REGEX);
+ boolean inputRegexIgnoreCase = "true".equalsIgnoreCase(
+ meta.getProperty(StorageConstants.TEXT_REGEX_CASE_INSENSITIVE, "false"));
+
+ // Parse the configuration parameters
+ if (inputRegex != null) {
+ inputPattern = Pattern.compile(inputRegex, Pattern.DOTALL
+ + (inputRegexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
+ } else {
+ throw new TajoRuntimeException(new InvalidTablePropertyException(StorageConstants.TEXT_REGEX,
+ "This table does not have serde property \"" + StorageConstants.TEXT_REGEX + "\"!"));
+ }
+
+ if (nullChars != null) {
+ nullChars.release();
+ }
+ nullChars = TextLineSerDe.getNullChars(meta);
+ }
+
+
+ @Override
+ public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
+
+ if (lineBuf == null || targetColumnIndexes.length == 0) {
+ return;
+ }
+
+ String line = decoder.decode(lineBuf.nioBuffer(lineBuf.readerIndex(), lineBuf.readableBytes())).toString();
+ int[] projection = targetColumnIndexes;
+
+ // Projection
+ int currentTarget = 0;
+ int currentIndex = 0;
+ Matcher m = inputPattern.matcher(line);
+
+ if (!m.matches()) {
+ unmatchedRows++;
+ if (unmatchedRows >= nextUnmatchedRows) {
+ nextUnmatchedRows *= 100;
+ // Report the row
+ LOG.warn("" + unmatchedRows + " unmatched rows are found: " + line);
+ }
+ } else {
+
+ int groupCount = m.groupCount();
+ int currentGroup = 1;
+ while (currentGroup <= groupCount) {
+
+ if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
+
+ try {
+ Datum datum = fieldSerDer.deserialize(
+ currentIndex, lineBuf.setIndex(m.start(currentGroup), m.end(currentGroup)), nullChars);
+
+ output.put(currentTarget, datum);
+ } catch (Exception e) {
+ partialMatchedRows++;
+ if (partialMatchedRows >= nextPartialMatchedRows) {
+ nextPartialMatchedRows *= 100;
+ // Report the row
+ LOG.warn("" + partialMatchedRows + " partially unmatched rows are found, "
+ + " cannot find group " + currentIndex + ": " + line);
+ }
+ output.put(currentTarget, NullDatum.get());
+ }
+ currentTarget++;
+ }
+
+ if (projection.length == currentTarget) {
+ break;
+ }
+
+ currentIndex++;
+ currentGroup++;
+ }
+ }
+
+ /* If a text row is less than table schema size, tuple should set to NullDatum */
+ if (projection.length > currentTarget) {
+ for (; currentTarget < projection.length; currentTarget++) {
+ output.put(currentTarget, NullDatum.get());
+ }
+ }
+ }
+
+ @Override
+ public void release() {
+ if (nullChars != null) {
+ nullChars.release();
+ nullChars = null;
+ }
+ }
+}
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerDe.java
new file mode 100644
index 0000000..cda97e0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerDe.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.regex;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineSerDe;
+import org.apache.tajo.storage.text.TextLineSerializer;
+
+
+/**
+ * This is an implementation copied from hive RegexSerDe
+ *
+ * RegexSerDe uses regular expression (regex) to serialize/deserialize.
+ *
+ * It can deserialize the data using regex and extracts groups as columns. It
+ * can also serialize the tuple using a format string.
+ *
+ * In deserialization stage, if a row does not match the regex, then all columns
+ * in the row will be NULL. If a row matches the regex but has less than
+ * expected groups, the missing groups will be NULL. If a row matches the regex
+ * but has more than expected groups, the additional groups are just ignored.
+ *
+ * In serialization stage, it uses java string formatter to format the columns
+ * into a row. If the output type of the column in a query is not a string, it
+ * will be automatically converted to String by tajo.
+ *
+ * For the format of the format String, please refer to
+ * {@link http://java.sun.com/j2se/1.5.0/docs/api/java/util/Formatter.html#syntax}
+ */
+public class RegexLineSerDe extends TextLineSerDe {
+
+ @Override
+ public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, Column [] projected) {
+ return new RegexLineDeserializer(schema, meta, projected);
+ }
+
+ @Override
+ public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+ return new RegexLineSerializer(schema, meta);
+ }
+}
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerializer.java
new file mode 100644
index 0000000..af6a8b2
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerializer.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.regex;
+
+
+import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TimestampDatum;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.exception.InvalidTablePropertyException;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.text.TextLineSerDe;
+import org.apache.tajo.storage.text.TextLineSerializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.TimeZone;
+
+public class RegexLineSerializer extends TextLineSerializer {
+ private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+ private String outputFormatString;
+ private TimeZone tableTimezone;
+ private int columnNum;
+ private String nullChars;
+
+ public RegexLineSerializer(Schema schema, TableMeta meta) {
+ super(schema, meta);
+ }
+
+ @Override
+ public void init() {
+ // Read the configuration parameters
+ outputFormatString = meta.getProperty(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING);
+
+ if (outputFormatString == null) {
+ throw new TajoRuntimeException(new InvalidTablePropertyException(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING,
+ "Cannot write data into table because \"" + StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING + "\""
+ + " is not specified in serde properties of the table."));
+ }
+
+ tableTimezone = TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE,
+ StorageUtil.TAJO_CONF.getSystemTimezone().getID()));
+ nullChars = new String(TextLineSerDe.getNullCharsAsBytes(meta), CharsetUtil.UTF_8);
+ columnNum = schema.size();
+ }
+
+ @Override
+ public int serialize(OutputStream out, Tuple input) throws IOException {
+
+ String[] values = new String[columnNum];
+
+ for (int i = 0; i < columnNum; i++) {
+ values[i] = convertToString(i, input, nullChars);
+ }
+
+ byte[] bytes = String.format(outputFormatString, values).getBytes(CharsetUtil.UTF_8);
+ out.write(bytes);
+ return bytes.length;
+ }
+
+
+ private String convertToString(int columnIndex, Tuple tuple, String nullChars)
+ throws IOException {
+
+ Column col = schema.getColumn(columnIndex);
+ TajoDataTypes.DataType dataType = col.getDataType();
+
+ if (tuple.isBlankOrNull(columnIndex)) {
+ switch (dataType.getType()) {
+ case CHAR:
+ case TEXT:
+ return nullChars;
+ default:
+ return StringUtils.EMPTY;
+ }
+ }
+
+ switch (dataType.getType()) {
+ case BOOLEAN:
+ return tuple.getBool(columnIndex) ? "true" : "false";
+ case CHAR:
+ int size = dataType.getLength() - tuple.size(columnIndex);
+ if (size < 0) {
+ throw new ValueTooLongForTypeCharactersException(dataType.getLength());
+ }
+
+ return StringUtils.rightPad(tuple.getText(columnIndex), size, "");
+ case TEXT:
+ case BIT:
+ case INT2:
+ case INT4:
+ case INT8:
+ case FLOAT4:
+ case FLOAT8:
+ case DATE:
+ case INTERVAL:
+ case TIME:
+ return tuple.getText(columnIndex);
+ case TIMESTAMP:
+ // UTC to table timezone
+ return TimestampDatum.asChars(tuple.getTimeDate(columnIndex), tableTimezone, false);
+ case BLOB:
+ return Base64.encodeBase64String(tuple.getBytes(columnIndex));
+ case PROTOBUF:
+ ProtobufDatum protobuf = (ProtobufDatum) tuple.getProtobufDatum(columnIndex);
+ return protobufJsonFormat.printToString(protobuf.get());
+ case NULL_TYPE:
+ default:
+ return StringUtils.EMPTY;
+ }
+ }
+
+ @Override
+ public void release() {
+ }
+}
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java
new file mode 100644
index 0000000..a6b814f
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.regex;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.BuiltinStorages;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaBuilder;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.schema.Field;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+
+import static org.apache.tajo.schema.QualifiedIdentifier.$;
+import static org.apache.tajo.type.Type.Text;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestRegexSerDe {
+ private Schema schema;
+ private Tuple[] rows;
+ private Path testDir;
+ private String apacheWeblogPattern;
+
+ @Before
+ public void setup() throws IOException {
+ apacheWeblogPattern = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") " +
+ "(-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?";
+
+ Field f1 = Field.Field($("host"), Text);
+ Field f2 = Field.Field($("identity"), Text);
+ Field f3 = Field.Field($("user"), Text);
+ Field f4 = Field.Field($("time"), Text);
+ Field f5 = Field.Field($("request"), Text);
+ Field f6 = Field.Field($("status"), Text);
+ Field f7 = Field.Field($("size"), Text);
+ Field f8 = Field.Field($("referer"), Text);
+ Field f9 = Field.Field($("agent"), Text);
+
+ schema = SchemaBuilder.builder().addAll2(
+ org.apache.tajo.schema.Schema.Schema(f1, f2, f3, f4, f5, f6, f7, f8, f9)).build();
+
+ rows = new VTuple[]{new VTuple(new Datum[]{
+ DatumFactory.createText("127.0.0.1"),
+ DatumFactory.createText("-"),
+ DatumFactory.createText("frank"),
+ DatumFactory.createText("[10/Oct/2000:13:55:36 -0700]"),
+ DatumFactory.createText("\"GET /apache_pb.gif HTTP/1.0\""),
+ DatumFactory.createText("200"),
+ DatumFactory.createText("2326"),
+ NullDatum.get(),
+ NullDatum.get(),
+ }), new VTuple(new Datum[]{
+ DatumFactory.createText("127.0.0.1"),
+ DatumFactory.createText("-"),
+ DatumFactory.createText("frank"),
+ DatumFactory.createText("[10/Oct/2000:13:55:36 -0700]"),
+ DatumFactory.createText("\"GET /apache_pb.gif HTTP/1.0\""),
+ DatumFactory.createText("200"),
+ DatumFactory.createText("2326"),
+ DatumFactory.createText("-"),
+ DatumFactory.createText("\"Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/525.19 " +
+ "(KHTML, like Gecko) Chrome/1.0.154.65 Safari/525.19\""),
+ })};
+
+ final String TEST_PATH = "target/test-data/TestRegexSerDe";
+ testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileSystem.getLocal(new Configuration()).delete(testDir, true);
+ }
+
+ public static Path getResourcePath(String path, String suffix) {
+ URL resultBaseURL = ClassLoader.getSystemResource(path);
+ return new Path(resultBaseURL.toString(), suffix);
+ }
+
+ @Test
+ public void testApacheAccessLogScanner() throws IOException {
+ TajoConf conf = new TajoConf();
+
+ TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.REGEX, conf);
+ Path tablePath = new Path(getResourcePath("dataset", "TestRegexSerDe"), "access.log");
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null);
+ meta.putProperty(StorageConstants.TEXT_REGEX, apacheWeblogPattern);
+ scanner.init();
+
+ Tuple tuple = scanner.next();
+ assertEquals(rows[0], tuple);
+
+ assertNotNull(tuple = scanner.next());
+ assertEquals(rows[1], tuple);
+
+ scanner.close();
+ }
+
+ @Test
+ public void testProjection() throws IOException {
+ Schema target = SchemaBuilder.builder()
+ .add("time", TajoDataTypes.Type.TEXT)
+ .add("status", TajoDataTypes.Type.TEXT)
+ .build();
+
+ Tuple[] rows = new VTuple[]{new VTuple(new Datum[]{
+ DatumFactory.createText("[10/Oct/2000:13:55:36 -0700]"),
+ DatumFactory.createText("200")
+ }), new VTuple(new Datum[]{
+ DatumFactory.createText("[10/Oct/2000:13:55:36 -0700]"),
+ DatumFactory.createText("200")
+ })};
+
+ TajoConf conf = new TajoConf();
+
+ TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.REGEX, conf);
+ Path tablePath = new Path(getResourcePath("dataset", "TestRegexSerDe"), "access.log");
+ FileSystem fs = FileSystem.getLocal(conf);
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
+ meta.putProperty(StorageConstants.TEXT_REGEX, apacheWeblogPattern);
+ scanner.init();
+
+ Tuple tuple = scanner.next();
+ assertEquals(2, tuple.size());
+ assertEquals(rows[0], tuple);
+
+ assertNotNull(tuple = scanner.next());
+ assertEquals(2, tuple.size());
+ assertEquals(rows[1], tuple);
+
+ scanner.close();
+ }
+
+ @Test
+ public void testSerializer() throws IOException {
+ TajoConf conf = new TajoConf();
+
+ TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.REGEX, conf);
+ meta.putProperty(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING, "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s");
+
+ FileTablespace sm = TablespaceManager.getLocalFs();
+ Path tablePath = new Path(testDir, "testSerializer.data");
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+
+ appender.addTuple(rows[0]);
+ appender.addTuple(rows[1]);
+ appender.close();
+
+ FileStatus status = tablePath.getFileSystem(conf).getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+
+ meta = CatalogUtil.newTableMeta(BuiltinStorages.REGEX, conf);
+ meta.putProperty(StorageConstants.TEXT_REGEX, apacheWeblogPattern);
+ Scanner scanner = sm.getScanner(meta, schema, fragment, null);
+ scanner.init();
+
+ Tuple tuple = scanner.next();
+ assertEquals(rows[0], tuple);
+ assertNotNull(tuple = scanner.next());
+ assertEquals(rows[1], tuple);
+ assertNull(scanner.next());
+ scanner.close();
+ }
+}
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestRegexSerDe/access.log b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestRegexSerDe/access.log
new file mode 100644
index 0000000..1e9106f
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestRegexSerDe/access.log
@@ -0,0 +1,2 @@
+127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
+127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 - "Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/525.19 (KHTML, like Gecko) Chrome/1.0.154.65 Safari/525.19"
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 7ae58aa..fdf9ebc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -34,7 +34,7 @@
<!--- Registered Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
- <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro</value>
+ <value>text,json,regex,raw,draw,rcfile,row,parquet,orc,sequencefile,avro</value>
</property>
<!--- Fragment Class Configurations -->
@@ -75,6 +75,11 @@
</property>
<property>
+ <name>tajo.storage.scanner-handler.regex.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
</property>
@@ -117,7 +122,7 @@
<!--- Appender Handler -->
<property>
<name>tajo.storage.appender-handler</name>
- <value>text,raw,draw,rcfile,row,parquet,orc,sequencefile,avro</value>
+ <value>text,json,regex,raw,draw,rcfile,row,parquet,orc,sequencefile,avro</value>
</property>
<property>
@@ -131,6 +136,11 @@
</property>
<property>
+ <name>tajo.storage.appender-handler.regex.class</name>
+ <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+ </property>
+
+ <property>
<name>tajo.storage.appender-handler.raw.class</name>
<value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
</property>