diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9448443..53a13be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -46,9 +46,9 @@
 
 
 /**
- * Various SQLException Information. Including a vender-specific errorcode and a standard SQLState.
- * 
- * 
+ * Various SQLException Information. Including a vendor-specific errorcode and a standard SQLState.
+ *
+ *
  * @since 1.0
  */
 public enum SQLExceptionCode {
@@ -59,7 +59,7 @@
     IO_EXCEPTION(101, "08000", "Unexpected IO exception."),
     MALFORMED_CONNECTION_URL(102, "08001", "Malformed connection url."),
     CANNOT_ESTABLISH_CONNECTION(103, "08004", "Unable to establish connection."),
-    
+
     /**
      * Data Exception (errorcode 02, sqlstate 22)
      */
@@ -74,19 +74,17 @@
     VALUE_IN_UPSERT_NOT_CONSTANT(204, "22008", "Values in UPSERT must evaluate to a constant."),
     MALFORMED_URL(205, "22009", "Malformed URL."),
     DATA_EXCEEDS_MAX_CAPACITY(206, "22003", "The data exceeds the max capacity for the data type."),
-    MISSING_CHAR_LENGTH(207, "22003", "Missing length for CHAR."),
-    NONPOSITIVE_CHAR_LENGTH(208, "22003", "CHAR or VARCHAR must have a positive length."),
+    MISSING_MAX_LENGTH(207, "22004", "Max length must be specified for type."),
+    NONPOSITIVE_MAX_LENGTH(208, "22006", "Max length must have a positive length for type."),
     DECIMAL_PRECISION_OUT_OF_RANGE(209, "22003", "Decimal precision outside of range. Should be within 1 and " + PDataType.MAX_PRECISION + "."),
-    MISSING_BINARY_LENGTH(210, "22003", "Missing length for BINARY."),
-    NONPOSITIVE_BINARY_LENGTH(211, "22003", "BINARY must have a positive length."),
     SERVER_ARITHMETIC_ERROR(212, "22012", "Arithmetic error on server."),
     VALUE_OUTSIDE_RANGE(213,"22003","Value outside range."),
     VALUE_IN_LIST_NOT_CONSTANT(214, "22008", "Values in IN must evaluate to a constant."),
     SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS(215, "22015", "Single-row sub-query returns more than one row."),
     SUBQUERY_RETURNS_DIFFERENT_NUMBER_OF_FIELDS(216, "22016", "Sub-query must return the same number of fields as the left-hand-side expression of 'IN'."),
-    AMBIGUOUS_JOIN_CONDITION(217, "22017", "Amibiguous or non-equi join condition specified. Consider using table list with where clause."),
-    CONSTRAINT_VIOLATION(218, "22018", "Constraint violatioin."),
-    
+    AMBIGUOUS_JOIN_CONDITION(217, "22017", "Ambiguous or non-equi join condition specified. Consider using table list with where clause."),
+    CONSTRAINT_VIOLATION(218, "22018", "Constraint violation."),
+
     /**
      * Constraint Violation (errorcode 03, sqlstate 23)
      */
@@ -97,13 +95,13 @@
         }
     }),
     CANNOT_INDEX_COLUMN_ON_TYPE(302, "23100", "The column cannot be index due to its type."),
-    
+
     /**
      * Invalid Cursor State (errorcode 04, sqlstate 24)
      */
     CURSOR_BEFORE_FIRST_ROW(401, "24015","Cursor before first row."),
     CURSOR_PAST_LAST_ROW(402, "24016", "Cursor past last row."),
-    
+
     /**
      * Syntax Error or Access Rule Violation (errorcode 05, sqlstate 42)
      */
@@ -152,22 +150,22 @@
      *  Invalid Transaction State (errorcode 05, sqlstate 25)
      */
      READ_ONLY_CONNECTION(518,"25502","Mutations are not permitted for a read-only connection."),
- 
+
      VARBINARY_ARRAY_NOT_SUPPORTED(519, "42896", "VARBINARY ARRAY is not supported"),
-    
+
      /**
       *  Expression Index exceptions.
       */
