KNOX-3035 - Limit response group header contents by size. (#1101)

Size-based group header partitioning takes precedence over length-based partitioning, iff, size-based partitioning is configured to a non-negative number.
diff --git a/gateway-service-auth/src/main/java/org/apache/knox/gateway/service/auth/AbstractAuthResource.java b/gateway-service-auth/src/main/java/org/apache/knox/gateway/service/auth/AbstractAuthResource.java
index 61c9aed..f599195 100644
--- a/gateway-service-auth/src/main/java/org/apache/knox/gateway/service/auth/AbstractAuthResource.java
+++ b/gateway-service-auth/src/main/java/org/apache/knox/gateway/service/auth/AbstractAuthResource.java
@@ -19,13 +19,12 @@
 
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
 import org.apache.knox.gateway.security.SubjectUtils;
+import org.apache.knox.gateway.util.GroupUtils;
 
 import javax.security.auth.Subject;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -39,7 +38,9 @@
 public abstract class AbstractAuthResource {
   public static final String AUTH_ACTOR_ID_HEADER_NAME = "preauth.auth.header.actor.id.name";
   public static final String AUTH_ACTOR_GROUPS_HEADER_PREFIX = "preauth.auth.header.actor.groups.prefix";
-  public static final String GROUP_FILTER_PATTERN = "preauth.group.filter.pattern";
+  public static final String GROUP_HEADER_LENGTH_LIMIT = "preauth.auth.header.groups.length.limit";
+  public static final String GROUP_HEADER_SIZE_LIMIT = "preauth.auth.header.groups.size.limit";
+  private static final String GROUP_FILTER_PATTERN = "preauth.group.filter.pattern";
 
   static final AuthMessages LOG = MessagesFactory.get(AuthMessages.class);
 
@@ -47,16 +48,21 @@
   static final String DEFAULT_AUTH_ACTOR_GROUPS_HEADER_PREFIX = "X-Knox-Actor-Groups";
   static final Pattern DEFAULT_GROUP_FILTER_PATTERN = Pattern.compile(".*");
 
-  protected static final int MAX_HEADER_LENGTH = 1000;
-  protected static final String ACTOR_GROUPS_HEADER_FORMAT = "%s-%d";
+  private static final String DEFAULT_GROUP_HEADER_LENGTH_LIMIT = "1000";
+  private static final String DEFAULT_GROUP_HEADER_SIZE_LIMIT = "-1"; // turned off by default, to be backward compatible
+  private static final String ACTOR_GROUPS_HEADER_FORMAT = "%s-%d";
 
   protected String authHeaderActorIDName;
   protected String authHeaderActorGroupsPrefix;
+  private int groupHeaderLengthLimit;
+  private int groupHeaderSizeLimit;
   protected Pattern groupFilterPattern;
 
   protected void initialize() {
     authHeaderActorIDName = getInitParameter(AUTH_ACTOR_ID_HEADER_NAME, DEFAULT_AUTH_ACTOR_ID_HEADER_NAME);
     authHeaderActorGroupsPrefix = getInitParameter(AUTH_ACTOR_GROUPS_HEADER_PREFIX, DEFAULT_AUTH_ACTOR_GROUPS_HEADER_PREFIX);
+    groupHeaderLengthLimit = Integer.parseInt(getInitParameter(GROUP_HEADER_LENGTH_LIMIT, DEFAULT_GROUP_HEADER_LENGTH_LIMIT));
+    groupHeaderSizeLimit = Integer.parseInt(getInitParameter(GROUP_HEADER_SIZE_LIMIT, DEFAULT_GROUP_HEADER_SIZE_LIMIT));
     final String groupFilterPatternString = getInitParameter(GROUP_FILTER_PATTERN, null);
     groupFilterPattern = groupFilterPatternString == null ? DEFAULT_GROUP_FILTER_PATTERN : Pattern.compile(groupFilterPatternString);
   }
@@ -84,10 +90,10 @@
 
     // Populate actor groups headers
     final Set<String> matchingGroupNames = subject == null ? Collections.emptySet()
-        : SubjectUtils.getGroupPrincipals(subject).stream().filter(group -> groupFilterPattern.matcher(group.getName()).matches()).map(group -> group.getName())
-        .collect(Collectors.toSet());
+            : SubjectUtils.getGroupPrincipals(subject).stream().filter(group -> groupFilterPattern.matcher(group.getName()).matches()).map(group -> group.getName())
+            .collect(Collectors.toSet());
     if (!matchingGroupNames.isEmpty()) {
-      final List<String> groupStrings = getGroupStrings(matchingGroupNames);
+      final List<String> groupStrings = GroupUtils.getGroupStrings(matchingGroupNames, groupHeaderLengthLimit, groupHeaderSizeLimit);
       for (int i = 0; i < groupStrings.size(); i++) {
         getResponse().addHeader(String.format(Locale.ROOT, ACTOR_GROUPS_HEADER_FORMAT, authHeaderActorGroupsPrefix, i + 1), groupStrings.get(i));
       }
@@ -95,26 +101,4 @@
     return ok().build();
   }
 
-  private List<String> getGroupStrings(Collection<String> groupNames) {
-    if (groupNames.isEmpty()) {
-      return Collections.emptyList();
-    }
-    List<String> groupStrings = new ArrayList<>();
-    StringBuilder sb = new StringBuilder();
-    for (String groupName : groupNames) {
-      if (sb.length() + groupName.length() > MAX_HEADER_LENGTH) {
-        groupStrings.add(sb.toString());
-        sb = new StringBuilder();
-      }
-      if (sb.length() > 0) {
-        sb.append(',');
-      }
-      sb.append(groupName);
-    }
-    if (sb.length() > 0) {
-      groupStrings.add(sb.toString());
-    }
-    return groupStrings;
-  }
-
 }
diff --git a/gateway-service-auth/src/test/java/org/apache/knox/gateway/service/auth/PreAuthResourceTest.java b/gateway-service-auth/src/test/java/org/apache/knox/gateway/service/auth/PreAuthResourceTest.java
index 4cb3b12..01ca3c5 100644
--- a/gateway-service-auth/src/test/java/org/apache/knox/gateway/service/auth/PreAuthResourceTest.java
+++ b/gateway-service-auth/src/test/java/org/apache/knox/gateway/service/auth/PreAuthResourceTest.java
@@ -44,7 +44,6 @@
 
   private static final String USER_NAME = "test-username";
   private ServletContext context;
-  private HttpServletRequest request;
   private HttpServletResponse response;
   private final Subject subject = new Subject();
 
@@ -53,15 +52,15 @@
     subject.getPrincipals().add(new PrimaryPrincipal(USER_NAME));
   }
 
