[FLINK-39689] Make template placeholders consistent
diff --git a/docs/content.zh/docs/connectors/table/http.md b/docs/content.zh/docs/connectors/table/http.md
index 2713343..13d7a54 100644
--- a/docs/content.zh/docs/connectors/table/http.md
+++ b/docs/content.zh/docs/connectors/table/http.md
@@ -54,7 +54,7 @@
     * [Timeouts](#timeouts)
     * [Source table HTTP status code](#source-table-http-status-code)
     * [Retries and handling errors (Lookup source)](#retries-and-handling-errors-lookup-source)
-      * [Retry strategy](#retry-strategy)
+        * [Retry strategy](#retry-strategy)
       * [Lookup multiple results](#lookup-multiple-results)
   * [Working with HTTP sink tables](#working-with-http-sink-tables)
     * [HTTP Sink](#http-sink)
@@ -71,7 +71,7 @@
     * [Basic Authentication](#basic-authentication)
     * [OIDC Bearer Authentication](#oidc-bearer-authentication)
   * [Logging the HTTP content](#logging-the-http-content)
-    * [Restrictions at this time](#restrictions-at-this-time)
+      * [Restrictions at this time](#restrictions-at-this-time)
 <!-- TOC -->
 ## Dependencies
 
@@ -87,9 +87,9 @@
 
 * Existing java applications will need to be recompiled to pick up the new flink package names.
 * Existing application and SQL need to be amended to use the new connector option names. The new option names do not have
-  the _com.getindata.http_ prefix, the prefix is now _http_.
+the _com.getindata.http_ prefix, the prefix is now _http_.
 * The name of the connector and the identifiers of components that are discovered have been changed, so that the GetInData jar file can co-exist
-  with this connector's jar file. Be aware that if you have created custom pluggable components; you will need to recompile against this connector.
+with this connector's jar file. Be aware that if you have created custom pluggable components; you will need to recompile against this connector.
 * Note that the `http-generic-json-url` query creator now processes HTTP bodies differently using `http.request.body-template`.
 * Note that if you were incorrectly using `gid.connector.http.request.query-param-fields` with POST or PUT did not give an error. This connector corrects the behaviour so specifying `http.request.query-param-fields` with POST or PUT does give an error.
 * The GetInData HTTP connector was built against Flink version 1, so works with that level of Flink and also Flink version 2. This connector is built against and supports Flink 2.2.
@@ -142,7 +142,7 @@
 Then we can enrich the _Orders_ table with the _Customers_ HTTP table with the following SQL:
 
 ```roomsql
-SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive, c.details.nestedDetails.balance FROM Orders AS o 
+SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive, c.details.nestedDetails.balance FROM Orders AS o
 JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2 = c.id2
 ```
 
@@ -249,8 +249,8 @@
 #### Mapping the URL
 
 The `http.request.url-map` option provides a flexible way to map table columns to parts of the URL, either URL segments or HTTP query parameters.
-Parses a string as a map of strings. For example if there are table columns called `customerId` and `orderId`,
-then specifying value `customerId:cid,orderID:oid` and a url of https://myendpoint/customers/{cid}?orders={oid} will mean that the url used for the
+ Parses a string as a map of strings. For example if there are table columns called `customerId` and `orderId`,
+then specifying value `customerId:cid,orderID:oid` and a url of https://myendpoint/customers/{{cid}}?orders={{oid}} will mean that the url used for the
 lookup query will dynamically pickup the values for `customerId`, `orderId` and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1.
 The expected format of the map is: `key1:value1,key2:value2`.
 
@@ -281,7 +281,7 @@
 ) WITH (
     'connector' = 'http',
     'format' = 'json',
-    'url' = 'http://api.example.com/lookup?customer={qp_customer}&order={qp_order}',
+    'url' = 'http://api.example.com/lookup?customer={{qp_customer}}&order={{qp_order}}',
     'lookup-method' = 'GET',
     'http.request.url-map' = 'qp_customer:qp_customer,qp_order:qp_order'
 )
diff --git a/docs/content/docs/connectors/table/http.md b/docs/content/docs/connectors/table/http.md
index 95089cb..13d7a54 100644
--- a/docs/content/docs/connectors/table/http.md
+++ b/docs/content/docs/connectors/table/http.md
@@ -90,9 +90,9 @@
 the _com.getindata.http_ prefix, the prefix is now _http_.
 * The name of the connector and the identifiers of components that are discovered have been changed, so that the GetInData jar file can co-exist
 with this connector's jar file. Be aware that if you have created custom pluggable components; you will need to recompile against this connector.
-* Note that the `http-generic-json-url` query creator now processes HTTP bodies differently using `http.request.body-template`.                
-* Note that if you were incorrectly using `gid.connector.http.request.query-param-fields` with POST or PUT did not give an error. This connector corrects the behaviour so specifying `http.request.query-param-fields` with POST or PUT does give an error. 
-* The GetInData HTTP connector was built against Flink version 1, so works with that level of Flink and also Flink version 2. This connector is built against and supports Flink 2.2. 
+* Note that the `http-generic-json-url` query creator now processes HTTP bodies differently using `http.request.body-template`.
+* Note that if you were incorrectly using `gid.connector.http.request.query-param-fields` with POST or PUT did not give an error. This connector corrects the behaviour so specifying `http.request.query-param-fields` with POST or PUT does give an error.
+* The GetInData HTTP connector was built against Flink version 1, so works with that level of Flink and also Flink version 2. This connector is built against and supports Flink 2.2.
 
 ## Working with HTTP lookup source tables
 
@@ -142,7 +142,7 @@
 Then we can enrich the _Orders_ table with the _Customers_ HTTP table with the following SQL:
 
 ```roomsql
-SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive, c.details.nestedDetails.balance FROM Orders AS o 
+SELECT o.id, o.id2, c.msg, c.uuid, c.details.isActive, c.details.nestedDetails.balance FROM Orders AS o
 JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.id = c.id AND o.id2 = c.id2
 ```
 
@@ -249,14 +249,14 @@
 #### Mapping the URL
 
 The `http.request.url-map` option provides a flexible way to map table columns to parts of the URL, either URL segments or HTTP query parameters.
- Parses a string as a map of strings. For example if there are table columns called `customerId` and `orderId`, 
-then specifying value `customerId:cid,orderID:oid` and a url of https://myendpoint/customers/{cid}?orders={oid} will mean that the url used for the
-lookup query will dynamically pickup the values for `customerId`, `orderId` and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1. 
-The expected format of the map is: `key1:value1,key2:value2`. 
+ Parses a string as a map of strings. For example if there are table columns called `customerId` and `orderId`,
+then specifying value `customerId:cid,orderID:oid` and a url of https://myendpoint/customers/{{cid}}?orders={{oid}} will mean that the url used for the
+lookup query will dynamically pickup the values for `customerId`, `orderId` and use them in the url e.g. https://myendpoint/customers/cid1?orders=oid1.
+The expected format of the map is: `key1:value1,key2:value2`.
 
 As these values are being supplied as URL segments or part or query parameters, the connector url encodes that content so characters like spaces
 do not appear invalidly in the URL. In the case where the complete url is the insert then url encoding is not performed; the url needs to be valid and already
-properly url encoded as appropriate.   
+properly url encoded as appropriate.
 
 **Example Scenario around clashing request and response columns:**
 
@@ -281,7 +281,7 @@
 ) WITH (
     'connector' = 'http',
     'format' = 'json',
-    'url' = 'http://api.example.com/lookup?customer={qp_customer}&order={qp_order}',
+    'url' = 'http://api.example.com/lookup?customer={{qp_customer}}&order={{qp_order}}',
     'lookup-method' = 'GET',
     'http.request.url-map' = 'qp_customer:qp_customer,qp_order:qp_order'
 )
@@ -360,10 +360,10 @@
 8) If you start from an OpenAPI specification that contains nested content required as a lookup join key, then use `http.request.body-template` to map top-level columns into that structure.
 9) Response content is mapped to matching named top-level columns in the lookup table. You should arrange your table columns so that some are request columns (all top level) and some are response columns.
 10) Use single quotes for the value of `http.request.body-template` so you do not need to escape the double quotes, and add newline characters for readability.
-11) If you want to enrich every event with the same API content, you can specify a placeholder as the complete URL the `url`, then use `http.request.url-map` to map it. In this scenario switching on caching is advised to avoid repeated identical API calls. 
+11) If you want to enrich every event with the same API content, you can specify a placeholder as the complete URL the `url`, then use `http.request.url-map` to map it. In this scenario switching on caching is advised to avoid repeated identical API calls.
 12) Note that columns in SQL tables (the DDL) do not have a natural way to distinguish between request and response fields. Where possible, use the API field name as column names in the DDL; this minimizes the number of columns you need to define.
 13) The exception to 12) is when a response API field name is the same as a request API field **and** they have incompatible types. In this case, define the request column with a different name, then use `http.request.query-param-fields-with-key`, `http.request.body-template`, and/or `http.request.url-map` to provide the mapping to the API field.
-14) Note the columns representing the response are those that should be used for enrichment. 
+14) Note the columns representing the response are those that should be used for enrichment.
 
 ### Format considerations
 
@@ -654,8 +654,8 @@
 If the `error-string` metadata column is defined on the table and the call succeeds then it will have a null value.
 When the HTTP response cannot be deserialized, then the `http-completion-state` will be `UNABLE_TO_DESERIALIZE_RESPONSE`
 and the `error-string` will be the response body.
-When the HTTP status code is in the `http.source.lookup.ignored-response-codes`, then the `http-completion-state` will 
-be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain information about the API call that 
+When the HTTP status code is in the `http.source.lookup.ignored-response-codes`, then the `http-completion-state` will
+be `IGNORE_STATUS_CODE`and no data is returned; any metadata columns contain information about the API call that
 occurred.
 
 When a HTTP lookup call fails and populates the metadata columns with the error information, the expected enrichment columns from the HTTP call
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
index 6dfb414..c17e4a8 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/config/HttpConnectorConfigConstants.java
@@ -29,6 +29,12 @@
 
     public static final String PROP_DELIM = ",";
 
+    /** Placeholder delimiter for template substitution (start). */
+    public static final String PLACEHOLDER_START = "{{";
+
+    /** Placeholder delimiter for template substitution (end). */
+    public static final String PLACEHOLDER_END = "}}";
+
     /** A property prefix for http connector. */
     public static final String FLINK_CONNECTOR_HTTP = "http.";
 
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
index 5379736..7d115c6 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/RequestFactoryBase.java
@@ -37,6 +37,9 @@
 import java.util.Arrays;
 import java.util.Map;
 
+import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.PLACEHOLDER_END;
+import static org.apache.flink.connector.http.config.HttpConnectorConfigConstants.PLACEHOLDER_START;
+
 /** Base class for {@link HttpRequest} factories. */
 @Slf4j
 public abstract class RequestFactoryBase implements HttpRequestFactory {
@@ -130,7 +133,7 @@
         if (lookupQueryInfo.hasPathBasedUrlParameters()) {
             for (Map.Entry<String, String> entry :
                     lookupQueryInfo.getPathBasedUrlParameters().entrySet()) {
-                String pathParam = "{" + entry.getKey() + "}";
+                String pathParam = PLACEHOLDER_START + entry.getKey() + PLACEHOLDER_END;
                 int startIndex = resolvedUrl.indexOf(pathParam);
                 if (startIndex == -1) {
                     throw new FlinkRuntimeException(
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
index b273dc5..318ddd9 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreator.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.http.LookupArg;
 import org.apache.flink.connector.http.LookupQueryCreator;
+import org.apache.flink.connector.http.config.HttpConnectorConfigConstants;
 import org.apache.flink.connector.http.table.lookup.LookupQueryInfo;
 import org.apache.flink.connector.http.table.lookup.LookupRow;
 import org.apache.flink.connector.http.utils.SerializationSchemaUtils;
@@ -51,23 +52,23 @@
  * Generic JSON and URL query creator; in addition to be able to map columns to json requests, it
  * allows url inserts to be mapped to column names using templating. <br>
  * <br>
- * For PUT and POST, parameters are mapped to the json body e.g. for the body template "id1;id2" and
- * url of http://base. At lookup time with values of id1=1 and id2=2 as call of http/base will be
- * issued with a json payload of {"id1":1,"id2":2} <br>
- * For all http methods, url segments and query parameters can be used to include lookup up values.
- * Using the map from <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of
- * the insert name and the value of the associated column. e.g. for <code>
- * GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP
- * </code> = "key1":"col1" and url of http://base/{key1}. At lookup time with values of col1="aaaa"
- * a call of http/base/aaaa will be issued. For query parameters, the query param should be supplied
- * in the URL with a place-holder that will be resolved using <code>
- * GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code>
+ * For PUT and POST, parameters are mapped to the json body e.g. for REQUEST_PARAM_FIELDS =
+ * "id1;id2" and url of http://base. At lookup time with values of id1=1 and id2=2 as call of
+ * http/base will be issued with a json payload of {"id1":1,"id2":2} <br>
+ * For all http methods, url segments can be used to include lookup up values. Using the map from
+ * <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP</code> which has a key of the insert and the
+ * value of the associated column. e.g. for <code>GenericJsonAndUrlQueryCreator.REQUEST_URL_MAP
+ * </code> = "key1":"col1" and url of http://base/{{key1}}. At lookup time with values of
+ * col1="aaaa" a call of http/base/aaaa will be issued.
  */
 @Slf4j
 public class GenericJsonAndUrlQueryCreator implements LookupQueryCreator {
     private static final long serialVersionUID = 1L;
     private static final Pattern TEMPLATE_PLACEHOLDER_PATTERN =
-            Pattern.compile("\\{\\{([^}]+)\\}\\}");
+            Pattern.compile(
+                    Pattern.quote(HttpConnectorConfigConstants.PLACEHOLDER_START)
+                            + "([^{}]+)"
+                            + Pattern.quote(HttpConnectorConfigConstants.PLACEHOLDER_END));
 
     // not final so we can mutate for unit test
     private SerializationSchema<RowData> serializationSchema;
@@ -164,8 +165,10 @@
             if (fieldValue == null) {
                 throw new IllegalArgumentException(
                         String.format(
-                                "Template placeholder {{%s}} references a field that does not exist in the lookup row",
-                                fieldName));
+                                "Template placeholder %s%s%s references a field that does not exist in the lookup row",
+                                HttpConnectorConfigConstants.PLACEHOLDER_START,
+                                fieldName,
+                                HttpConnectorConfigConstants.PLACEHOLDER_END));
             }
 
             String valueStr =
diff --git a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
index d7b08db..bf20e9c 100644
--- a/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
+++ b/flink-connector-http/src/main/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorFactory.java
@@ -66,7 +66,7 @@
                                     + "<br>"
                                     + "For example if there are table columns called customerId"
                                     + " and orderId, then specifying value customerId:cid1,orderID:oid"
-                                    + " and a url of https://myendpoint/customers/{cid}/orders/{oid}"
+                                    + " and a url of https://myendpoint/customers/{{cid}}/orders/{{oid}}"
                                     + " will mean that the url used for the lookup query will"
                                     + " dynamically pickup the values for customerId, orderId"
                                     + " and use them in the url."
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
index 056aff5..5e0d973 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/BodyBasedRequestFactoryTest.java
@@ -125,14 +125,14 @@
                 new TestSpec(
                         null,
                         Map.of("param1", "value1"),
-                        "http://service/{param1}",
+                        "http://service/{{param1}}",
                         lookupMethod,
                         "http://service/value1"),
                 // 2 path param
                 new TestSpec(
                         null,
                         Map.of("param1", "value1", "param2", "value2"),
-                        "http://service/{param1}/param2/{param2}",
+                        "http://service/{{param1}}/param2/{{param2}}",
                         lookupMethod,
                         "http://service/value1/param2/value2"),
                 // 1 query param
@@ -160,35 +160,35 @@
                 new TestSpec(
                         Map.of("param3", "value3", "param4", "value4"),
                         Map.of("param1", "value1", "param2", "value2"),
-                        "http://service/{param1}/param2/{param2}",
+                        "http://service/{{param1}}/param2/{{param2}}",
                         lookupMethod,
                         "http://service/value1/param2/value2?param3=value3&param4=value4"),
                 // URL encoding: path param with spaces
                 new TestSpec(
                         null,
                         Map.of("param1", "hello world"),
-                        "http://service/{param1}",
+                        "http://service/{{param1}}",
                         lookupMethod,
                         "http://service/hello+world"),
                 // URL encoding: path param with special characters
                 new TestSpec(
                         null,
                         Map.of("param1", "user@example.com"),
-                        "http://service/{param1}",
+                        "http://service/{{param1}}",
                         lookupMethod,
                         "http://service/user%40example.com"),
                 // URL encoding: path param with slash
                 new TestSpec(
                         null,
                         Map.of("param1", "path/to/resource"),
-                        "http://service/{param1}",
+                        "http://service/{{param1}}",
                         lookupMethod,
                         "http://service/path%2Fto%2Fresource"),
                 // URL encoding: multiple path params with special characters
                 new TestSpec(
                         null,
                         Map.of("param1", "hello world", "param2", "user@example.com"),
-                        "http://service/{param1}/users/{param2}",
+                        "http://service/{{param1}}/users/{{param2}}",
                         lookupMethod,
                         "http://service/hello+world/users/user%40example.com"),
                 // URL encoding: query param with special characters (?, &, ;, space)
@@ -209,7 +209,7 @@
                 new TestSpec(
                         Map.of("query1", "value?test"),
                         Map.of("path1", "user@domain"),
-                        "http://service/{path1}",
+                        "http://service/{{path1}}",
                         lookupMethod,
                         "http://service/user%40domain?query1=value%3Ftest"),
                 // Complete URL replacement with URL-encoded parts
@@ -218,7 +218,7 @@
                         Map.of(
                                 "url",
                                 "https://api.example.com/search?q=hello%20world&filter=type%3Dbook&sort=date%3Adesc"),
-                        "{url}",
+                        "{{url}}",
                         lookupMethod,
                         "https://api.example.com/search?q=hello%20world&filter=type%3Dbook&sort=date%3Adesc"));
     }
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
index 769303d..5497f1d 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/HttpLookupTableSourceITCaseTest.java
@@ -969,7 +969,7 @@
                         + "  `email` STRING\n"
                         + ") WITH ("
                         + "'connector' = 'http',"
-                        + "'url' = '{url}',"
+                        + "'url' = '{{url}}',"
                         + "'http.request.url-map' = 'url:url',"
                         + "'format' = 'json',"
                         + "'asyncPolling' = 'false',"
@@ -1235,7 +1235,7 @@
                         + "'lookup-method' = 'GET',"
                         + "'url' = 'http://localhost:"
                         + serverPort2
-                        + "/client?customer={customer}&id2={id2}',"
+                        + "/client?customer={{customer}}&id2={{id2}}',"
                         + "'http.source.lookup.header.Content-Type' = 'application/json',"
                         + "'asyncPolling' = 'true',"
                         + "'http.source.lookup.query-creator' = 'http-generic-json-url',"
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
index c627746..0a7521f 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/LookupQueryInfoTest.java
@@ -68,7 +68,7 @@
         Map<String, String> pathBasedUrlPathParameters = Map.of("key1", "value1");
 
         lookupQueryInfo =
-                new LookupQueryInfo("http://service/{key1}", null, pathBasedUrlPathParameters);
+                new LookupQueryInfo("http://service/{{key1}}", null, pathBasedUrlPathParameters);
 
         assertThat(lookupQueryInfo.hasLookupQuery()).isTrue();
         assertThat(lookupQueryInfo.hasPathBasedUrlParameters()).isTrue();
diff --git a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
index defb4ed..d54efd1 100644
--- a/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
+++ b/flink-connector-http/src/test/java/org/apache/flink/connector/http/table/lookup/querycreators/GenericJsonAndUrlQueryCreatorTest.java
@@ -364,6 +364,31 @@
     }
 
     @Test
+    public void testBodyTemplateWithInvalidNestedPlaceholders() {
+        // GIVEN - Body template with invalid nested placeholders like {{aaa}}/{{bbb}}
+        Configuration config = new Configuration();
+        config.set(LOOKUP_METHOD, "POST");
+        config.set(REQUEST_BODY_TEMPLATE, "{\"path\":\"{{aaa}}/{{bbb}}\"}");
+
+        LookupRow lookupRow = getLookupRow(KEY_1);
+        lookupRow.setLookupPhysicalRowDataType(DATATYPE_1);
+
+        GenericJsonAndUrlQueryCreator creator =
+                (GenericJsonAndUrlQueryCreator)
+                        new GenericJsonAndUrlQueryCreatorFactory()
+                                .createLookupQueryCreator(
+                                        config,
+                                        lookupRow,
+                                        getTableContext(config, RESOLVED_SCHEMA));
+
+        // WHEN/THEN - Should throw IllegalArgumentException because {{aaa}}/{{bbb}} is not a valid
+        // field name
+        assertThatThrownBy(() -> creator.createLookupQuery(ROWDATA))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("does not exist");
+    }
+
+    @Test
     public void testBodyTemplateWithComplexNestedStructureAndTimestamps() throws Exception {
         // GIVEN - Complex template with primitives, arrays, nested objects, timestamps, and
         // literals