-     AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggreagaate expression not allowed in an index"),
+     AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate expression not allowed in an index"),
      NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", "Non-deterministic expression not allowed in an index"),
      STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless expression not allowed in an index"),
 
-     /** 
+     /**
       * Union All related errors
       */
      SELECT_COLUMN_NUM_IN_UNIONALL_DIFFS(525, "42902", "SELECT column number differs in a Union All query is not allowed"),
      SELECT_COLUMN_TYPE_IN_UNIONALL_DIFFS(526, "42903", "SELECT column types differ in a Union All query is not allowed"),
-     
+
      /**
       * Row timestamp column related errors
       */
@@ -177,10 +175,10 @@
      ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME OR TIMESTAMP"),
      ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views"),
      INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero"),
-     /** 
+     /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.
-     * 
+     *
      * For the following exceptions, use errorcode 10.
      */
     SINGLE_PK_MAY_NOT_BE_NULL(1000, "42I00", "Single column primary key may not be NULL."),
@@ -237,11 +235,11 @@
     NO_MUTABLE_INDEXES(1026, "42Y85", "Mutable secondary indexes are only supported for HBase version " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MUTABLE_SI_VERSION_THRESHOLD) + " and above."),
     INVALID_FILTER_ON_IMMUTABLE_ROWS(1027, "42Y86", "All columns referenced in a WHERE clause must be available in every index for a table with immutable rows."),
     INVALID_INDEX_STATE_TRANSITION(1028, "42Y87", "Invalid index state transition."),
-    INVALID_MUTABLE_INDEX_CONFIG(1029, "42Y88", "Mutable secondary indexes must have the " 
-            + IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY + " property set to " 
+    INVALID_MUTABLE_INDEX_CONFIG(1029, "42Y88", "Mutable secondary indexes must have the "
+            + IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY + " property set to "
             +  IndexManagementUtil.INDEX_WAL_EDIT_CODEC_CLASS_NAME + " in the hbase-sites.xml of every region server"),
-            
-            
+
+
     CANNOT_CREATE_TENANT_SPECIFIC_TABLE(1030, "42Y89", "Cannot create table for tenant-specific connection"),
     DEFAULT_COLUMN_FAMILY_ONLY_ON_CREATE_TABLE(1034, "42Y93", "Default column family may only be specified when creating a table."),
     INSUFFICIENT_MULTI_TENANT_COLUMNS(1040, "42Y96", "A MULTI_TENANT table must have two or more PK columns with the first column being NOT NULL."),
@@ -255,8 +253,8 @@
     CANNOT_ALTER_PROPERTY(1051, "43A08", "Property can be specified or changed only when creating a table"),
     CANNOT_SET_PROPERTY_FOR_COLUMN_NOT_ADDED(1052, "43A09", "Property cannot be specified for a column family that is not being added or modified"),
     CANNOT_SET_TABLE_PROPERTY_ADD_COLUMN(1053, "43A10", "Table level property cannot be set when adding a column"),
-    
-    NO_LOCAL_INDEXES(1054, "43A11", "Local secondary indexes are not supported for HBase versions " + 
+
+    NO_LOCAL_INDEXES(1054, "43A11", "Local secondary indexes are not supported for HBase versions " +
         MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MIN_LOCAL_SI_VERSION_DISALLOW) + " through " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MAX_LOCAL_SI_VERSION_DISALLOW) + " inclusive."),
     UNALLOWED_LOCAL_INDEXES(1055, "43A12", "Local secondary indexes are configured to not be allowed."),
     DESC_VARBINARY_NOT_SUPPORTED(1056, "43A13", "Descending VARBINARY columns not supported"),
@@ -288,18 +286,18 @@
     SEQUENCE_VAL_REACHED_MAX_VALUE(1212, "42Z12", "Reached MAXVALUE of sequence"),
     SEQUENCE_VAL_REACHED_MIN_VALUE(1213, "42Z13", "Reached MINVALUE of sequence"),
     INCREMENT_BY_MUST_NOT_BE_ZERO(1214, "42Z14", "Sequence INCREMENT BY value cannot be zero"),
