Fixed a leak in a Netty ReferenceCounted resource in Gremlin Server.

If a Frame contained a Netty Bytebuf which is ReferenceCounted and that Frame did not write downstream because of an exception (perhaps during a transactional commit seemed like the likely place that would happen) then the Bytebuf would not get cleaned up properly and Netty would issue warnings. TINKERPOP-1375 CTR
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index a5310ca..eb25c73 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@
 TinkerPop 3.2.2 (NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Fixed a potential leak of a `ReferenceCounted` resource in Gremlin Server.
 * Added class registrations for `Map.Entry` implementations to `GryoMapper`.
 
 [[release-3-2-1]]
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Frame.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Frame.java
index e6a616f..76d772e 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Frame.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/Frame.java
@@ -18,6 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.server.handler;
 
+import io.netty.util.ReferenceCounted;
+
 /**
  * A holder for a {@code String} or {@code ByteBuf} that represents a message to be written back to the requesting
  * client.
@@ -34,4 +36,13 @@
     public Object getMsg() {
         return msg;
     }
+
+    /**
+     * If the object contained in the frame is {@code ReferenceCounted} then it may need to be released or else
+     * Netty will generate warnings that counted resources are leaking.
+     */
+    public void tryRelease() {
+        if (msg instanceof ReferenceCounted)
+            ((ReferenceCounted) msg).release();
+    }
 }
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
index 29861f2..4b10bbe 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/AbstractOpProcessor.java
@@ -128,31 +128,43 @@
                     // serialize here because in sessionless requests the serialization must occur in the same
                     // thread as the eval.  as eval occurs in the GremlinExecutor there's no way to get back to the
                     // thread that processed the eval of the script so, we have to push serialization down into that
-                    Frame frame;
+                    Frame frame = null;
                     try {
                         frame = makeFrame(ctx, msg, serializer, useBinary, aggregate, code);
                     } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+
                         // exception is handled in makeFrame() - serialization error gets written back to driver
                         // at that point
                         if (manageTransactions) attemptRollback(msg, context.getGraphManager(), settings.strictTransactionManagement);
                         break;
                     }
 
-                    // only need to reset the aggregation list if there's more stuff to write
-                    if (itty.hasNext())
-                        aggregate = new ArrayList<>(resultIterationBatchSize);
-                    else {
-                        // iteration and serialization are both complete which means this finished successfully. note that
-                        // errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
-                        // local errors will get rolledback below because the exceptions aren't thrown in those cases to be
-                        // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
-                        // there are no more items to iterate and serialization is complete
-                        if (managedTransactionsForRequest) attemptCommit(msg, context.getGraphManager(), settings.strictTransactionManagement);
+                    try {
+                        // only need to reset the aggregation list if there's more stuff to write
+                        if (itty.hasNext())
+                            aggregate = new ArrayList<>(resultIterationBatchSize);
+                        else {
+                            // iteration and serialization are both complete which means this finished successfully. note that
+                            // errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
+                            // local errors will get rolledback below because the exceptions aren't thrown in those cases to be
+                            // caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
+                            // there are no more items to iterate and serialization is complete
+                            if (managedTransactionsForRequest)
+                                attemptCommit(msg, context.getGraphManager(), settings.strictTransactionManagement);
 
-                        // exit the result iteration loop as there are no more results left.  using this external control
-                        // because of the above commit.  some graphs may open a new transaction on the call to
-                        // hasNext()
-                        hasMore = false;
+                            // exit the result iteration loop as there are no more results left.  using this external control
+                            // because of the above commit.  some graphs may open a new transaction on the call to
+                            // hasNext()
+                            hasMore = false;
+                        }
+                    } catch (Exception ex) {
+                        // a frame may use a Bytebuf which is a countable release - if it does not get written
+                        // downstream it needs to be released here
+                        if (frame != null) frame.tryRelease();
+                        throw ex;
                     }
 
                     // the flush is called after the commit has potentially occurred.  in this way, if a commit was