blob: 4971269edc13a402b550d4ca45ef0c15a7542ba3 [file] [log] [blame]
package org.apache.hawq.pxf.plugins.jdbc;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.hawq.pxf.api.Fragment;
import org.apache.hawq.pxf.api.Fragmenter;
import org.apache.hawq.pxf.api.FragmentsStats;
import org.apache.hawq.pxf.api.UserDataException;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.plugins.jdbc.utils.ByteUtil;
import org.apache.hawq.pxf.plugins.jdbc.utils.DbProduct;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
/**
* JDBC fragmenter
*
* Splits the query to allow multiple simultaneous SELECTs
*/
public class JdbcPartitionFragmenter extends Fragmenter {
/**
* Insert fragment constraints into the SQL query.
*
* @param inputData InputData of the fragment
* @param dbName Database name (affects the behaviour for DATE partitions)
* @param query SQL query to insert constraints to. The query may may contain other WHERE statements
*/
public static void buildFragmenterSql(InputData inputData, String dbName, StringBuilder query) {
if (inputData.getUserProperty("PARTITION_BY") == null) {
return;
}
byte[] meta = inputData.getFragmentMetadata();
if (meta == null) {
return;
}
String[] partitionBy = inputData.getUserProperty("PARTITION_BY").split(":");
String partitionColumn = partitionBy[0];
PartitionType partitionType = PartitionType.typeOf(partitionBy[1]);
DbProduct dbProduct = DbProduct.getDbProduct(dbName);
if (!query.toString().contains("WHERE")) {
query.append(" WHERE ");
}
else {
query.append(" AND ");
}
switch (partitionType) {
case DATE: {
byte[][] newb = ByteUtil.splitBytes(meta);
Date fragStart = new Date(ByteUtil.toLong(newb[0]));
Date fragEnd = new Date(ByteUtil.toLong(newb[1]));
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
query.append(partitionColumn).append(" >= ").append(dbProduct.wrapDate(df.format(fragStart)));
query.append(" AND ");
query.append(partitionColumn).append(" < ").append(dbProduct.wrapDate(df.format(fragEnd)));
break;
}
case INT: {
byte[][] newb = ByteUtil.splitBytes(meta);
long fragStart = ByteUtil.toLong(newb[0]);
long fragEnd = ByteUtil.toLong(newb[1]);
query.append(partitionColumn).append(" >= ").append(fragStart);
query.append(" AND ");
query.append(partitionColumn).append(" < ").append(fragEnd);
break;
}
case ENUM: {
query.append(partitionColumn).append(" = '").append(new String(meta)).append("'");
break;
}
}
}
/**
* Class constructor.
*
* @param inputData PXF InputData
* @throws UserDataException if the request parameter is malformed
*/
public JdbcPartitionFragmenter(InputData inputData) throws UserDataException {
super(inputData);
if (inputData.getUserProperty("PARTITION_BY") == null) {
return;
}
// PARTITION_BY
try {
partitionType = PartitionType.typeOf(
inputData.getUserProperty("PARTITION_BY").split(":")[1]
);
}
catch (IllegalArgumentException | ArrayIndexOutOfBoundsException ex) {
throw new UserDataException("The parameter 'PARTITION_BY' is invalid. The pattern is '<column_name>:date|int|enum'");
}
// RANGE
try {
String rangeStr = inputData.getUserProperty("RANGE");
if (rangeStr != null) {
range = rangeStr.split(":");
if (range.length == 1 && partitionType != PartitionType.ENUM) {
throw new UserDataException("The parameter 'RANGE' must specify ':<end_value>' for this PARTITION_TYPE");
}
}
else {
throw new UserDataException("The parameter 'RANGE' must be specified along with 'PARTITION_BY'");
}
if (partitionType == PartitionType.DATE) {
// Parse DATE partition type values
try {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
rangeDateStart = Calendar.getInstance();
rangeDateStart.setTime(df.parse(range[0]));
rangeDateEnd = Calendar.getInstance();
rangeDateEnd.setTime(df.parse(range[1]));
}
catch (ParseException e) {
throw new UserDataException("The parameter 'RANGE' has invalid date format. The correct format is 'yyyy-MM-dd'");
}
}
else if (partitionType == PartitionType.INT) {
// Parse INT partition type values
try {
rangeIntStart = Long.parseLong(range[0]);
rangeIntEnd = Long.parseLong(range[1]);
}
catch (NumberFormatException e) {
throw new UserDataException("The parameter 'RANGE' is invalid. Both range boundaries must be integers");
}
}
}
catch (IllegalArgumentException ex) {
throw new UserDataException("The parameter 'RANGE' is invalid. The pattern is '<start_value>[:<end_value>]'");
}
// INTERVAL
try {
String intervalStr = inputData.getUserProperty("INTERVAL");
if (intervalStr != null) {
String[] interval = intervalStr.split(":");
try {
intervalNum = Long.parseLong(interval[0]);
if (intervalNum < 1) {
throw new UserDataException("The '<interval_num>' in parameter 'INTERVAL' must be at least 1, but actual is " + intervalNum);
}
}
catch (NumberFormatException ex) {
throw new UserDataException("The '<interval_num>' in parameter 'INTERVAL' must be an integer");
}
// Intervals of type DATE
if (interval.length > 1) {
intervalType = IntervalType.typeOf(interval[1]);
}
if (interval.length == 1 && partitionType == PartitionType.DATE) {
throw new UserDataException("The parameter 'INTERVAL' must specify unit (':year|month|day') for the PARTITION_TYPE = 'DATE'");
}
}
else if (partitionType != PartitionType.ENUM) {
throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY' for this PARTITION_TYPE");
}
}
catch (IllegalArgumentException ex) {
throw new UserDataException("The parameter 'INTERVAL' is invalid. The pattern is '<interval_num>[:<interval_unit>]'");
}
}
/**
* @throws UnsupportedOperationException ANALYZE for Jdbc plugin is not supported
*/
@Override
public FragmentsStats getFragmentsStats() throws UnsupportedOperationException {
throw new UnsupportedOperationException("ANALYZE for JDBC plugin is not supported");
}
/**
* getFragments() implementation
*
* @return a list of fragments to be passed to PXF segments
*/
@Override
public List<Fragment> getFragments() {
if (partitionType == null) {
// No partition case
Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, null);
fragments.add(fragment);
return fragments;
}
switch (partitionType) {
case DATE: {
Calendar fragStart = rangeDateStart;
while (fragStart.before(rangeDateEnd)) {
// Calculate a new fragment
Calendar fragEnd = (Calendar)fragStart.clone();
switch (intervalType) {
case DAY:
fragEnd.add(Calendar.DAY_OF_MONTH, (int)intervalNum);
break;
case MONTH:
fragEnd.add(Calendar.MONTH, (int)intervalNum);
break;
case YEAR:
fragEnd.add(Calendar.YEAR, (int)intervalNum);
break;
}
if (fragEnd.after(rangeDateEnd))
fragEnd = (Calendar)rangeDateEnd.clone();
// Convert to byte[]
byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis());
byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis());
byte[] fragmentMetadata = ByteUtil.mergeBytes(msStart, msEnd);
// Write fragment
Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
fragments.add(fragment);
// Prepare for the next fragment
fragStart = fragEnd;
}
break;
}
case INT: {
long fragStart = rangeIntStart;
while (fragStart < rangeIntEnd) {
// Calculate a new fragment
long fragEnd = fragStart + intervalNum;
if (fragEnd > rangeIntEnd) {
fragEnd = rangeIntEnd;
}
// Convert to byte[]
byte[] bStart = ByteUtil.getBytes(fragStart);
byte[] bEnd = ByteUtil.getBytes(fragEnd);
byte[] fragmentMetadata = ByteUtil.mergeBytes(bStart, bEnd);
// Write fragment
Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
fragments.add(fragment);
// Prepare for the next fragment
fragStart = fragEnd;
}
break;
}
case ENUM: {
for (String frag : range) {
byte[] fragmentMetadata = frag.getBytes();
Fragment fragment = new Fragment(inputData.getDataSource(), pxfHosts, fragmentMetadata);
fragments.add(fragment);
}
break;
}
}
return fragments;
}
// Partition parameters (filled by class constructor)
private String[] range = null;
private PartitionType partitionType = null;
private long intervalNum;
// Partition parameters for INT partitions (filled by class constructor)
private long rangeIntStart;
private long rangeIntEnd;
// Partition parameters for DATE partitions (filled by class constructor)
private IntervalType intervalType;
private Calendar rangeDateStart;
private Calendar rangeDateEnd;
private static enum PartitionType {
DATE,
INT,
ENUM;
public static PartitionType typeOf(String str) {
return valueOf(str.toUpperCase());
}
}
private static enum IntervalType {
DAY,
MONTH,
YEAR;
public static IntervalType typeOf(String str) {
return valueOf(str.toUpperCase());
}
}
// A PXF engine to use as a host for fragments
private static final String[] pxfHosts;
static {
String[] localhost = {"localhost"};
try {
localhost[0] = InetAddress.getLocalHost().getHostAddress();
}
catch (UnknownHostException ex) {
// It is always possible to get 'localhost' address
}
pxfHosts = localhost;
}
}