-    NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT(1215, "42Z15", "Sequence NEXT n VALUES FOR must be a postive integer or constant." ),
+    NUM_SEQ_TO_ALLOCATE_MUST_BE_CONSTANT(1215, "42Z15", "Sequence NEXT n VALUES FOR must be a positive integer or constant." ),
     NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED(1216, "42Z16", "Sequence NEXT n VALUES FOR is not supported for Sequences with the CYCLE flag" ),
-                    
+
     /** Parser error. (errorcode 06, sqlState 42P) */
-    PARSER_ERROR(601, "42P00", "Syntax error.", Factory.SYTAX_ERROR),
-    MISSING_TOKEN(602, "42P00", "Syntax error.", Factory.SYTAX_ERROR),
-    UNWANTED_TOKEN(603, "42P00", "Syntax error.", Factory.SYTAX_ERROR),
-    MISMATCHED_TOKEN(604, "42P00", "Syntax error.", Factory.SYTAX_ERROR),
-    UNKNOWN_FUNCTION(605, "42P00", "Syntax error.", Factory.SYTAX_ERROR),
-    
+    PARSER_ERROR(601, "42P00", "Syntax error.", Factory.SYNTAX_ERROR),
+    MISSING_TOKEN(602, "42P00", "Syntax error.", Factory.SYNTAX_ERROR),
+    UNWANTED_TOKEN(603, "42P00", "Syntax error.", Factory.SYNTAX_ERROR),
+    MISMATCHED_TOKEN(604, "42P00", "Syntax error.", Factory.SYNTAX_ERROR),
+    UNKNOWN_FUNCTION(605, "42P00", "Syntax error.", Factory.SYNTAX_ERROR),
+
     /**
-     * Implementation defined class. Execution exceptions (errorcode 11, sqlstate XCL). 
+     * Implementation defined class. Execution exceptions (errorcode 11, sqlstate XCL).
      */
     RESULTSET_CLOSED(1101, "XCL01", "ResultSet is closed."),
     GET_TABLE_REGIONS_FAIL(1102, "XCL02", "Cannot get all table regions"),
@@ -316,7 +314,7 @@
     }),
     CANNOT_SPLIT_LOCAL_INDEX(1109,"XCL09", "Local index may not be pre-split"),
     CANNOT_SALT_LOCAL_INDEX(1110,"XCL10", "Local index may not be salted"),
-    
+
     /**
      * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).
      */