-  private void configureCommonExpectations(String actorIdHeaderName, String groupsHeaderPrefix) {
-    configureCommonExpectations(actorIdHeaderName, groupsHeaderPrefix, Collections.emptySet());
+  private void configureCommonExpectations(String actorIdHeaderName) {
+    configureCommonExpectations(actorIdHeaderName, null, Collections.emptySet());
   }
 
   private void configureCommonExpectations(String actorIdHeaderName, String groupsHeaderPrefix, Collection<String> groups) {
     context = EasyMock.createNiceMock(ServletContext.class);
     EasyMock.expect(context.getInitParameter(PreAuthResource.AUTH_ACTOR_ID_HEADER_NAME)).andReturn(actorIdHeaderName).anyTimes();
     EasyMock.expect(context.getInitParameter(PreAuthResource.AUTH_ACTOR_GROUPS_HEADER_PREFIX)).andReturn(groupsHeaderPrefix).anyTimes();
-    request = EasyMock.createNiceMock(HttpServletRequest.class);
+    final HttpServletRequest request = EasyMock.createNiceMock(HttpServletRequest.class);
     response = EasyMock.createNiceMock(HttpServletResponse.class);
 
     if (SubjectUtils.getPrimaryPrincipalName(subject) != null) {
@@ -75,12 +74,13 @@
       final int groupStringSize = calculateGroupStringSize(groups);
       final int expectedGroupHeaderCount = groupStringSize / 1000 + 1;
       final String expectedGroupsHeaderPrefix = (groupsHeaderPrefix == null ? PreAuthResource.DEFAULT_AUTH_ACTOR_GROUPS_HEADER_PREFIX : groupsHeaderPrefix)
-          + "-";
+              + "-";
       for (int i = 1; i <= expectedGroupHeaderCount; i++) {
         response.addHeader(EasyMock.eq(expectedGroupsHeaderPrefix + i), EasyMock.anyString());
         EasyMock.expectLastCall();
       }
     }
+
     EasyMock.replay(context, request, response);
   }
 
@@ -94,17 +94,17 @@
   @Test
   public void testSubjectWithoutPrimaryPrincipalReturnsUnauthorized() throws Exception {
     subject.getPrincipals().clear();
-    configureCommonExpectations(null, null);
+    configureCommonExpectations(null);
     final PreAuthResource preAuthResource = new PreAuthResource();
     preAuthResource.context = context;
     preAuthResource.response = response;
     final Response response = executeResourceWithSubject(preAuthResource);
-    assertEquals(response.getStatus(), HttpServletResponse.SC_UNAUTHORIZED);
+    assertEquals(HttpServletResponse.SC_UNAUTHORIZED, response.getStatus());
   }
 
   @Test
   public void testPopulatingDefaultActorIdHeaderNoGroups() throws Exception {
-    configureCommonExpectations(null, null);
+    configureCommonExpectations(null);
     final PreAuthResource preAuthResource = new PreAuthResource();
     preAuthResource.context = context;
     preAuthResource.response = response;
@@ -114,7 +114,7 @@
 
   @Test
   public void testPopulatingCustomActorIdHeaderNoGroups() throws Exception {
-    configureCommonExpectations("customActorId", null);
+    configureCommonExpectations("customActorId");
     final PreAuthResource preAuthResource = new PreAuthResource();
     preAuthResource.context = context;
     preAuthResource.response = response;
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
index d9a4697..08458e3 100644
--- a/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/ConfigurableDispatch.java
@@ -22,7 +22,9 @@
 import org.apache.knox.gateway.audit.api.ActionOutcome;
 import org.apache.knox.gateway.config.Configure;
 import org.apache.knox.gateway.config.Default;
+import org.apache.knox.gateway.security.GroupPrincipal;
 import org.apache.knox.gateway.security.SubjectUtils;
+import org.apache.knox.gateway.util.GroupUtils;
 import org.apache.knox.gateway.util.StringUtils;
 
 import javax.security.auth.Subject;
@@ -40,9 +42,7 @@
 import java.util.HashMap;
 import java.util.Optional;
 import java.util.List;
-import java.util.Collection;
 import java.util.Locale;
-import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -62,18 +62,20 @@
   private boolean shouldIncludePrincipalAndGroups;
   private String actorIdHeaderName = DEFAULT_AUTH_ACTOR_ID_HEADER_NAME;
   private String actorGroupsHeaderPrefix = DEFAULT_AUTH_ACTOR_GROUPS_HEADER_PREFIX;
+  private int groupHeaderLengthLimit = Integer.parseInt(DEFAULT_GROUP_HEADER_LENGTH_LIMIT);
+  private int groupHeaderSizeLimit = Integer.parseInt(DEFAULT_GROUP_HEADER_SIZE_LIMIT);
   private String groupFilterPattern = DEFAULT_GROUP_FILTER_PATTERN;
 
   static final String DEFAULT_AUTH_ACTOR_ID_HEADER_NAME = "X-Knox-Actor-ID";
   static final String DEFAULT_AUTH_ACTOR_GROUPS_HEADER_PREFIX = "X-Knox-Actor-Groups";
+  static final String DEFAULT_GROUP_HEADER_LENGTH_LIMIT = "1000";
+  static final String DEFAULT_GROUP_HEADER_SIZE_LIMIT = "-1"; // turned off by default, to be backward compatible
   static final String DEFAULT_GROUP_FILTER_PATTERN = ".*";
   static final String DEFAULT_ARE_USERS_GROUPS_HEADER_INCLUDED = "false";
 
-  protected static final int MAX_HEADER_LENGTH = 1000;
   protected static final String ACTOR_GROUPS_HEADER_FORMAT = "%s-%d";
   protected Pattern groupPattern = Pattern.compile(DEFAULT_GROUP_FILTER_PATTERN);
 
-
   private Set<String> convertCommaDelimitedHeadersToSet(String headers) {
     return headers == null ?  Collections.emptySet(): new HashSet<>(Arrays.asList(headers.split("\\s*,\\s*")));
   }
@@ -126,8 +128,8 @@
     if (setCookieHeader.isPresent()) {
       final String[] setCookieHeaderParts = setCookieHeader.get().split(":");
       responseExcludeSetCookieHeaderDirectives = setCookieHeaderParts.length > 1
-          ? new HashSet<>(Arrays.asList(setCookieHeaderParts[1].split(";"))).stream().map(e -> e.trim()).collect(Collectors.toSet())
-          : EXCLUDE_SET_COOKIES_DEFAULT;
+              ? new HashSet<>(Arrays.asList(setCookieHeaderParts[1].split(";"))).stream().map(e -> e.trim()).collect(Collectors.toSet())
+              : EXCLUDE_SET_COOKIES_DEFAULT;
     } else {
       /* Exclude headers list is defined but we don't have set-cookie in the list,
       by default prevent these cookies from leaking */
@@ -163,6 +165,16 @@
   }
 
   @Configure
+  public void setGroupHeaderLengthLimit(@Default(DEFAULT_GROUP_HEADER_LENGTH_LIMIT) int groupHeaderLengthLimit) {
+    this.groupHeaderLengthLimit = groupHeaderLengthLimit;
+  }
+
+  @Configure
+  public void setGroupHeaderSizeLimit(@Default(DEFAULT_GROUP_HEADER_SIZE_LIMIT) int groupHeaderSizeLimit) {
+    this.groupHeaderSizeLimit = groupHeaderSizeLimit;
+  }
+
+  @Configure
   public void setGroupFilterPattern(@Default(DEFAULT_GROUP_FILTER_PATTERN) String groupFilterPattern) {
     this.groupFilterPattern = groupFilterPattern;
     groupPattern = Pattern.compile(this.groupFilterPattern);
@@ -189,7 +201,7 @@
   }
 
   private Map<String, String> addPrincipalAndGroups() {
-    final Map<String, String> headers = new ConcurrentHashMap();
+    final Map<String, String> headers = new ConcurrentHashMap<>();
     final Subject subject = SubjectUtils.getCurrentSubject();
 
     final String primaryPrincipalName = subject == null ? null : SubjectUtils.getPrimaryPrincipalName(subject);
@@ -202,10 +214,12 @@
 
     // Populate actor groups headers
     final Set<String> matchingGroupNames = subject == null ? Collections.emptySet()
-            : SubjectUtils.getGroupPrincipals(subject).stream().filter(group -> groupPattern.matcher(group.getName()).matches()).map(group -> group.getName())
+            : SubjectUtils.getGroupPrincipals(subject).stream()
+            .filter(group -> groupPattern.matcher(group.getName()).matches())
+            .map(GroupPrincipal::getName)
             .collect(Collectors.toSet());
     if (!matchingGroupNames.isEmpty()) {
-      final List<String> groupStrings = getGroupStrings(matchingGroupNames);
+      final List<String> groupStrings = GroupUtils.getGroupStrings(matchingGroupNames, groupHeaderLengthLimit, groupHeaderSizeLimit);
       for (int i = 0; i < groupStrings.size(); i++) {
         headers.put(String.format(Locale.ROOT, ACTOR_GROUPS_HEADER_FORMAT, actorGroupsHeaderPrefix, i + 1), groupStrings.get(i));
       }
@@ -213,28 +227,6 @@
     return headers;
   }
 
-  private List<String> getGroupStrings(final Collection<String> groupNames) {
-    if (groupNames.isEmpty()) {
-      return Collections.emptyList();
-    }
-    List<String> groupStrings = new ArrayList<>();
-    StringBuilder sb = new StringBuilder();
-    for (String groupName : groupNames) {
-      if (sb.length() + groupName.length() > MAX_HEADER_LENGTH) {
-        groupStrings.add(sb.toString());
-        sb = new StringBuilder();
-      }
-      if (sb.length() > 0) {
-        sb.append(',');
-      }
-      sb.append(groupName);
-    }
-    if (sb.length() > 0) {
-      groupStrings.add(sb.toString());
-    }
-    return groupStrings;
-  }
-
   @Override
   public Set<String> getOutboundResponseExcludeHeaders() {
     return responseExcludeHeaders == null ? Collections.emptySet() : responseExcludeHeaders;
diff --git a/gateway-spi/src/main/java/org/apache/knox/gateway/util/GroupUtils.java b/gateway-spi/src/main/java/org/apache/knox/gateway/util/GroupUtils.java
new file mode 100644
index 0000000..db2d282
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/knox/gateway/util/GroupUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class GroupUtils {
+
+    public static List<String> getGroupStrings(Collection<String> groupNames, final int lengthLimit, final int sizeLimitBytes) {
+        if (groupNames.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        // Defensive copy to isolate from concurrent modifications
+        final List<String> safeGroupNames = new ArrayList<>(groupNames);
+
+        if (sizeLimitBytes > 0) {
+            return getGroupStringsBySize(safeGroupNames, sizeLimitBytes);
+        } else {
+            return getGroupStringsByLength(safeGroupNames, lengthLimit);
+        }
+    }
+
+    private static List<String> getGroupStringsBySize(Collection<String> groupNames, int sizeLimitBytes) {
+        final List<String> groupStrings = new ArrayList<>();
+        StringBuilder sb = new StringBuilder();
+        int currentSize = 0; // UTF-8 byte count
+
+        for (String groupName : groupNames) {
+            int commaBytes = sb.length() > 0 ? 1 : 0; // comma between groups
+            int groupBytes = groupName.getBytes(StandardCharsets.UTF_8).length;
+            int projectedSize = currentSize + commaBytes + groupBytes;
+
+            if (projectedSize > sizeLimitBytes) {
+                saveGroups(groupStrings, sb);
+                sb = new StringBuilder();
+                currentSize = 0;
+            }
+
+            currentSize = addCommaIfNeeded(sb, currentSize);
+            sb.append(groupName);
+            currentSize += groupBytes;
+        }
+
+        if (sb.length() > 0) {
+            saveGroups(groupStrings, sb);
+        }
+
+        return groupStrings;
+    }
+
+    private static void saveGroups(final List<String> groupStrings, final StringBuilder sb) {
+        final String groups = sb.toString();
+        if (StringUtils.isNotBlank(groups)) {
+            groupStrings.add(groups);
+        }
+    }
+
+    private static int addCommaIfNeeded(final StringBuilder sb, int currentSize) {
+        if (sb.length() > 0) {
+            sb.append(',');
+            return currentSize + 1; // comma byte
+        } else {
+            return currentSize;
+        }
+    }
+
+    private static List<String> getGroupStringsByLength(Collection<String> groupNames, int lengthLimit) {
+        final List<String> groupStrings = new ArrayList<>();
+        StringBuilder sb = new StringBuilder();
+
+        for (String groupName : groupNames) {
+            int commaChars = sb.length() > 0 ? 1 : 0;
+            if (sb.length() + groupName.length() + commaChars > lengthLimit) {
+                saveGroups(groupStrings, sb);
+                sb = new StringBuilder();
+            }
+            addCommaIfNeeded(sb, 0);
+            sb.append(groupName);
+        }
+
+        if (sb.length() > 0) {
+            saveGroups(groupStrings, sb);
+        }
+
+        return groupStrings;
+    }
+}
diff --git a/gateway-spi/src/test/java/org/apache/knox/gateway/util/GroupUtilsTest.java b/gateway-spi/src/test/java/org/apache/knox/gateway/util/GroupUtilsTest.java
new file mode 100644
index 0000000..6314884
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/knox/gateway/util/GroupUtilsTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.util;
+
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+public class GroupUtilsTest {
+
+    @Test
+    public void testSplitByLengthLimitOnly() {
+        final List<String> input = Arrays.asList("alpha", "beta", "gamma", "delta", "epsilon");
+        final List<String> result = GroupUtils.getGroupStrings(input, 10, -1); // size limit disabled
+
+        // Each chunk should respect the length limit (≤ 10 chars, including commas)
+        assertTrue(result.stream().allMatch(s -> s.length() <= 10));
+        assertEquals(Arrays.asList("alpha,beta", "gamma", "delta", "epsilon"), result);
+    }
+
+    @Test
+    public void testSplitBySizeLimitOnly() {
+        // We'll create UTF-8 multibyte strings to show that length != size
+        final String wideChar = "á"; // 1 char = 2 bytes in UTF-8
+        String group = String.join("", Collections.nCopies(2000, wideChar));
+        final List<String> input = Arrays.asList(group, group, group);
+
+        // 4KB = 4096 bytes; since each string 4000 bytes, only one fits per chunk
+        final List<String> result = GroupUtils.getGroupStrings(input, 10, 4096);
+
+        // Should have 3 chunks because size limit applies, not length
+        assertEquals(3, result.size());
+
+        // Each chunk can exceed the length limit (since length = 2000 > 10)
+        assertTrue(result.stream().allMatch(s -> s.length() > 10));
+
+        // Each chunk must be ≤ 4KB in bytes
+        for (String s : result) {
+            int bytes = s.getBytes(StandardCharsets.UTF_8).length;
+            assertTrue("Chunk exceeds 4KB: " + bytes, bytes <= 4096);
+        }
+    }
+
+    @Test
+    public void testSplitBySizeLimitWithManySmallElements() throws Exception {
+        // 1. Create many small ASCII elements (5 bytes each + commas)
+        final List<String> input = new ArrayList<>();
+        for (int i = 1; i <= 100; i++) {
+            input.add("item" + i); // e.g., "item1", "item2", ...
+        }
+
+        final int sizeLimitBytes = 512;
+        final List<String> result = GroupUtils.getGroupStrings(input, 1000, sizeLimitBytes);
+
+        assertEquals("Should be chunked into multiple groups", 2, result.size());
+        assertEquals(508, result.get(0).getBytes(StandardCharsets.UTF_8).length);
+        assertEquals(182, result.get(1).getBytes(StandardCharsets.UTF_8).length);
+
+        // Verify all data preserved and ordered
+        final String joined = String.join(",", result);
+        final List<String> reconstructed = Arrays.asList(joined.split(","));
+        assertEquals("Item count should remain the same", input.size(), reconstructed.size());
+        assertEquals("All items should appear in the same order", input, reconstructed);
+    }
+
+    @Test
+    public void testSizeLimitTakesPrecedenceOverLengthLimit() {
+        // Use strings that easily exceed 10 chars but stay within 4KB
+        final List<String> input = Arrays.asList("abcdefghijklmno", "pqrstuvwxyz", "1234567890");
+
+        // lengthLimit=10 would normally split them all apart, but sizeLimit=4096 means we should keep them together
+        final List<String> result = GroupUtils.getGroupStrings(input, 10, 4096);
+
+        // Expect a single chunk (because size limit is large enough)
+        assertEquals("Should not split by length when sizeLimit > 0", 1, result.size());
+    }
+
+    @Test
+    public void testEmptyInputReturnsEmptyList() {
+        final List<String> result = GroupUtils.getGroupStrings(Collections.emptyList(), 1000, 4096);
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void testShouldNotReturnEmptyElements() {
+        final List<String> input = Arrays.asList("longGroupName1", "longGroupName2", "longGroupName3");
+        final List<String> result = GroupUtils.getGroupStrings(input, 10, 1); // note the size limit is set to 1
+        assertEquals("Should not return a list with empty elements", 3, result.size());
+    }
+
+    @Test
+    public void testConcurrentGroupStringCalls() throws Exception {
+        final int THREAD_COUNT = 20;
+        final int ITERATIONS = 1000;
+        final ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
+        final List<Future<List<String>>> futures = new ArrayList<>();
+
+        final List<String> input = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            input.add("group_" + i);
+        }
+
+        for (int t = 0; t < THREAD_COUNT; t++) {
+            futures.add(executor.submit(() -> {
+                for (int i = 0; i < ITERATIONS; i++) {
+                    List<String> result = GroupUtils.getGroupStrings(input, 50, 512);
+                    assertNotNull(result);
+                    assertFalse(result.isEmpty());
+
+                    Set<String> allGroups = new HashSet<>();
+                    for (String chunk : result) {
+                        int bytes = chunk.getBytes(StandardCharsets.UTF_8).length;
+                        assertTrue("Chunk exceeds 512 bytes: " + bytes, bytes <= 512);
+
+                        for (String g : chunk.split(",")) {
+                            assertTrue("Duplicate group found: " + g, allGroups.add(g));
+                        }
+                    }
+
+                    assertEquals("Missing or extra groups in result", input.size(), allGroups.size());
+                    assertTrue("Not all expected groups are present", allGroups.containsAll(input));
+                }
+                return null;
+            }));
+        }
+
+        for (Future<List<String>> f : futures) {
+            f.get(10, TimeUnit.SECONDS);
+        }
+
+        executor.shutdown();
+        assertTrue(executor.awaitTermination(3, TimeUnit.SECONDS));
+    }
+
+}