TAJO-2013: FilterPushDownRule fails due to the unsupported index.
diff --git a/CHANGES b/CHANGES
index f7aba71..d55e4b8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,8 @@
 
   BUG FIXES
 
+    TAJO-2013: FilterPushDownRule fails due to the unsupported index. (jinho)
+
     TAJO-2014: TestRpcClientManager fails occasionally. (jinho)
 
     TAJO-2000: BSTIndex can cause OOM. (jinho)
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
index e44bba5..1ac29ef 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
@@ -24,6 +24,7 @@
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.datum.Datum;
@@ -57,7 +58,8 @@
   private CatalogService catalog;
 
   static class FilterPushDownContext {
-    Set<EvalNode> pushingDownFilters = TUtil.newHashSet();
+    Set<EvalNode> pushingDownFilters = new HashSet<>();
+    LogicalPlanRewriteRuleContext rewriteRuleContext;
 
     public void clear() {
       pushingDownFilters.clear();
@@ -111,6 +113,7 @@
         . It not, create new HavingNode and set parent's child.
      */
     FilterPushDownContext context = new FilterPushDownContext();
+    context.rewriteRuleContext = rewriteRuleContext;
     LogicalPlan plan = rewriteRuleContext.getPlan();
     catalog = rewriteRuleContext.getCatalog();
     for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
@@ -129,7 +132,7 @@
   @Override
   public LogicalNode visitFilter(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  SelectionNode selNode, Stack<LogicalNode> stack) throws TajoException {
-    context.pushingDownFilters.addAll(TUtil.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
+    context.pushingDownFilters.addAll(new HashSet<>(Arrays.asList(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual()))));
 
     stack.push(selNode);
     visit(context, plan, block, selNode.getChild(), stack);
@@ -147,7 +150,7 @@
     } else { // if there remain search conditions
 
       // check if it can be evaluated here
-      Set<EvalNode> matched = TUtil.newHashSet();
+      Set<EvalNode> matched = new HashSet<>();
       for (EvalNode eachEval : context.pushingDownFilters) {
         if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) {
           matched.add(eachEval);
@@ -168,9 +171,9 @@
   @Override
   public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                JoinNode joinNode, Stack<LogicalNode> stack) throws TajoException {
-    Set<EvalNode> onPredicates = TUtil.newHashSet();
+    Set<EvalNode> onPredicates = new HashSet<>();
     if (joinNode.hasJoinQual()) {
-      onPredicates.addAll(TUtil.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
+      onPredicates.addAll(new HashSet<>(Arrays.asList(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()))));
     }
     // clear join qual
     joinNode.clearJoinQual();
@@ -293,7 +296,7 @@
                                                            final Set<EvalNode> onPredicates,
                                                            final Set<EvalNode> wherePredicates)
       throws TajoException {
-    Set<EvalNode> nonPushableQuals = TUtil.newHashSet();
+    Set<EvalNode> nonPushableQuals = new HashSet<>();
     // TODO: non-equi theta join quals must not be pushed until TAJO-742 is resolved.
     nonPushableQuals.addAll(extractNonEquiThetaJoinQuals(wherePredicates, block, joinNode));
     nonPushableQuals.addAll(extractNonEquiThetaJoinQuals(onPredicates, block, joinNode));
@@ -340,8 +343,8 @@
                                                                 final Set<EvalNode> onPredicates,
                                                                 final Set<EvalNode> wherePredicates,
                                                                 final JoinNode joinNode) throws TajoException {
-    Set<String> nullSupplyingTableNameSet = TUtil.newHashSet();
-    Set<String> preservedTableNameSet = TUtil.newHashSet();
+    Set<String> nullSupplyingTableNameSet = new HashSet<>();
+    Set<String> preservedTableNameSet = new HashSet<>();
     String leftRelation = PlannerUtil.getTopRelationInLineage(plan, joinNode.getLeftChild());
     String rightRelation = PlannerUtil.getTopRelationInLineage(plan, joinNode.getRightChild());
 
@@ -359,7 +362,7 @@
       nullSupplyingTableNameSet.add(rightRelation);
     }
 
-    Set<EvalNode> nonPushableQuals = TUtil.newHashSet();
+    Set<EvalNode> nonPushableQuals = new HashSet<>();
     for (EvalNode eachQual : onPredicates) {
       for (String relName : preservedTableNameSet) {
         if (isEvalNeedRelation(eachQual, relName)) {
@@ -425,7 +428,7 @@
       LogicalPlan.QueryBlock block,
       LogicalNode node, LogicalNode childNode) throws TajoException {
     // transformed -> pushingDownFilters
-    Map<EvalNode, EvalNode> transformedMap = TUtil.newHashMap();
+    Map<EvalNode, EvalNode> transformedMap = new HashMap<>();
 
     if (originEvals.isEmpty()) {
       return transformedMap;
@@ -483,7 +486,7 @@
     }
 
     // node in column -> child out column
-    Map<String, String> columnMap = TUtil.newHashMap();
+    Map<String, String> columnMap = new HashMap<>();
 
     for (int i = 0; i < node.getInSchema().size(); i++) {
       String inColumnName = node.getInSchema().getColumn(i).getQualifiedName();
@@ -643,7 +646,7 @@
   }
 
   private Collection<EvalNode> reverseTransform(BiMap<EvalNode, EvalNode> map, Set<EvalNode> remainFilters) {
-    Set<EvalNode> reversed = TUtil.newHashSet();
+    Set<EvalNode> reversed = new HashSet<>();
     for (EvalNode evalNode : remainFilters) {
       reversed.add(map.get(evalNode));
     }
@@ -655,7 +658,7 @@
       LogicalNode childNode, List<EvalNode> notMatched,
       Set<String> partitionColumns, int columnOffset) throws TajoException {
     // canonical name -> target
-    Map<String, Target> nodeTargetMap = TUtil.newHashMap();
+    Map<String, Target> nodeTargetMap = new HashMap<>();
     for (Target target : node.getTargets()) {
       nodeTargetMap.put(target.getCanonicalName(), target);
     }
@@ -780,8 +783,8 @@
                                        HavingNode havingNode,
                                        GroupbyNode groupByNode) throws TajoException {
     // find aggregation column
-    Set<Column> groupingColumns = TUtil.newHashSet(groupByNode.getGroupingColumns());
-    Set<String> aggrFunctionOutColumns = TUtil.newHashSet();
+    Set<Column> groupingColumns = new HashSet<>(Arrays.asList(groupByNode.getGroupingColumns()));
+    Set<String> aggrFunctionOutColumns = new HashSet<>();
     for (Column column : groupByNode.getOutSchema().getRootColumns()) {
       if (!groupingColumns.contains(column)) {
         aggrFunctionOutColumns.add(column.getQualifiedName());
@@ -896,7 +899,7 @@
     List<EvalNode> matched = TUtil.newList();
 
     // find partition column and check matching
-    Set<String> partitionColumns = TUtil.newHashSet();
+    Set<String> partitionColumns = new HashSet<>();
     TableDesc table = scanNode.getTableDesc();
     boolean hasQualifiedName = false;
     if (table.hasPartition()) {
@@ -905,7 +908,7 @@
         hasQualifiedName = c.hasQualifier();
       }
     }
-    Set<EvalNode> partitionEvals = TUtil.newHashSet();
+    Set<EvalNode> partitionEvals = new HashSet<>();
     for (EvalNode eval : context.pushingDownFilters) {
       if (table.hasPartition()) {
         Set<Column> columns = EvalTreeUtil.findUniqueColumns(eval);
@@ -966,35 +969,37 @@
       scanNode.setQual(qual);
 
       // Index path can be identified only after filters are pushed into each scan.
-      String databaseName, tableName;
-      databaseName = CatalogUtil.extractQualifier(table.getName());
-      tableName = CatalogUtil.extractSimpleName(table.getName());
-      Set<Predicate> predicates = TUtil.newHashSet();
-      for (EvalNode eval : PlannerUtil.getAllEqualEvals(qual)) {
-        BinaryEval binaryEval = (BinaryEval) eval;
-        // TODO: consider more complex predicates
-        if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
-            binaryEval.getRightExpr().getType() == EvalType.CONST) {
-          predicates.add(new Predicate(binaryEval.getType(),
-              ((FieldEval) binaryEval.getLeftExpr()).getColumnRef(),
-              ((ConstEval)binaryEval.getRightExpr()).getValue()));
-        } else if (binaryEval.getLeftExpr().getType() == EvalType.CONST &&
-            binaryEval.getRightExpr().getType() == EvalType.FIELD) {
-          predicates.add(new Predicate(binaryEval.getType(),
-              ((FieldEval) binaryEval.getRightExpr()).getColumnRef(),
-              ((ConstEval)binaryEval.getLeftExpr()).getValue()));
+      if(context.rewriteRuleContext.getQueryContext().getBool(SessionVars.INDEX_ENABLED)) {
+        String databaseName, tableName;
+        databaseName = CatalogUtil.extractQualifier(table.getName());
+        tableName = CatalogUtil.extractSimpleName(table.getName());
+        Set<Predicate> predicates = new HashSet<>();
+        for (EvalNode eval : PlannerUtil.getAllEqualEvals(qual)) {
+          BinaryEval binaryEval = (BinaryEval) eval;
+          // TODO: consider more complex predicates
+          if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
+              binaryEval.getRightExpr().getType() == EvalType.CONST) {
+            predicates.add(new Predicate(binaryEval.getType(),
+                ((FieldEval) binaryEval.getLeftExpr()).getColumnRef(),
+                ((ConstEval) binaryEval.getRightExpr()).getValue()));
+          } else if (binaryEval.getLeftExpr().getType() == EvalType.CONST &&
+              binaryEval.getRightExpr().getType() == EvalType.FIELD) {
+            predicates.add(new Predicate(binaryEval.getType(),
+                ((FieldEval) binaryEval.getRightExpr()).getColumnRef(),
+                ((ConstEval) binaryEval.getLeftExpr()).getValue()));
+          }
         }
-      }
 
-      // for every subset of the set of columns, find all matched index paths
-      for (Set<Predicate> subset : Sets.powerSet(predicates)) {
-        if (subset.size() == 0)
-          continue;
-        Column[] columns = extractColumns(subset);
-        if (catalog.existIndexByColumns(databaseName, tableName, columns)) {
-          IndexDesc indexDesc = catalog.getIndexByColumns(databaseName, tableName, columns);
-          block.addAccessPath(scanNode, new IndexScanInfo(
-              table.getStats(), indexDesc, getSimplePredicates(indexDesc, subset)));
+        // for every subset of the set of columns, find all matched index paths
+        for (Set<Predicate> subset : Sets.powerSet(predicates)) {
+          if (subset.size() == 0)
+            continue;
+          Column[] columns = extractColumns(subset);
+          if (catalog.existIndexByColumns(databaseName, tableName, columns)) {
+            IndexDesc indexDesc = catalog.getIndexByColumns(databaseName, tableName, columns);
+            block.addAccessPath(scanNode, new IndexScanInfo(
+                table.getStats(), indexDesc, getSimplePredicates(indexDesc, subset)));
+          }
         }
       }
     }
@@ -1023,7 +1028,7 @@
 
   private static SimplePredicate[] getSimplePredicates(IndexDesc desc, Set<Predicate> predicates) {
     SimplePredicate[] simplePredicates = new SimplePredicate[predicates.size()];
-    Map<Column, Datum> colToValue = TUtil.newHashMap();
+    Map<Column, Datum> colToValue = new HashMap<>();
     for (Predicate predicate : predicates) {
       colToValue.put(predicate.column, predicate.value);
     }