@@ -359,7 +357,7 @@
     private final Factory factory;
 
     private SQLExceptionCode(int errorCode, String sqlState, String message) {
-        this(errorCode, sqlState, message, Factory.DEFAULTY);
+        this(errorCode, sqlState, message, Factory.DEFAULT);
     }
 
     private SQLExceptionCode(int errorCode, String sqlState, String message, Factory factory) {
@@ -391,7 +389,7 @@
     }
 
     public static interface Factory {
-        public static final Factory DEFAULTY = new Factory() {
+        public static final Factory DEFAULT = new Factory() {
 
             @Override
             public SQLException newException(SQLExceptionInfo info) {
@@ -399,7 +397,7 @@
             }
             
         };
-        public static final Factory SYTAX_ERROR = new Factory() {
+        public static final Factory SYNTAX_ERROR = new Factory() {
 
             @Override
             public SQLException newException(SQLExceptionInfo info) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
index ebee43b..278b4aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
@@ -22,12 +22,9 @@
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.types.PBinary;
-import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PVarbinary;
-import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.base.Preconditions;
@@ -42,7 +39,7 @@
  */
 public class ColumnDef {
     private final ColumnName columnDefName;
-    private PDataType dataType;
+    private final PDataType dataType;
     private final Boolean isNull;
     private final Integer maxLength;
     private final Integer scale;
@@ -52,98 +49,91 @@
     private final Integer arrSize;
     private final String expressionStr;
     private final boolean isRowTimestamp;
- 
+
     ColumnDef(ColumnName columnDefName, String sqlTypeName, boolean isArray, Integer arrSize, Boolean isNull, Integer maxLength,
-    		            Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr, boolean isRowTimestamp) {
-   	 try {
-         Preconditions.checkNotNull(sortOrder);
-   	     PDataType localType = null;
-         this.columnDefName = columnDefName;
-         this.isArray = isArray;
-         // TODO : Add correctness check for arrSize.  Should this be ignored as in postgres
-         // Also add what is the limit that we would support.  Are we going to support a
-         //  fixed size or like postgres allow infinite.  May be the data types max limit can 
-         // be used for the array size (May be too big)
-         if(this.isArray) {
-        	 localType = sqlTypeName == null ? null : PDataType.fromTypeId(PDataType.sqlArrayType(SchemaUtil.normalizeIdentifier(sqlTypeName)));
-        	 this.dataType = sqlTypeName == null ? null : PDataType.fromSqlTypeName(SchemaUtil.normalizeIdentifier(sqlTypeName));
-             this.arrSize = arrSize; // Can only be non negative based on parsing
-             if (this.dataType == PVarbinary.INSTANCE) {
-                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.VARBINARY_ARRAY_NOT_SUPPORTED)
-                 .setColumnName(columnDefName.getColumnName()).build().buildException();
-             }
-         } else {
-             this.dataType = sqlTypeName == null ? null : PDataType.fromSqlTypeName(SchemaUtil.normalizeIdentifier(sqlTypeName));
-             this.arrSize = null;
-         }
-         
-         this.isNull = isNull;
-         if (this.dataType == PChar.INSTANCE) {
-             if (maxLength == null) {
-                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.MISSING_CHAR_LENGTH)
-                     .setColumnName(columnDefName.getColumnName()).build().buildException();
-             }
-             if (maxLength < 1) {
-                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.NONPOSITIVE_CHAR_LENGTH)
-                     .setColumnName(columnDefName.getColumnName()).build().buildException();
-             }
-             scale = null;
-         } else if (this.dataType == PVarchar.INSTANCE) {
-             if (maxLength != null && maxLength < 1) {
-                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.NONPOSITIVE_CHAR_LENGTH)
-                     .setColumnName(columnDefName.getColumnName()).build().buildException(); 
-             }
-             scale = null;
-         } else if (this.dataType == PDecimal.INSTANCE) {
-             // for deciaml, 1 <= maxLength <= PDataType.MAX_PRECISION;
-             if (maxLength != null) {
-                 if (maxLength < 1 || maxLength > PDataType.MAX_PRECISION) {
-                     throw new SQLExceptionInfo.Builder(SQLExceptionCode.DECIMAL_PRECISION_OUT_OF_RANGE)
-                         .setColumnName(columnDefName.getColumnName()).build().buildException();
-                 }
-                 // When a precision is specified and a scale is not specified, it is set to 0. 
-                 // 
-                 // This is the standard as specified in
-                 // http://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832
-                 // and 
-                 // http://docs.oracle.com/javadb/10.6.2.1/ref/rrefsqlj15260.html.
-                 // Otherwise, if scale is bigger than maxLength, just set it to the maxLength;
-                 //
-                 // When neither a precision nor a scale is specified, the precision and scale is
-                 // ignored. All decimal are stored with as much decimal points as possible.
-                 scale = scale == null ? PDataType.DEFAULT_SCALE : scale > maxLength ? maxLength : scale; 
-             }
-         } else if (this.dataType == PBinary.INSTANCE) {
-             if (maxLength == null) {
-                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.MISSING_BINARY_LENGTH)
-                     .setColumnName(columnDefName.getColumnName()).build().buildException();
-             }
-             if (maxLength < 1) {
-                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.NONPOSITIVE_BINARY_LENGTH)
-                     .setColumnName(columnDefName.getColumnName()).build().buildException();
-             }
-             scale = null;
-         } else {
-             // ignore maxLength and scale for other types.
-             maxLength = null;
-             scale = null;
-         }
-         this.maxLength = maxLength;
-         this.scale = scale;
-         this.isPK = isPK;
-         this.sortOrder = sortOrder;
-         if(this.isArray) {
-             this.dataType = localType;
-         }
-         this.expressionStr = expressionStr;
-         this.isRowTimestamp = isRowTimestamp;
-     } catch (SQLException e) {
-         throw new ParseException(e);
-     }
+            Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr, boolean isRowTimestamp) {
+        try {
+            Preconditions.checkNotNull(sortOrder);
+            PDataType baseType;
+            PDataType dataType;
+            this.columnDefName = columnDefName;
+            // TODO : Add correctness check for arrSize.  Should this be ignored as in postgres
+            // Also add what is the limit that we would support.  Are we going to support a
+            //  fixed size or like postgres allow infinite.  May be the data types max limit can 
+            // be used for the array size (May be too big)
+            if (isArray) {
+                this.isArray = true;
+                dataType = sqlTypeName == null ? null : PDataType.fromTypeId(PDataType.sqlArrayType(SchemaUtil.normalizeIdentifier(sqlTypeName)));
+                baseType = sqlTypeName == null ? null : PDataType.fromSqlTypeName(SchemaUtil.normalizeIdentifier(sqlTypeName));
+                this.arrSize = arrSize; // Can only be non negative based on parsing
+                if (baseType == PVarbinary.INSTANCE) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.VARBINARY_ARRAY_NOT_SUPPORTED)
+                    .setColumnName(columnDefName.getColumnName()).build().buildException();
+                }
+            } else {
+                baseType = dataType = sqlTypeName == null ? null : PDataType.fromSqlTypeName(SchemaUtil.normalizeIdentifier(sqlTypeName));
+                if (this.isArray = dataType != null && dataType.isArrayType()) {
+                    baseType = PDataType.arrayBaseType(dataType);
+                }
+                this.arrSize = null;
+            }
+
+            this.isNull = isNull;
+            if (baseType == PDecimal.INSTANCE) {
+                // for deciaml, 1 <= maxLength <= PDataType.MAX_PRECISION;
+                if (maxLength == null) {
+                    scale = null;
+                } else {
+                    if (maxLength < 1 || maxLength > PDataType.MAX_PRECISION) {
+                        throw new SQLExceptionInfo.Builder(SQLExceptionCode.DECIMAL_PRECISION_OUT_OF_RANGE)
+                        .setColumnName(columnDefName.getColumnName()).build().buildException();
+                    }
+                    // When a precision is specified and a scale is not specified, it is set to 0. 
+                    // 
+                    // This is the standard as specified in
+                    // http://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT1832
+                    // and 
+                    // http://docs.oracle.com/javadb/10.6.2.1/ref/rrefsqlj15260.html.
+                    // Otherwise, if scale is bigger than maxLength, just set it to the maxLength;
+                    //
+                    // When neither a precision nor a scale is specified, the precision and scale is
+                    // ignored. All decimal are stored with as much decimal points as possible.
+                    scale = scale == null ? PDataType.DEFAULT_SCALE : scale > maxLength ? maxLength : scale; 
+                }
+            } else {
+                if (maxLength != null && maxLength < 1) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.NONPOSITIVE_MAX_LENGTH)
+                    .setColumnName(columnDefName.getColumnName()).build().buildException();
+                }
+                scale = null;
+                if (baseType == null) {
+                    maxLength = null;
+                } else if (baseType.isFixedWidth()) {
+                    if (baseType.getByteSize() == null) {
+                        if (maxLength == null) {
+                            throw new SQLExceptionInfo.Builder(SQLExceptionCode.MISSING_MAX_LENGTH)
+                            .setColumnName(columnDefName.getColumnName()).build().buildException();
+                        }
+                    } else {
+                        maxLength = null;
+                    }
+                }
+            }
+            this.maxLength = maxLength;
+            this.scale = scale;
+            this.isPK = isPK;
+            this.sortOrder = sortOrder;
+            this.dataType = dataType;
+            this.expressionStr = expressionStr;
+            this.isRowTimestamp = isRowTimestamp;
+        } catch (SQLException e) {
+            throw new ParseException(e);
+        }
     }
