Added support for a auto_clear_optional_fields option.



git-svn-id: https://svn.apache.org/repos/asf/activemq/sandbox/activemq-protobuf@705251 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/activemq-protobuf-test/src/main/proto/deferred_decode.proto b/activemq-protobuf-test/src/main/proto/deferred_decode.proto
index 881f91d..2535c48 100644
--- a/activemq-protobuf-test/src/main/proto/deferred_decode.proto
+++ b/activemq-protobuf-test/src/main/proto/deferred_decode.proto
@@ -28,7 +28,9 @@
 
 
 message Bar {
- 
+  option base_type=Foo;
+
+  // These are the Foo fields.  
   optional int32 field1 =  1;
   optional int64 field2 =  2;
  
diff --git a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/BaseMessage.java b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/BaseMessage.java
index 1289d96..2e2753c 100644
--- a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/BaseMessage.java
+++ b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/BaseMessage.java
@@ -35,252 +35,327 @@
 
 abstract public class BaseMessage<T> implements Message<T> {
 
-    protected int memoizedSerializedSize = -1;
-    
-    abstract public T clone() throws CloneNotSupportedException;
+	protected int memoizedSerializedSize = -1;
+
+	abstract public T clone() throws CloneNotSupportedException;
 
 	public void clear() {
 		memoizedSerializedSize = -1;
 	}
 
-    ///////////////////////////////////////////////////////////////////
-    // Write related helpers.
-    ///////////////////////////////////////////////////////////////////
-
-    public void writeFramed(CodedOutputStream output) throws IOException {
-        output.writeRawVarint32(serializedSizeUnframed());
-        writeUnframed(output);
-    }
-
-    public byte[] toUnframedByteArray() {
-        try {
-            byte[] result = new byte[serializedSizeUnframed()];
-            CodedOutputStream output = CodedOutputStream.newInstance(result);
-            writeUnframed(output);
-            output.checkNoSpaceLeft();
-            return result;
-        } catch (IOException e) {
-            throw new RuntimeException("Serializing to a byte array threw an IOException " + "(should never happen).", e);
-        }
-    }
-    
-    
-	public byte[] toFramedByteArray() {
-        try {
-            byte[] result = new byte[serializedSizeFramed()];
-            CodedOutputStream output = CodedOutputStream.newInstance(result);
-            writeFramed(output);
-            output.checkNoSpaceLeft();
-            return result;
-        } catch (IOException e) {
-            throw new RuntimeException("Serializing to a byte array threw an IOException " + "(should never happen).", e);
-        }
-	}
-    
-	public void writeFramed(OutputStream output) throws IOException {
-        CodedOutputStream codedOutput = CodedOutputStream.newInstance(output);
-        writeFramed(codedOutput);
-        codedOutput.flush();
+	public boolean isInitialized() {
+		return missingFields().isEmpty();
 	}
 
-    public void writeUnframed(OutputStream output) throws IOException {
-        CodedOutputStream codedOutput = CodedOutputStream.newInstance(output);
-        writeUnframed(codedOutput);
-        codedOutput.flush();
-    }
-    
-    public int serializedSizeFramed() {
-        int t = serializedSizeUnframed();
-        return CodedOutputStream.computeRawVarint32Size(t) + t;
-
-    }
-
-    ///////////////////////////////////////////////////////////////////
-    // Read related helpers.
-    ///////////////////////////////////////////////////////////////////
-
-    public T mergeFramed(CodedInputStream input) throws IOException {
-        int length = input.readRawVarint32();
-        int oldLimit = input.pushLimit(length);
-        T rc=  mergeUnframed(input);
-        input.checkLastTagWas(0);
-        input.popLimit(oldLimit);
-        return rc;
-    }
-
-    public T mergeUnframed(ByteString data) throws InvalidProtocolBufferException {
-        try {
-            CodedInputStream input = data.newCodedInput();
-            mergeUnframed(input);
-            input.checkLastTagWas(0);
-            return (T)this;
-        } catch (InvalidProtocolBufferException e) {
-            throw e;
-        } catch (IOException e) {
-            throw new RuntimeException("Reading from a ByteString threw an IOException (should " + "never happen).", e);
-        }
-    }
-    
-    public T mergeFramed(ByteString data) throws InvalidProtocolBufferException {
-        try {
-            CodedInputStream input = data.newCodedInput();
-            mergeFramed(input);
-            input.checkLastTagWas(0);
-            return (T)this;
-        } catch (InvalidProtocolBufferException e) {
-            throw e;
-        } catch (IOException e) {
-            throw new RuntimeException("Reading from a ByteString threw an IOException (should " + "never happen).", e);
-        }
-    }
-
-    public T mergeUnframed(byte[] data) throws InvalidProtocolBufferException {
-        try {
-            CodedInputStream input = CodedInputStream.newInstance(data);
-            mergeUnframed(input);
-            input.checkLastTagWas(0);
-            return (T)this;
-        } catch (InvalidProtocolBufferException e) {
-            throw e;
-        } catch (IOException e) {
-            throw new RuntimeException("Reading from a byte array threw an IOException (should " + "never happen).", e);
-        }
-    }
-    
-    public T mergeFramed(byte[] data) throws InvalidProtocolBufferException {
-        try {
-            CodedInputStream input = CodedInputStream.newInstance(data);
-            mergeFramed(input);
-            input.checkLastTagWas(0);
-            return (T)this;
-        } catch (InvalidProtocolBufferException e) {
-            throw e;
-        } catch (IOException e) {
-            throw new RuntimeException("Reading from a byte array threw an IOException (should " + "never happen).", e);
-        }
-    }
-
-    public T mergeUnframed(InputStream input) throws IOException {
-        CodedInputStream codedInput = CodedInputStream.newInstance(input);
-        mergeUnframed(codedInput);
-        return (T)this;
-    }
-    
-    public T mergeFramed(InputStream input) throws IOException {
-		int length = readRawVarint32(input);
-		byte []data = new byte[length];
-		int pos = 0;
-		while( pos < length ) {
-			int r = input.read(data, pos, length-pos);
-			if( r < 0 ) {
-				throw new InvalidProtocolBufferException("Input stream ended before a full message frame could be read.");	
-			}
-			pos+=r;
+	@SuppressWarnings("unchecked")
+	public T assertInitialized() throws UninitializedMessageException {
+		java.util.ArrayList<String> missingFields = missingFields();
+		if (!missingFields.isEmpty()) {
+			throw new UninitializedMessageException(missingFields);
 		}
-		return mergeUnframed(data);
-    }
+		return (T) this;
+	}
 
-    ///////////////////////////////////////////////////////////////////
-    // Internal implementation methods.
-    ///////////////////////////////////////////////////////////////////
-    static protected <T> void addAll(Iterable<T> values, Collection<? super T> list) {
-        if (values instanceof Collection) {
-            @SuppressWarnings("unsafe")
-            Collection<T> collection = (Collection<T>)values;
-            list.addAll(collection);
-        } else {
-            for (T value : values) {
-                list.add(value);
-            }
-        }
-    }
-    
-    static protected void writeGroup(CodedOutputStream output, int tag, BaseMessage message) throws IOException {
-        output.writeTag(tag, WIRETYPE_START_GROUP);
-        message.writeUnframed(output);
-        output.writeTag(tag, WIRETYPE_END_GROUP);
-    }
+	@SuppressWarnings("unchecked")
+	protected T checktInitialized() throws InvalidProtocolBufferException {
+		java.util.ArrayList<String> missingFields = missingFields();
+		if (!missingFields.isEmpty()) {
+			throw new UninitializedMessageException(missingFields)
+					.asInvalidProtocolBufferException();
+		}
+		return (T) this;
+	}
 
-    static protected <T extends BaseMessage> T readGroup(CodedInputStream input, int tag, T group) throws IOException {
-        group.mergeUnframed(input);
-        input.checkLastTagWas(makeTag(tag, WIRETYPE_END_GROUP));
-        return group;
-    }
-
-    static protected int computeGroupSize(int tag, BaseMessage message) {
-        return CodedOutputStream.computeTagSize(tag) * 2 + message.serializedSizeUnframed();
-    }
-
-
-    static protected void writeMessage(CodedOutputStream output, int tag, BaseMessage message) throws IOException {
-        output.writeTag(tag, WIRETYPE_LENGTH_DELIMITED);
-        message.writeFramed(output);
-    }
-    
-    static protected int computeMessageSize(int tag, BaseMessage message) {
-        return CodedOutputStream.computeTagSize(tag) + message.serializedSizeFramed();
-    }
-    
-    protected List<String> prefix(List<String> missingFields, String prefix) {
-        ArrayList<String> rc = new ArrayList<String>(missingFields.size());
-        for (String v : missingFields) {
-            rc.add(prefix+v);
-        }
-        return rc;
-    }
-
-    abstract protected T checktInitialized() throws InvalidProtocolBufferException;
-
-    /**
-     * Read a raw Varint from the stream.  If larger than 32 bits, discard the
-     * upper bits.
-     */
-    static protected int readRawVarint32(InputStream is) throws IOException {
-      byte tmp = readRawByte(is);
-      if (tmp >= 0) {
-        return tmp;
-      }
-      int result = tmp & 0x7f;
-      if ((tmp = readRawByte(is)) >= 0) {
-        result |= tmp << 7;
-      } else {
-        result |= (tmp & 0x7f) << 7;
-        if ((tmp = readRawByte(is)) >= 0) {
-          result |= tmp << 14;
-        } else {
-          result |= (tmp & 0x7f) << 14;
-          if ((tmp = readRawByte(is)) >= 0) {
-            result |= tmp << 21;
-          } else {
-            result |= (tmp & 0x7f) << 21;
-            result |= (tmp = readRawByte(is)) << 28;
-            if (tmp < 0) {
-              // Discard upper 32 bits.
-              for (int i = 0; i < 5; i++) {
-                if (readRawByte(is) >= 0) return result;
-              }
-              throw new InvalidProtocolBufferException(
-              "CodedInputStream encountered a malformed varint.");
-            }
-          }
-        }
-      }
-      return result;
-    }
-
-    static protected byte readRawByte(InputStream is) throws IOException {
-    	int rc = is.read();
-    	if( rc == -1 ) {
-	        throw new InvalidProtocolBufferException(
-	        	      "While parsing a protocol message, the input ended unexpectedly " +
-	        	      "in the middle of a field.  This could mean either than the " +
-	        	      "input has been truncated or that an embedded message " +
-	        	      "misreported its own length.");
-    	}
-    	return (byte) rc;
-    }
+	public ArrayList<String> missingFields() {
+		load();
+		return new ArrayList<String>();
+	}
 
 	protected void loadAndClear() {
-		memoizedSerializedSize=-1;
+		memoizedSerializedSize = -1;
 	}
+
+	protected void load() {
+	}
+
+    @SuppressWarnings("unchecked")
+	public T mergeFrom(T other) {
+    	return (T) this;
+    }
+    
+	
+     public void writeUnframed(com.google.protobuf.CodedOutputStream output) throws java.io.IOException {
+//        if (encodedForm == null) {
+//           encodedForm = new byte[serializedSizeUnframed()];
+//           com.google.protobuf.CodedOutputStream original = output;
+//           output = com.google.protobuf.CodedOutputStream.newInstance(encodedForm);
+//           if (hasField1()) {
+//              output.writeInt32(1, getField1());
+//           }
+//           if (hasField2()) {
+//              output.writeInt64(2, getField2());
+//           }
+//           if (hasField3()) {
+//              writeMessage(output, 3, getField3());
+//           }
+//           output.checkNoSpaceLeft();
+//           output = original;
+//        }
+//        output.writeRawBytes(encodedForm);
+     }
+
+	// /////////////////////////////////////////////////////////////////
+	// Write related helpers.
+	// /////////////////////////////////////////////////////////////////
+
+	public void writeFramed(CodedOutputStream output) throws IOException {
+		output.writeRawVarint32(serializedSizeUnframed());
+		writeUnframed(output);
+	}
+
+	public byte[] toUnframedByteArray() {
+		try {
+			byte[] result = new byte[serializedSizeUnframed()];
+			CodedOutputStream output = CodedOutputStream.newInstance(result);
+			writeUnframed(output);
+			output.checkNoSpaceLeft();
+			return result;
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"Serializing to a byte array threw an IOException "
+							+ "(should never happen).", e);
+		}
+	}
+
+	public byte[] toFramedByteArray() {
+		try {
+			byte[] result = new byte[serializedSizeFramed()];
+			CodedOutputStream output = CodedOutputStream.newInstance(result);
+			writeFramed(output);
+			output.checkNoSpaceLeft();
+			return result;
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"Serializing to a byte array threw an IOException "
+							+ "(should never happen).", e);
+		}
+	}
+
+	public void writeFramed(OutputStream output) throws IOException {
+		CodedOutputStream codedOutput = CodedOutputStream.newInstance(output);
+		writeFramed(codedOutput);
+		codedOutput.flush();
+	}
+
+	public void writeUnframed(OutputStream output) throws IOException {
+		CodedOutputStream codedOutput = CodedOutputStream.newInstance(output);
+		writeUnframed(codedOutput);
+		codedOutput.flush();
+	}
+
+	public int serializedSizeFramed() {
+		int t = serializedSizeUnframed();
+		return CodedOutputStream.computeRawVarint32Size(t) + t;
+
+	}
+
+	// /////////////////////////////////////////////////////////////////
+	// Read related helpers.
+	// /////////////////////////////////////////////////////////////////
+
+	public T mergeFramed(CodedInputStream input) throws IOException {
+		int length = input.readRawVarint32();
+		int oldLimit = input.pushLimit(length);
+		T rc = mergeUnframed(input);
+		input.checkLastTagWas(0);
+		input.popLimit(oldLimit);
+		return rc;
+	}
+
+	public T mergeUnframed(ByteString data)
+			throws InvalidProtocolBufferException {
+		try {
+			CodedInputStream input = data.newCodedInput();
+			mergeUnframed(input);
+			input.checkLastTagWas(0);
+			return (T) this;
+		} catch (InvalidProtocolBufferException e) {
+			throw e;
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"Reading from a ByteString threw an IOException (should "
+							+ "never happen).", e);
+		}
+	}
+
+	public T mergeFramed(ByteString data) throws InvalidProtocolBufferException {
+		try {
+			CodedInputStream input = data.newCodedInput();
+			mergeFramed(input);
+			input.checkLastTagWas(0);
+			return (T) this;
+		} catch (InvalidProtocolBufferException e) {
+			throw e;
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"Reading from a ByteString threw an IOException (should "
+							+ "never happen).", e);
+		}
+	}
+
+	public T mergeUnframed(byte[] data) throws InvalidProtocolBufferException {
+		try {
+			CodedInputStream input = CodedInputStream.newInstance(data);
+			mergeUnframed(input);
+			input.checkLastTagWas(0);
+			return (T) this;
+		} catch (InvalidProtocolBufferException e) {
+			throw e;
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"Reading from a byte array threw an IOException (should "
+							+ "never happen).", e);
+		}
+	}
+
+	public T mergeFramed(byte[] data) throws InvalidProtocolBufferException {
+		try {
+			CodedInputStream input = CodedInputStream.newInstance(data);
+			mergeFramed(input);
+			input.checkLastTagWas(0);
+			return (T) this;
+		} catch (InvalidProtocolBufferException e) {
+			throw e;
+		} catch (IOException e) {
+			throw new RuntimeException(
+					"Reading from a byte array threw an IOException (should "
+							+ "never happen).", e);
+		}
+	}
+
+	public T mergeUnframed(InputStream input) throws IOException {
+		CodedInputStream codedInput = CodedInputStream.newInstance(input);
+		mergeUnframed(codedInput);
+		return (T) this;
+	}
+
+	public T mergeFramed(InputStream input) throws IOException {
+		int length = readRawVarint32(input);
+		byte[] data = new byte[length];
+		int pos = 0;
+		while (pos < length) {
+			int r = input.read(data, pos, length - pos);
+			if (r < 0) {
+				throw new InvalidProtocolBufferException(
+						"Input stream ended before a full message frame could be read.");
+			}
+			pos += r;
+		}
+		return mergeUnframed(data);
+	}
+
+	// /////////////////////////////////////////////////////////////////
+	// Internal implementation methods.
+	// /////////////////////////////////////////////////////////////////
+	static protected <T> void addAll(Iterable<T> values,
+			Collection<? super T> list) {
+		if (values instanceof Collection) {
+			@SuppressWarnings("unsafe")
+			Collection<T> collection = (Collection<T>) values;
+			list.addAll(collection);
+		} else {
+			for (T value : values) {
+				list.add(value);
+			}
+		}
+	}
+
+	static protected void writeGroup(CodedOutputStream output, int tag,
+			BaseMessage message) throws IOException {
+		output.writeTag(tag, WIRETYPE_START_GROUP);
+		message.writeUnframed(output);
+		output.writeTag(tag, WIRETYPE_END_GROUP);
+	}
+
+	static protected <T extends BaseMessage> T readGroup(
+			CodedInputStream input, int tag, T group) throws IOException {
+		group.mergeUnframed(input);
+		input.checkLastTagWas(makeTag(tag, WIRETYPE_END_GROUP));
+		return group;
+	}
+
+	static protected int computeGroupSize(int tag, BaseMessage message) {
+		return CodedOutputStream.computeTagSize(tag) * 2
+				+ message.serializedSizeUnframed();
+	}
+
+	static protected void writeMessage(CodedOutputStream output, int tag,
+			BaseMessage message) throws IOException {
+		output.writeTag(tag, WIRETYPE_LENGTH_DELIMITED);
+		message.writeFramed(output);
+	}
+
+	static protected int computeMessageSize(int tag, BaseMessage message) {
+		return CodedOutputStream.computeTagSize(tag)
+				+ message.serializedSizeFramed();
+	}
+
+	protected List<String> prefix(List<String> missingFields, String prefix) {
+		ArrayList<String> rc = new ArrayList<String>(missingFields.size());
+		for (String v : missingFields) {
+			rc.add(prefix + v);
+		}
+		return rc;
+	}
+
+	/**
+	 * Read a raw Varint from the stream. If larger than 32 bits, discard the
+	 * upper bits.
+	 */
+	static public int readRawVarint32(InputStream is) throws IOException {
+		byte tmp = readRawByte(is);
+		if (tmp >= 0) {
+			return tmp;
+		}
+		int result = tmp & 0x7f;
+		if ((tmp = readRawByte(is)) >= 0) {
+			result |= tmp << 7;
+		} else {
+			result |= (tmp & 0x7f) << 7;
+			if ((tmp = readRawByte(is)) >= 0) {
+				result |= tmp << 14;
+			} else {
+				result |= (tmp & 0x7f) << 14;
+				if ((tmp = readRawByte(is)) >= 0) {
+					result |= tmp << 21;
+				} else {
+					result |= (tmp & 0x7f) << 21;
+					result |= (tmp = readRawByte(is)) << 28;
+					if (tmp < 0) {
+						// Discard upper 32 bits.
+						for (int i = 0; i < 5; i++) {
+							if (readRawByte(is) >= 0)
+								return result;
+						}
+						throw new InvalidProtocolBufferException(
+								"CodedInputStream encountered a malformed varint.");
+					}
+				}
+			}
+		}
+		return result;
+	}
+
+	static protected byte readRawByte(InputStream is) throws IOException {
+		int rc = is.read();
+		if (rc == -1) {
+			throw new InvalidProtocolBufferException(
+					"While parsing a protocol message, the input ended unexpectedly "
+							+ "in the middle of a field.  This could mean either than the "
+							+ "input has been truncated or that an embedded message "
+							+ "misreported its own length.");
+		}
+		return (byte) rc;
+	}
+
 }
