/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage;
import com.google.common.base.Optional;
import net.minidev.json.JSONObject;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.OverridableConf;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* Tablespace manages the functions of storing and reading data.
* Tablespace is an abstract class.
* To support a specific storage such as HDFS or HBase, a concrete Tablespace should be implemented by inheriting this class.
*/
public abstract class Tablespace {
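// Usage sketch (illustrative only; how a concrete Tablespace is obtained, e.g. from a registry
// such as TablespaceManager, is outside this class, and the variable names below are assumptions):
//
//   Tablespace space = ...;                                         // a concrete implementation (HDFS, HBase, ...)
//   space.init(tajoConf);                                           // must be called before use
//   List<Fragment> splits = space.getSplits("t1", tableDesc, null); // splits for scan tasks
//   Scanner scanner = space.getScanner(meta, schema, splits.get(0), null);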
protected final String name;
protected final URI uri;
protected final JSONObject config;
/** Whether this space is visible or not. */
protected boolean visible = true;
protected TajoConf conf;
public Tablespace(String name, URI uri, JSONObject config) {
this.name = name;
this.uri = uri;
this.config = config;
}
public JSONObject getConfig() {
return config;
}
public void setVisible(boolean visible) {
this.visible = visible;
}
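/**
* Returns the dependencies of this tablespace. The default implementation has no dependencies
* and returns an empty set.
*/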
public Set<String> getDependencies() {
return Collections.emptySet();
}
/**
* Initialize storage manager.
* @throws java.io.IOException
*/
protected abstract void storageInit() throws IOException;
public String getName() {
return name;
}
public URI getUri() {
return uri;
}
public boolean isVisible() {
return visible;
}
@Override
public String toString() {
return name + "=" + uri.toString();
}
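/**
* Returns the volume of the table, optionally taking a filter predicate into account.
*
* @param table The table description
* @param filter An optional filter predicate
* @return The table volume (typically in bytes)
* @throws UnsupportedException if the storage cannot estimate the table volume
*/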
public abstract long getTableVolume(TableDesc table, Optional<EvalNode> filter) throws UnsupportedException;
/**
* If {@link StorageProperty#isArbitraryPathAllowed} is true,
* the storage allows arbitrary path accesses. In this case, the storage must provide the root URI.
*
* @see StorageProperty#isArbitraryPathAllowed
* @return Root URI
*/
public URI getRootUri() {
throw new TajoRuntimeException(new UnsupportedException(String.format("arbitrary path '%s'", uri.toString())));
}
/**
* Get Table URI
*
* @param databaseName Database name
* @param tableName Table name
* @return Table URI
*/
public abstract URI getTableUri(String databaseName, String tableName);
/**
* Returns the splits that will serve as input for the scan tasks. The number of splits
* depends on the storage; for example, it may correspond to the number of HDFS blocks or HBase regions.
*
* @param inputSourceId Input source identifier, which can be either a relation name or an execution block id
* @param tableDesc The table description for the target data.
* @param filterCondition A filter condition that can be used to prune splits where possible
* @return The list of input fragments.
* @throws java.io.IOException
*/
public abstract List<Fragment> getSplits(String inputSourceId,
TableDesc tableDesc,
@Nullable EvalNode filterCondition) throws IOException, TajoException;
/**
* It returns the storage property.
* @return The storage property
*/
public abstract StorageProperty getProperty();
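/**
* Returns the format property for the given table meta.
*
* @param meta The table meta
* @return The format property
*/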
public abstract FormatProperty getFormatProperty(TableMeta meta);
/**
* Releases storage manager resources.
*/
public abstract void close();
/**
* It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
* In general, the Repartitioner determines the partition ranges from the output statistics of the previous stage.
* In special cases, such as HBase, the Repartitioner uses the result of this method instead.
*
* @param queryContext The current query context, which contains query properties.
* @param tableDesc The table description for the target data.
* @param inputSchema The input schema
* @param sortSpecs The sort specifications that contain the sort columns and sort order.
* @param dataRange The overall range of the data to be partitioned.
* @return The list of sort ranges.
* @throws java.io.IOException
*/
public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
Schema inputSchema, SortSpec [] sortSpecs,
TupleRange dataRange) throws IOException;
/**
* Initializes this Tablespace instance. It should be called before use.
*
* @param tajoConf Tajo system configuration
* @throws java.io.IOException
*/
public void init(TajoConf tajoConf) throws IOException {
this.conf = new TajoConf(tajoConf);
storageInit();
}
/**
* Returns Scanner instance.
*
* @param meta The table meta
* @param schema The input schema
* @param fragment The fragment for scanning
* @param target The output schema
* @return Scanner instance
* @throws java.io.IOException
*/
public Scanner getScanner(TableMeta meta,
Schema schema,
Fragment fragment,
@Nullable Schema target) throws IOException {
if (target == null) {
target = schema;
}
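// An empty fragment has nothing to read, so a NullScanner that returns no rows is used.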
if (fragment.isEmpty()) {
Scanner scanner = new NullScanner(conf, schema, meta, fragment);
scanner.setTarget(target.toArray());
return scanner;
}
Scanner scanner;
Class<? extends Scanner> scannerClass = getScannerClass(meta.getDataFormat());
scanner = OldStorageManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
scanner.setTarget(target.toArray());
return scanner;
}
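/**
* Returns an Appender used for writing the rows of an INSERT statement.
* The default implementation simply delegates to {@link #getAppender}; storages may override it.
*
* @param queryContext Query property
* @param taskAttemptId Task attempt id
* @param meta Table meta data
* @param schema Output schema
* @param workDir Working directory
* @return Appender instance
* @throws java.io.IOException
*/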
public Appender getAppenderForInsertRow(OverridableConf queryContext,
TaskAttemptId taskAttemptId,
TableMeta meta,
Schema schema,
Path workDir) throws IOException {
return getAppender(queryContext, taskAttemptId, meta, schema, workDir);
}
/**
* Returns a SeekableScanner instance.
*
* @param meta The table meta
* @param schema The input schema
* @param fragment The fragment for scanning
* @param target The output schema
* @return SeekableScanner instance
* @throws java.io.IOException
*/
public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment,
Schema target) throws IOException {
return (SeekableScanner)this.getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
}
/**
* Returns Appender instance.
* @param queryContext Query property.
* @param taskAttemptId Task id.
* @param meta Table meta data.
* @param schema Output schema.
* @param workDir Working directory
* @return Appender instance
* @throws java.io.IOException
*/
public Appender getAppender(OverridableConf queryContext,
TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
throws IOException {
Appender appender;
Class<? extends Appender> appenderClass;
String handlerName = meta.getDataFormat().toLowerCase();
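// Look up the appender class in the cache first; on a miss, resolve it from the
// tajo.storage.appender-handler.<format>.class configuration and cache it.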
appenderClass = OldStorageManager.APPENDER_HANDLER_CACHE.get(handlerName);
if (appenderClass == null) {
appenderClass = conf.getClass(
String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
OldStorageManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
}
if (appenderClass == null) {
throw new IOException("Unknown Storage Type: " + meta.getDataFormat());
}
appender = OldStorageManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
return appender;
}
/**
* Returns the Scanner class for the data format, as defined in storage-default.xml.
*
* @param dataFormat The data format
* @return The Scanner class
* @throws java.io.IOException
*/
public Class<? extends Scanner> getScannerClass(String dataFormat) throws IOException {
String handlerName = dataFormat.toLowerCase();
Class<? extends Scanner> scannerClass = OldStorageManager.SCANNER_HANDLER_CACHE.get(handlerName);
if (scannerClass == null) {
scannerClass = conf.getClass(
String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
OldStorageManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
}
if (scannerClass == null) {
throw new IOException("Unknown Storage Type: " + dataFormat);
}
return scannerClass;
}
/**
* It is called after the logical plan is built. The storage manager should verify that the
* given output schema can be written to the target table.
*
* @param tableDesc The table description of the insert target.
* @param outSchema The output schema of the select query used for inserting.
* @throws org.apache.tajo.exception.TajoException
*/
public abstract void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws TajoException;
/**
* Rewrites the logical plan. It is assumed that the final plan is given to this method.
*/
public void rewritePlan(OverridableConf context, LogicalPlan plan) throws TajoException {
// nothing to do by default
}
////////////////////////////////////////////////////////////////////////////
// Table Lifecycle Section
////////////////////////////////////////////////////////////////////////////
/**
* This method is called after a "CREATE TABLE" statement is executed.
* If the storage is file-based, the storage manager may create a directory for the table.
*
* @param tableDesc The description of the table to be created.
* @param ifNotExists Creates the table only when the table does not exist.
* @throws java.io.IOException
*/
public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws TajoException, IOException;
/**
* This method is called after a "DROP TABLE" statement is executed with the 'PURGE' option,
* which deletes all the table data.
*
* @param tableDesc The description of the table to be purged.
* @throws java.io.IOException
*/
public abstract void purgeTable(TableDesc tableDesc) throws IOException, TajoException;
/**
* This method is called before executing 'INSERT' or 'CREATE TABLE AS SELECT' (CTAS).
* In general, Tajo creates the target table after the final sub-query of a CTAS finishes.
* However, in special cases such as HBase, an INSERT or CTAS query needs the target table information up front.
* Such storages should implement the table-preparation logic in this method.
*
* @param node The child node of the root node.
* @throws java.io.IOException
*/
public abstract void prepareTable(LogicalNode node) throws IOException, TajoException;
/**
* Finalizes the result data. Tajo stores result data in the staging directory.
* If the query fails, the staging directory is cleaned up.
* Otherwise, if the query succeeds, the data is moved from the staging directory to the final directory.
*
* @param queryContext The query property
* @param finalEbId The final execution block id
* @param plan The query plan
* @param schema The final output schema
* @param tableDesc The description of the target table
* @return Saved path
* @throws java.io.IOException
*/
public abstract Path commitTable(OverridableConf queryContext,
ExecutionBlockId finalEbId,
LogicalPlan plan, Schema schema,
TableDesc tableDesc) throws IOException;
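/**
* It is called when the query fails.
* Each storage manager should implement in this method any clean-up to be performed on query failure.
*
* @param node The child node of the root node.
* @throws java.io.IOException
*/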
public abstract void rollbackTable(LogicalNode node) throws IOException, TajoException;
@Override
public boolean equals(Object obj) {
if (obj instanceof Tablespace) {
Tablespace other = (Tablespace) obj;
return name.equals(other.name) && uri.equals(other.uri);
} else {
return false;
}
}
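// A minimal hashCode added to keep the equals/hashCode contract with equals() above;
// it hashes the same fields (name, uri) that equals() compares.
@Override
public int hashCode() {
return java.util.Objects.hash(name, uri);
}
/**
* Returns the URI of the staging space where the query output is written before being committed.
*
* @param context The query context
* @param queryId The query id
* @param meta The table meta
* @return The staging URI
* @throws java.io.IOException
*/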
public abstract URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException;
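/**
* Prepares the staging space for the given query. The default implementation does not
* support staging and throws an IOException.
*
* @param conf Tajo system configuration
* @param queryId The query id
* @param context The query context
* @param meta The table meta
* @return The URI of the prepared staging space
* @throws java.io.IOException
*/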
public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context,
TableMeta meta) throws IOException {
throw new IOException("Staging the output result is not supported in this storage");
}
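/**
* Returns the {@link MetadataProvider} of this tablespace. Tablespaces backed by linked
* (external) metadata should override this method; it is unsupported by default.
*/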
public MetadataProvider getMetadataProvider() {
throw new TajoRuntimeException(new UnsupportedException("Linked Metadata Provider for " + name));
}
@SuppressWarnings("unused")
public int markAccetablePlanPart(LogicalPlan plan) {
throw new TajoRuntimeException(new UnsupportedException());
}
}