+
     ColumnDef(ColumnName columnDefName, String sqlTypeName, Boolean isNull, Integer maxLength,
             Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr, boolean isRowTimestamp) {
-    	this(columnDefName, sqlTypeName, false, 0, isNull, maxLength, scale, isPK, sortOrder, expressionStr, isRowTimestamp);
+        this(columnDefName, sqlTypeName, false, 0, isNull, maxLength, scale, isPK, sortOrder, expressionStr, isRowTimestamp);
     }
 
     public ColumnName getColumnDefName() {
@@ -175,45 +165,45 @@
     public boolean isPK() {
         return isPK;
     }
-    
+
     public SortOrder getSortOrder() {
-    	return sortOrder;
+        return sortOrder;
     }
-        
-	public boolean isArray() {
-		return isArray;
-	}
 
-	public Integer getArraySize() {
-		return arrSize;
-	}
+    public boolean isArray() {
+        return isArray;
+    }
 
-	public String getExpression() {
-		return expressionStr;
-	}
-	
-	public boolean isRowTimestamp() {
-	    return isRowTimestamp;
-	}
-	@Override
+    public Integer getArraySize() {
+        return arrSize;
+    }
+
+    public String getExpression() {
+        return expressionStr;
+    }
+
+    public boolean isRowTimestamp() {
+        return isRowTimestamp;
+    }
+    @Override
     public String toString() {
-	    StringBuilder buf = new StringBuilder(columnDefName.getColumnNode().toString());
-	    buf.append(' ');
+        StringBuilder buf = new StringBuilder(columnDefName.getColumnNode().toString());
+        buf.append(' ');
         buf.append(dataType.getSqlTypeName());
         if (maxLength != null) {
             buf.append('(');
             buf.append(maxLength);
             if (scale != null) {
-              buf.append(',');
-              buf.append(scale); // has both max length and scale. For ex- decimal(10,2)
+                buf.append(',');
+                buf.append(scale); // has both max length and scale. For ex- decimal(10,2)
             }       
             buf.append(')');
-       }
+        }
         if (isArray) {
             buf.append(' ');
             buf.append(PDataType.ARRAY_TYPE_SUFFIX);
             buf.append(' ');
         }
-	    return buf.toString();
-	}
-}
+        return buf.toString();
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
index cd51683..e528d3b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryOptimizerTest.java
@@ -38,6 +38,7 @@
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseConnectionlessQueryTest;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -390,6 +391,22 @@
         assertEquals("CLIENT PARALLEL 1-WAY SKIP SCAN ON 15 KEYS OVER INDEX_TEST_TABLE_INDEX_F ['1','1111'] - ['5','3333']", QueryUtil.getExplainPlan(rs));
     }
 