diff --git a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/DeferredDecodeMessage.java b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/DeferredDecodeMessage.java
index beca6f5..f914acc 100644
--- a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/DeferredDecodeMessage.java
+++ b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/DeferredDecodeMessage.java
@@ -21,71 +21,76 @@
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
 
-abstract public class DeferredDecodeMessage<T> extends BaseMessage<T>{
+abstract public class DeferredDecodeMessage<T> extends BaseMessage<T> {
 
-	protected byte[] encodedForm;
-	protected boolean decoded=true;
+    protected byte[] encodedForm;
+    protected boolean decoded = true;
 
-	@Override
+    @Override
     public T mergeFramed(CodedInputStream input) throws IOException {
         int length = input.readRawVarint32();
         int oldLimit = input.pushLimit(length);
-        T rc=  mergeUnframed(input.readRawBytes(length));
+        T rc = mergeUnframed(input.readRawBytes(length));
         input.popLimit(oldLimit);
         return rc;
     }
 
-	@SuppressWarnings("unchecked")
-	@Override
-	public T mergeUnframed(byte[] data) throws InvalidProtocolBufferException {
-		encodedForm = data;
-		decoded=false;
-		return (T)this;
-	}
+    @SuppressWarnings("unchecked")
+    @Override
+    public T mergeUnframed(byte[] data) throws InvalidProtocolBufferException {
+        encodedForm = data;
+        decoded = false;
+        return (T) this;
+    }
 
-	@Override
-	public byte[] toUnframedByteArray() {
-		if( encodedForm==null ) {
-			encodedForm = super.toUnframedByteArray();
-		}
-		return encodedForm;
-	}
+    @Override
+    public byte[] toUnframedByteArray() {
+        if (encodedForm == null) {
+            encodedForm = super.toUnframedByteArray();
+        }
+        return encodedForm;
+    }
 
-	protected void load() {
-		if (!decoded) {
-			decoded = true;
-			try {
-				byte[] originalForm = encodedForm;
-				CodedInputStream input = CodedInputStream.newInstance(originalForm);
-				mergeUnframed(input);
-				input.checkLastTagWas(0);
-				// We need to reset the encoded form because the mergeUnframed from a stream clears it out.
-				encodedForm = originalForm;
-				checktInitialized();
-			} catch (Throwable e) {
-				throw new RuntimeException("Deferred message decoding failed: "+e.getMessage(), e);
-			}
-		}
-	}
-	
-	protected void loadAndClear() {
-		super.loadAndClear();
-		load();
-		encodedForm = null;
-	}
+    protected void load() {
+        if (!decoded) {
+            decoded = true;
+            try {
+                byte[] originalForm = encodedForm;
+                encodedForm=null;
+                CodedInputStream input = CodedInputStream.newInstance(originalForm);
+                mergeUnframed(input);
+                input.checkLastTagWas(0);
+                // We need to reset the encoded form because the mergeUnframed
+                // from a stream clears it out.
+                encodedForm = originalForm;
+                checktInitialized();
+            } catch (Throwable e) {
+                throw new RuntimeException("Deferred message decoding failed: " + e.getMessage(), e);
+            }
+        }
+    }
 
-	public void clear() {
-		super.clear();
-		encodedForm = null;
-		decoded = true;
-	}
+    protected void loadAndClear() {
+        super.loadAndClear();
+        load();
+//        if( encodedForm!=null ) {
+//            System.out.println("crap.");
+//        }
+        encodedForm = null;
+    }
 
-	public boolean isDecoded() {
-		return decoded;
-	}
-	
-	public boolean isEncoded() {
-		return encodedForm!=null;
-	}
+    public void clear() {
+        super.clear();
+        encodedForm = null;
+        decoded = true;
+    }
+
+    public boolean isDecoded() {
+        return decoded;
+    }
+
+    public boolean isEncoded() {
+        return encodedForm != null;
+    }
 
 }
diff --git a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/FieldDescriptor.java b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/FieldDescriptor.java
index 49d5158..3471c41 100644
--- a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/FieldDescriptor.java
+++ b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/FieldDescriptor.java
@@ -107,7 +107,6 @@
                 typeDescriptor = parent.getProtoDescriptor().getType(type);
             }
             if( typeDescriptor == null ) {
-                typeDescriptor = parent.getProtoDescriptor().getType(type);
                 errors.add("Field type not found: "+type);
             }
         }
