KAFKA-8722; Ensure record CRC is always verified when appending to log (#7124)

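We found that when `record.offset` is not equal to the offset we are expecting, Kafka sets the variable `inPlaceAssignment` to false. When `inPlaceAssignment` is false, the record data is not verified, because `ensureValid` was only called on the in-place assignment path. This patch adds the missing CRC verification by calling `record.ensureValid()` on every record as it is iterated during validation, so the check no longer depends on `inPlaceAssignment`.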

Reviewers: Jason Gustafson <jason@confluent.io>
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 47d4bd4..3555710 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -166,6 +166,7 @@
 
     records.deepEntries(true, BufferSupplier.NO_CACHING).asScala.foreach { logEntry =>
       val record = logEntry.record
+      record.ensureValid()
       validateKey(record, compactedTopic)
 
       if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
@@ -201,9 +202,6 @@
         shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
         messageSizeMaybeChanged = true)
     } else {
-      // ensure the inner messages are valid
-      validatedRecords.foreach(_.ensureValid)
-
       // we can update the wrapper message only and write the compressed payload as is
       val entry = records.shallowEntries.iterator.next()
       val offset = offsetCounter.addAndGet(validatedRecords.size) - 1