+    @Test
+    public void testCharArrayLength() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute(
+                "CREATE TABLE TEST.TEST (testInt INTEGER, testCharArray CHAR(3)[], testByteArray BINARY(7)[], " +
+                "CONSTRAINT test_pk PRIMARY KEY(testInt)) DEFAULT_COLUMN_FAMILY='T'");
+        conn.createStatement().execute("CREATE INDEX TEST_INDEX ON TEST.TEST (testInt) INCLUDE (testCharArray, testByteArray)");
+        PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+
+        QueryPlan plan = stmt.optimizeQuery("SELECT /*+ INDEX(TEST.TEST TEST_INDEX)*/ testCharArray,testByteArray FROM TEST.TEST");
+        List<PColumn> columns = plan.getTableRef().getTable().getColumns();
+        assertEquals(3, columns.size());
+        assertEquals(3, columns.get(1).getMaxLength().intValue());
+        assertEquals(7, columns.get(2).getMaxLength().intValue());
+    }
+
     private void testAssertQueryPlanDetails(boolean multitenant, boolean useIndex, boolean salted) throws Exception {
         String sql;
         PreparedStatement stmt;
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
index 63c9e42..5363042 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java
@@ -376,41 +376,41 @@
 
     @Test
     public void testParseCreateTableInlinePrimaryKeyWithOrder() throws Exception {
-    	for (String order : new String[]{"asc", "desc"}) {
+        for (String order : new String[]{"asc", "desc"}) {
             String s = "create table core.entity_history_archive (id char(15) primary key ${o})".replace("${o}", order);
-    		CreateTableStatement stmt = (CreateTableStatement)new SQLParser((s)).parseStatement();
-    		List<ColumnDef> columnDefs = stmt.getColumnDefs();
-    		assertEquals(1, columnDefs.size());
-    		assertEquals(SortOrder.fromDDLValue(order), columnDefs.iterator().next().getSortOrder()); 
-    	}
+            CreateTableStatement stmt = (CreateTableStatement)new SQLParser((s)).parseStatement();
+            List<ColumnDef> columnDefs = stmt.getColumnDefs();
+            assertEquals(1, columnDefs.size());
+            assertEquals(SortOrder.fromDDLValue(order), columnDefs.iterator().next().getSortOrder()); 
+        }
     }
     
     @Test
     public void testParseCreateTableOrderWithoutPrimaryKeyFails() throws Exception {
-    	for (String order : new String[]{"asc", "desc"}) {
-    		String stmt = "create table core.entity_history_archive (id varchar(20) ${o})".replace("${o}", order);
-    		try {
-    			new SQLParser((stmt)).parseStatement();
-    			fail("Expected parse exception to be thrown");
-    		} catch (SQLException e) {
-    			String errorMsg = "ERROR 603 (42P00): Syntax error. Unexpected input. Expecting \"RPAREN\", got \"${o}\"".replace("${o}", order);
-    			assertTrue("Expected message to contain \"" + errorMsg + "\" but got \"" + e.getMessage() + "\"", e.getMessage().contains(errorMsg));
-    		}
-    	}
+        for (String order : new String[]{"asc", "desc"}) {
+            String stmt = "create table core.entity_history_archive (id varchar(20) ${o})".replace("${o}", order);
+            try {
+                new SQLParser((stmt)).parseStatement();
+                fail("Expected parse exception to be thrown");
+            } catch (SQLException e) {
+                String errorMsg = "ERROR 603 (42P00): Syntax error. Unexpected input. Expecting \"RPAREN\", got \"${o}\"".replace("${o}", order);
+                assertTrue("Expected message to contain \"" + errorMsg + "\" but got \"" + e.getMessage() + "\"", e.getMessage().contains(errorMsg));
+            }
+        }
     }
     
     @Test
     public void testParseCreateTablePrimaryKeyConstraintWithOrder() throws Exception {
-    	for (String order : new String[]{"asc", "desc"}) {
-    		String s = "create table core.entity_history_archive (id CHAR(15), name VARCHAR(150), constraint pk primary key (id ${o}, name ${o}))".replace("${o}", order);
-    		CreateTableStatement stmt = (CreateTableStatement)new SQLParser((s)).parseStatement();
-    		PrimaryKeyConstraint pkConstraint = stmt.getPrimaryKeyConstraint();
-    		List<Pair<ColumnName,SortOrder>> columns = pkConstraint.getColumnNames();
-    		assertEquals(2, columns.size());
-    		for (Pair<ColumnName,SortOrder> pair : columns) {
-    			assertEquals(SortOrder.fromDDLValue(order), pkConstraint.getColumnWithSortOrder(pair.getFirst()).getSecond());
-    		}    		
-    	}
+        for (String order : new String[]{"asc", "desc"}) {
+            String s = "create table core.entity_history_archive (id CHAR(15), name VARCHAR(150), constraint pk primary key (id ${o}, name ${o}))".replace("${o}", order);
+            CreateTableStatement stmt = (CreateTableStatement)new SQLParser((s)).parseStatement();
+            PrimaryKeyConstraint pkConstraint = stmt.getPrimaryKeyConstraint();
+            List<Pair<ColumnName,SortOrder>> columns = pkConstraint.getColumnNames();
+            assertEquals(2, columns.size());
+            for (Pair<ColumnName,SortOrder> pair : columns) {
+                assertEquals(SortOrder.fromDDLValue(order), pkConstraint.getColumnWithSortOrder(pair.getFirst()).getSecond());
+            }           
+        }
     }
 
     @Test
@@ -439,30 +439,31 @@
     }
 
     @Test