diff --git a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/JavaGenerator.java b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/JavaGenerator.java
index 0482729..e407ac7 100644
--- a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/JavaGenerator.java
+++ b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/JavaGenerator.java
@@ -29,7 +29,6 @@
 import java.io.FileOutputStream;
 import java.io.PrintWriter;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -52,6 +51,7 @@
     private ArrayList<String> errors = new ArrayList<String>();
     private boolean multipleFiles;
 	private boolean deferredDecode;
+    private boolean auto_clear_optional_fields;
 
     public static void main(String[] args) {
         
@@ -124,7 +124,8 @@
 //        optimizeFor = getOption(proto.getOptions(), "optimize_for", "SPEED");
         multipleFiles = isMultipleFilesEnabled(proto);
 		deferredDecode = Boolean.parseBoolean(getOption(proto.getOptions(), "deferred_decode", "false"));
-        
+        auto_clear_optional_fields = Boolean.parseBoolean(getOption(proto.getOptions(), "auto_clear_optional_fields", "false"));
+		
         if( multipleFiles ) {
             generateProtoFile();
         } else {
@@ -271,9 +272,11 @@
         if( deferredDecode ) {
             baseClass = "org.apache.activemq.protobuf.DeferredDecodeMessage";
         }
+        if( m.getBaseType()!=null ) {
+            baseClass = javaType(m.getBaseType())+"Base";
+        }
         
-        
-        p("public "+staticOption+"final class " + className + " extends "+baseClass+"<" + className + "> "+implementsExpression+"{");
+        p(staticOption+"public final class " + className + " extends "+className+"Base<"+className+"> "+implementsExpression+"{");
         p();
 
         indent();
@@ -289,18 +292,14 @@
 
         // Generate the Group Messages
         for (FieldDescriptor field : m.getFields().values()) {
+        	if( isInBaseClass(m, field) ) {
+        		continue;
+        	}
             if( field.isGroup() ) {
                 generateMessageBean(field.getGroup());
             }
         }
 
-//        if( defferedUnmarshall ) {
-//        }
-
-        // Generate the field accessors..
-        for (FieldDescriptor field : m.getFields().values()) {
-            generateFieldAccessor(className, field);
-        }
         
         generateMethodAssertInitialized(m, className);
 
@@ -332,8 +331,30 @@
         unindent();
         p("}");
         p();
+        
+        p(staticOption+"abstract class " + className + "Base<T> extends "+baseClass+"<T> {");
+        p();
+        indent();
+        
+        // Generate the field accessors..
+        for (FieldDescriptor field : m.getFields().values()) {
+            if( isInBaseClass(m, field) ) {
+                continue;
+            }
+            generateFieldAccessor(field);
+        }
+        
+        unindent();
+        p("}");
+        p();
     }
 
+	private boolean isInBaseClass(MessageDescriptor m, FieldDescriptor field) {
+		if( m.getBaseType() ==null )
+			return false;
+		return m.getBaseType().getFields().containsKey(field.getName());
+	}
+
 	/**
      * If the java_visitor message option is set, then this method generates a visitor method.  The option 
      * speifiies the class name of the visitor and optionally the return value and exceptions thrown by the visitor.
@@ -997,7 +1018,7 @@
 	 * @param m
 	 */
     private void generateMethodClear(MessageDescriptor m) {
-        p("public final void clear() {");
+        p("public void clear() {");
         indent();
         p("super.clear();");
         for (FieldDescriptor field : m.getFields().values()) {
@@ -1011,47 +1032,9 @@
 
     private void generateMethodAssertInitialized(MessageDescriptor m, String className) {
         
-        
-        
-        p("public final boolean isInitialized() {");
+        p("public java.util.ArrayList<String> missingFields() {");
         indent();
-        p("return missingFields().isEmpty();");
-        unindent();
-        p("}");
-        p();
-        
-        p("public final "+className+" assertInitialized() throws org.apache.activemq.protobuf.UninitializedMessageException {");
-        indent();
-        p("java.util.ArrayList<String> missingFields = missingFields();");
-        p("if( !missingFields.isEmpty()) {");
-        indent();
-        p("throw new org.apache.activemq.protobuf.UninitializedMessageException(missingFields);");
-        unindent();
-        p("}");
-        p("return this;");
-        unindent();
-        p("}");
-        p();
-        
-        p("protected final "+className+" checktInitialized() throws com.google.protobuf.InvalidProtocolBufferException {");
-        indent();
-        p("java.util.ArrayList<String> missingFields = missingFields();");
-        p("if( !missingFields.isEmpty()) {");
-        indent();
-        p("throw new org.apache.activemq.protobuf.UninitializedMessageException(missingFields).asInvalidProtocolBufferException();");
-        unindent();
-        p("}");
-        p("return this;");
-        unindent();
-        p("}");
-        p();
-
-        p("public final java.util.ArrayList<String> missingFields() {");
-        indent();
-        if( deferredDecode ) {
-        	p("load();");
-        }        
-        p("java.util.ArrayList<String> missingFields = new java.util.ArrayList<String>();");
+        p("java.util.ArrayList<String> missingFields = super.missingFields();");
         
         for (FieldDescriptor field : m.getFields().values()) {
             String uname = uCamel(field.getName());
@@ -1135,7 +1118,7 @@
                 if( field.getTypeDescriptor()!=null && !field.getTypeDescriptor().isEnum()) {
                     p("sb.append(prefix+\""+field.getName()+"[\"+i+\"] {\\n\");");
                     p("l.get(i).toString(sb, prefix+\"  \");");
-                    p("sb.append(\"}\\n\");");
+                    p("sb.append(prefix+\"}\\n\");");
                 } else {
                     p("sb.append(prefix+\""+field.getName()+"[\"+i+\"]: \");");
                     p("sb.append(l.get(i));");
@@ -1147,7 +1130,7 @@
                 if( field.getTypeDescriptor()!=null && !field.getTypeDescriptor().isEnum()) {
                     p("sb.append(prefix+\""+field.getName()+" {\\n\");");
                     p("get" + uname + "().toString(sb, prefix+\"  \");");
-                    p("sb.append(\"}\\n\");");
+                    p("sb.append(prefix+\"}\\n\");");
                 } else {
                     p("sb.append(prefix+\""+field.getName()+": \");");
                     p("sb.append(get" + uname + "());");
@@ -1170,7 +1153,7 @@
      * @param field
      * @param className 
      */
-    private void generateFieldAccessor(String className, FieldDescriptor field) {
+    private void generateFieldAccessor(FieldDescriptor field) {
         
         String lname = lCamel(field.getName());
         String uname = uCamel(field.getName());
@@ -1212,11 +1195,11 @@
             p("}");
             p();
 
-            p("public "+className+" set" + uname + "List(java.util.List<" + type + "> " + lname + ") {");
+            p("public T set" + uname + "List(java.util.List<" + type + "> " + lname + ") {");
             indent();
           	p("loadAndClear();");
             p("this.f_" + lname + " = " + lname + ";");
-            p("return this;");
+            p("return (T)this;");
             unindent();
             p("}");
             p();
@@ -1251,29 +1234,29 @@
             p("}");
             p();
                             
-            p("public "+className+" set" + uname + "(int index, " + type + " value) {");
+            p("public T set" + uname + "(int index, " + type + " value) {");
             indent();
           	p("loadAndClear();");
             p("get" + uname + "List().set(index, value);");
-            p("return this;");
+            p("return (T)this;");
             unindent();
             p("}");
             p();
             
-            p("public "+className+" add" + uname + "(" + type + " value) {");
+            p("public T add" + uname + "(" + type + " value) {");
             indent();
           	p("loadAndClear();");
             p("get" + uname + "List().add(value);");
-            p("return this;");
+            p("return (T)this;");
             unindent();
             p("}");
             p();
             
-            p("public "+className+" addAll" + uname + "(java.lang.Iterable<? extends " + type + "> collection) {");
+            p("public T addAll" + uname + "(java.lang.Iterable<? extends " + type + "> collection) {");
             indent();
           	p("loadAndClear();");
             p("super.addAll(collection, get" + uname + "List());");
-            p("return this;");
+            p("return (T)this;");
             unindent();
             p("}");
             p();
@@ -1326,14 +1309,22 @@
             p("}");
             p();
 
-            p("public "+className+" set" + uname + "(" + type + " " + lname + ") {");
+            p("public T set" + uname + "(" + type + " " + lname + ") {");
             indent();
           	p("loadAndClear();");
             if (primitive) {
-                p("this.b_" + lname + " = true;");
+                if( auto_clear_optional_fields && field.isOptional() ) {
+                    if( field.isStringType() && !"null".equals(typeDefault)) {
+                        p("this.b_" + lname + " = ("+lname+" != "+typeDefault+");");
+                    } else {
+                        p("this.b_" + lname + " = ("+lname+" != "+typeDefault+");");
+                    }
+                } else {
+                    p("this.b_" + lname + " = true;");
+                }
             }
             p("this.f_" + lname + " = " + lname + ";");
-            p("return this;");
+            p("return (T)this;");
             unindent();
             p("}");
             p();
@@ -1496,7 +1487,7 @@
         
         String createMessage = getOption(ed.getOptions(), "java_create_message", null);        
         if( "true".equals(createMessage) ) {
-            p("public final org.apache.activemq.protobuf.Message createMessage() {");
+            p("public org.apache.activemq.protobuf.Message createMessage() {");
             indent();
             p("switch (this) {");
             indent();
@@ -1704,10 +1695,8 @@
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < name.length(); i++) {
             char c = name.charAt(i);
-            if( i!=0 ) {
-                if( Character.isUpperCase(c) ) {
-                    sb.append("_");
-                }
+            if( i!=0 && Character.isUpperCase(c) ) {
+                sb.append("_");
             }
             sb.append(Character.toUpperCase(c));
         }
diff --git a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/MessageDescriptor.java b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/MessageDescriptor.java
index 0ca3279..e512f48 100644
--- a/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/MessageDescriptor.java
+++ b/activemq-protobuf/src/main/java/org/apache/activemq/protobuf/compiler/MessageDescriptor.java
@@ -17,10 +17,13 @@
 package org.apache.activemq.protobuf.compiler;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.sun.org.apache.bcel.internal.generic.BASTORE;
+
 
 public class MessageDescriptor implements TypeDescriptor {
 
@@ -33,6 +36,7 @@
     private List<MessageDescriptor> extendsList = new ArrayList<MessageDescriptor>();
     private Map<String, OptionDescriptor> options = new LinkedHashMap<String, OptionDescriptor>();
     private final MessageDescriptor parent;
+	private MessageDescriptor baseType;
 
     public MessageDescriptor(ProtoDescriptor protoDescriptor, MessageDescriptor parent) {
         this.protoDescriptor = protoDescriptor;
@@ -40,6 +44,30 @@
     }
     
     public void validate(List<String> errors) {
+        String baseName = getOption(getOptions(), "base_type", null);
+        if( baseName!=null ) {
+            if( baseType==null ) {
+                baseType = (MessageDescriptor) getType(baseName);
+            }
+            if( baseType == null ) {
+                baseType = (MessageDescriptor) getProtoDescriptor().getType(baseName);
+            }
+            if( baseType == null ) {
+                errors.add("base_type option not valid, type not found: "+baseName);
+            }
+            
+            // Assert that all the fields in the base type are defined in this message defintion too.
+            HashSet<String> baseFieldNames = new HashSet<String>(baseType.getFields().keySet());
+            baseFieldNames.removeAll(getFields().keySet());
+            
+            // Some fields were not defined in the sub class..
+            if( !baseFieldNames.isEmpty() ) {
+            	for (String fieldName : baseFieldNames) {
+                    errors.add("base_type "+baseName+" field "+fieldName+" not defined in "+getName());
+				}
+            }
+        }
+
         for (FieldDescriptor field : fields.values()) {
             field.validate(errors);
         }
@@ -50,6 +78,14 @@
             o.validate(errors);
         }
     }
+    
+    public String getOption(Map<String, OptionDescriptor> options, String optionName, String defaultValue) {
+        OptionDescriptor optionDescriptor = options.get(optionName);
+        if (optionDescriptor == null) {
+            return defaultValue;
+        }
+        return optionDescriptor.getValue();
+    }
 
     public void setName(String name) {
         this.name = name;
@@ -143,4 +179,8 @@
         return false;
     }
 
+	public MessageDescriptor getBaseType() {
+		return baseType;
+	}
+
 }