AVRO-3225: AVRO-3226: Fix possible StackOverflowException and OutOfMemoryException on invalid input

* AVRO-3225: Fix possible StackOverflowException on invalid input

* AVRO-3226: Fix possible OutOfMemoryException on invalid input

* AVRO-3226: Backport changes for netstandard2.0

(cherry picked from commit a1fce29d9675b4dd95dfee9db32cc505d0b2227c)
Signed-off-by: Ryan Skraba <ryan@skraba.com>
diff --git a/lang/csharp/src/apache/main/IO/BinaryDecoder.netstandard2.0.cs b/lang/csharp/src/apache/main/IO/BinaryDecoder.netstandard2.0.cs
index 91afeb5..8c6cb7e 100644
--- a/lang/csharp/src/apache/main/IO/BinaryDecoder.netstandard2.0.cs
+++ b/lang/csharp/src/apache/main/IO/BinaryDecoder.netstandard2.0.cs
@@ -16,6 +16,8 @@
  * limitations under the License.
  */
 using System;
+using System.IO;
+using System.Text;
 
 namespace Avro.IO
 {
@@ -25,6 +27,11 @@
     public partial class BinaryDecoder
     {
         /// <summary>
+        /// It is hard to find documentation about the real maximum array length in .NET Framework 4.6.1, but this seems to work :-/
+        /// </summary>
+        private const int MaxDotNetArrayLength = 0x3FFFFFFF;
+
+        /// <summary>
         /// A float is written as 4 bytes.
         /// The float is converted into a 32-bit integer using a method equivalent to
         /// Java's floatToIntBits and then encoded in little-endian format.
@@ -72,10 +79,28 @@
         public string ReadString()
         {
             int length = ReadInt();
-            byte[] buffer = new byte[length];
-            //TODO: Fix this because it's lame;
-            ReadFixed(buffer);
-            return System.Text.Encoding.UTF8.GetString(buffer);
+
+            if (length < 0)
+            {
+                throw new AvroException("Can not deserialize a string with negative length!");
+            }
+
+            if (length > MaxDotNetArrayLength)
+            {
+                throw new AvroException("String length is not supported!");
+            }
+
+            using (var binaryReader = new BinaryReader(stream, Encoding.UTF8, true))
+            {
+                var bytes = binaryReader.ReadBytes(length);
+
+                if (bytes.Length != length)
+                {
+                    throw new AvroException("Could not read as many bytes from stream as expected!");
+                }
+
+                return Encoding.UTF8.GetString(bytes);
+            }
         }
 
         private void Read(byte[] buffer, int start, int len)
diff --git a/lang/csharp/src/apache/main/IO/BinaryDecoder.notnetstandard2.0.cs b/lang/csharp/src/apache/main/IO/BinaryDecoder.notnetstandard2.0.cs
index 17bd841..a3bd217 100644
--- a/lang/csharp/src/apache/main/IO/BinaryDecoder.notnetstandard2.0.cs
+++ b/lang/csharp/src/apache/main/IO/BinaryDecoder.notnetstandard2.0.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Buffers;
 using System.Buffers.Binary;
+using System.IO;
 using System.Text;
 
 namespace Avro.IO
@@ -28,6 +29,8 @@
     public partial class BinaryDecoder
     {
         private const int StackallocThreshold = 256;
+        private const int MaxFastReadLength = 4096;
+        private const int MaxDotNetArrayLength = 0x7FFFFFC7;
 
         /// <summary>
         /// A float is written as 4 bytes.
@@ -63,23 +66,54 @@
         /// <returns>String read from the stream.</returns>
         public string ReadString()
         {
-            byte[] bufferArray = null;
-
             int length = ReadInt();
-            Span<byte> buffer = length <= StackallocThreshold ?
-                stackalloc byte[length] :
-                (bufferArray = ArrayPool<byte>.Shared.Rent(length)).AsSpan(0, length);
 
-            Read(buffer);
-
-            string result = Encoding.UTF8.GetString(buffer);
-
-            if (bufferArray != null)
+            if (length < 0)
             {
-                ArrayPool<byte>.Shared.Return(bufferArray);
+                throw new AvroException("Can not deserialize a string with negative length!");
             }
 
-            return result;
+            if (length <= MaxFastReadLength)
+            {
+                byte[] bufferArray = null;
+
+                try
+                {
+                    Span<byte> buffer = length <= StackallocThreshold ?
+                        stackalloc byte[length] :
+                        (bufferArray = ArrayPool<byte>.Shared.Rent(length)).AsSpan(0, length);
+
+                    Read(buffer);
+
+                    return Encoding.UTF8.GetString(buffer);
+                }
+                finally
+                {
+                    if (bufferArray != null)
+                    {
+                        ArrayPool<byte>.Shared.Return(bufferArray);
+                    }
+                }
+            }
+            else
+            {
+                if (length > MaxDotNetArrayLength)
+                {
+                    throw new AvroException("String length is not supported!");
+                }
+
+                using (var binaryReader = new BinaryReader(stream, Encoding.UTF8, true))
+                {
+                    var bytes = binaryReader.ReadBytes(length);
+
+                    if (bytes.Length != length)
+                    {
+                        throw new AvroException("Could not read as many bytes from stream as expected!");
+                    }
+
+                    return Encoding.UTF8.GetString(bytes);
+                }
+            }
         }
 
         private void Read(byte[] buffer, int start, int len)
diff --git a/lang/csharp/src/apache/test/IO/BinaryCodecTests.cs b/lang/csharp/src/apache/test/IO/BinaryCodecTests.cs
index a6a1731..f894d7b 100644
--- a/lang/csharp/src/apache/test/IO/BinaryCodecTests.cs
+++ b/lang/csharp/src/apache/test/IO/BinaryCodecTests.cs
@@ -20,6 +20,7 @@
 using NUnit.Framework;
 using System.IO;
 using System.Linq;
+using System.Text;
 using Avro.IO;
 
 namespace Avro.Test
@@ -214,23 +215,105 @@
             TestSkip(n, (Decoder d) => d.SkipString(), (Encoder e, string t) => e.WriteString(t), overhead + n.Length);
         }
 
-#if NETCOREAPP3_1
+#if NETCOREAPP3_1_OR_GREATER
         [Test]
-        public void TestLargeString()
+        public void TestStringReadIntoArrayPool()
         {
+            const int maxFastReadLength = 4096;
+
             // Create a 16KB buffer in the Array Pool
             var largeBufferToSeedPool = ArrayPool<byte>.Shared.Rent(2 << 14);
             ArrayPool<byte>.Shared.Return(largeBufferToSeedPool);
 
-            // Create a slightly less than 16KB buffer, which will use the 16KB buffer in the pool
-            var n = string.Concat(Enumerable.Repeat("1234567890", 1600));
-            var overhead = 3;
+            var n = string.Concat(Enumerable.Repeat("A", maxFastReadLength));
+            var overhead = 2;
 
             TestRead(n, (Decoder d) => d.ReadString(), (Encoder e, string t) => e.WriteString(t), overhead + n.Length);
-            TestSkip(n, (Decoder d) => d.SkipString(), (Encoder e, string t) => e.WriteString(t), overhead + n.Length);
+        }
+
+        [Test]
+        public void TestStringReadByBinaryReader()
+        {
+            const int overhead = 2;
+            const int maxFastReadLength = 4096;
+            const int expectedStringLength = maxFastReadLength + 1;
+            var n = string.Concat(Enumerable.Repeat("A", expectedStringLength));
+
+            TestRead(n, (Decoder d) => d.ReadString(), (Encoder e, string t) => e.WriteString(t), expectedStringLength + overhead);
         }
 #endif
 
+        [Test]
+        public void TestInvalidInputWithNegativeStringLength()
+        {
+            using (MemoryStream iostr = new MemoryStream())
+            {
+                Encoder e = new BinaryEncoder(iostr);
+
+                e.WriteLong(-1);
+
+                iostr.Flush();
+                iostr.Position = 0;
+                Decoder d = new BinaryDecoder(iostr);
+
+                var exception = Assert.Throws<AvroException>(() => d.ReadString());
+
+                Assert.NotNull(exception);
+                Assert.AreEqual("Can not deserialize a string with negative length!", exception.Message);
+                iostr.Close();
+            }
+        }
+
+        [Test]
+        public void TestInvalidInputWithMaxIntAsStringLength()
+        {
+            using (MemoryStream iostr = new MemoryStream())
+            {
+                Encoder e = new BinaryEncoder(iostr);
+
+                e.WriteLong(int.MaxValue);
+                e.WriteBytes(Encoding.UTF8.GetBytes("SomeSmallString"));
+
+                iostr.Flush();
+                iostr.Position = 0;
+                Decoder d = new BinaryDecoder(iostr);
+
+                var exception = Assert.Throws<AvroException>(() => d.ReadString());
+
+                Assert.NotNull(exception);
+                Assert.AreEqual("String length is not supported!", exception.Message);
+                iostr.Close();
+            }
+        }
+
+        [Test]
+        public void TestInvalidInputWithMaxArrayLengthAsStringLength()
+        {
+            using (MemoryStream iostr = new MemoryStream())
+            {
+                Encoder e = new BinaryEncoder(iostr);
+
+#if NETCOREAPP3_1_OR_GREATER
+                const int maximumArrayLength = 0x7FFFFFC7;
+#else
+                const int maximumArrayLength = 0x7FFFFFFF / 2;
+#endif
+
+                e.WriteLong(maximumArrayLength);
+                e.WriteBytes(Encoding.UTF8.GetBytes("SomeSmallString"));
+
+                iostr.Flush();
+                iostr.Position = 0;
+                Decoder d = new BinaryDecoder(iostr);
+
+                var exception = Assert.Throws<AvroException>(() => d.ReadString());
+
+                Assert.NotNull(exception);
+                Assert.AreEqual("Could not read as many bytes from stream as expected!", exception.Message);
+                iostr.Close();
+            }
+        }
+
         [TestCase(0, 1)]
         [TestCase(1, 1)]
         [TestCase(64, 2)]