-	public void testCreateSequence() throws Exception {
-		String sql = ((
-				"create sequence foo.bar\n" + 
-						"start with 0\n"	+ 
-						"increment by 1\n"));
-		parseQuery(sql);
-	}
-	
-	@Test
-	public void testNextValueForSelect() throws Exception {
-		String sql = ((
-				"select next value for foo.bar \n" + 
-						"from core.custom_entity_data\n"));						
-		parseQuery(sql);
-	}
-	
-	@Test
+    public void testCreateSequence() throws Exception {
+        String sql = ((
+                "create sequence foo.bar\n" + 
+                        "start with 0\n"    + 
+                        "increment by 1\n"));
+        parseQuery(sql);
+    }
+    
+    @Test
+    public void testNextValueForSelect() throws Exception {
+        String sql = ((
+                "select next value for foo.bar \n" + 
+                        "from core.custom_entity_data\n"));                     
+        parseQuery(sql);
+    }
+    
+    @Test
     public void testNextValueForWhere() throws Exception {
         String sql = ((
                 "upsert into core.custom_entity_data\n" + 
                         "select next value for foo.bar from core.custom_entity_data\n"));                    
         parseQuery(sql);
     }
-	
+
+    @Test
     public void testBadCharDef() throws Exception {
         try {
             String sql = ("CREATE TABLE IF NOT EXISTS testBadVarcharDef" + 
@@ -470,7 +471,7 @@
             parseQuery(sql);
             fail("Should have caught bad char definition.");
         } catch (SQLException e) {
-            assertTrue(e.getMessage(), e.getMessage().contains("ERROR 208 (22003): CHAR or VARCHAR must have a positive length. columnName=COL"));
+            assertEquals(SQLExceptionCode.NONPOSITIVE_MAX_LENGTH.getErrorCode(), e.getErrorCode());
         }
         try {
             String sql = ("CREATE TABLE IF NOT EXISTS testBadVarcharDef" + 
@@ -478,7 +479,7 @@
             parseQuery(sql);
             fail("Should have caught bad char definition.");
         } catch (SQLException e) {
-            assertTrue(e.getMessage(), e.getMessage().contains("ERROR 207 (22003): Missing length for CHAR. columnName=COL"));
+            assertEquals(SQLExceptionCode.MISSING_MAX_LENGTH.getErrorCode(), e.getErrorCode());
         }
     }
 
@@ -490,7 +491,7 @@
             parseQuery(sql);
             fail("Should have caught bad varchar definition.");
         } catch (SQLException e) {
-            assertTrue(e.getMessage(), e.getMessage().contains("ERROR 208 (22003): CHAR or VARCHAR must have a positive length. columnName=COL"));
+            assertEquals(SQLExceptionCode.NONPOSITIVE_MAX_LENGTH.getErrorCode(), e.getErrorCode());
         }
     }
 
@@ -522,7 +523,7 @@
             parseQuery(sql);
             fail("Should have caught bad binary definition.");
         } catch (SQLException e) {
-            assertTrue(e.getMessage(), e.getMessage().contains("ERROR 211 (22003): BINARY must have a positive length. columnName=COL"));
+            assertEquals(SQLExceptionCode.NONPOSITIVE_MAX_LENGTH.getErrorCode(), e.getErrorCode());
         }
         try {
             String sql = ("CREATE TABLE IF NOT EXISTS testBadVarcharDef" + 
@@ -530,7 +531,7 @@
             parseQuery(sql);
             fail("Should have caught bad char definition.");
         } catch (SQLException e) {
-            assertTrue(e.getMessage(), e.getMessage().contains("ERROR 210 (22003): Missing length for BINARY. columnName=COL"));
+            assertEquals(SQLExceptionCode.MISSING_MAX_LENGTH.getErrorCode(), e.getErrorCode());
         }
     }
 
